Scheduling tasks using Message Queue

Introduction

How do you schedule a task for later execution? Often you create a table in the database and configure a job that checks whether the due time of any task has passed and then executes it.

But there is an easier way, provided your application already has a message broker: you can publish a message and tell the broker to deliver it after a specified delay.
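The table-and-job approach can be sketched roughly as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical, and an in-memory list stands in for the database table that a real setup would query.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// In-memory stand-in for the "tasks" table (hypothetical; a real job would query the database).
class PollingTaskTable {

    // One row of the table: what to run and when it is due.
    static final class ScheduledTask {
        final String name;
        final Instant dueTime;

        ScheduledTask(String name, Instant dueTime) {
            this.name = name;
            this.dueTime = dueTime;
        }
    }

    private final List<ScheduledTask> rows = new ArrayList<>();

    void insert(String name, Instant dueTime) {
        rows.add(new ScheduledTask(name, dueTime));
    }

    // What the periodic job does on each run: collect every task whose due time
    // is not after "now", remove it from the table, and return it for execution.
    List<ScheduledTask> pollDue(Instant now) {
        List<ScheduledTask> due = new ArrayList<>();
        Iterator<ScheduledTask> it = rows.iterator();
        while (it.hasNext()) {
            ScheduledTask task = it.next();
            if (!task.dueTime.isAfter(now)) {
                due.add(task);
                it.remove();
            }
        }
        return due;
    }

    // Number of tasks still waiting in the table.
    int pending() {
        return rows.size();
    }
}
```

A job would call pollDue(Instant.now()) at a fixed interval, so a task can run late by up to one polling interval, and the table, the job and its error handling all have to be maintained by you.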

Scheduling messages using ActiveMQ

ActiveMQ is an open source message broker written in Java. It is an implementation of JMS (Java Message Service).

You can start the broker with scheduling support by adding the schedulerSupport attribute to the broker configuration:

<beans ...>
    ...
    <broker xmlns="http://activemq.apache.org/schema/core"
            brokerName="localhost"
            dataDirectory="${activemq.data}"
            schedulerSupport="true">
            ...
    </broker>
    ...
</beans>

Now, if you want to delay delivery of a message by a few seconds, you can set a property when creating the message, e.g.:

message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 8000)

The delay is given in milliseconds.

Of course, the queue must be persistent.

When you listen for messages on the same queue, you will see that the message is indeed received with an 8-second delay:
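Since the property takes a relative delay, a task that should run at a specific point in time needs its due time converted to milliseconds first. A minimal sketch of such a conversion, assuming a hypothetical helper class DelayCalculator that is not part of ActiveMQ:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of ActiveMQ: converts an absolute due time
// into a relative delay in milliseconds.
class DelayCalculator {

    static long delayMillisUntil(Instant now, Instant dueTime) {
        long millis = Duration.between(now, dueTime).toMillis();
        // Clamp to zero so that a due time in the past never yields a negative delay.
        return Math.max(0L, millis);
    }
}
```

The result could then be passed to the property shown above, for example message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, DelayCalculator.delayMillisUntil(Instant.now(), dueTime)), where dueTime is whatever instant your application has chosen.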

...
Send time: Tue Dec 01 18:51:23 CET 2015
...
Message received at Tue Dec 01 18:51:31 CET 2015
...

Scheduling messages using RabbitMQ

Scheduling tasks is not a feature exclusive to ActiveMQ. It is also available in RabbitMQ.

RabbitMQ is a message broker written in Erlang. It uses the AMQP protocol.

First you have to install the rabbitmq_delayed_message_exchange plugin. Once the plugin is available to the broker, it can be enabled with the command:

rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange

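You have to declare an exchange in RabbitMQ that uses the features of this plugin. The queue for delayed messages should be bound to this exchange, with the routing key set to the queue name.

// Exchange of the plugin's type; 'x-delayed-type' selects the routing behaviour
channel.exchangeDeclare(exchange, 'x-delayed-message', true, false, ['x-delayed-type': 'direct'])
// Declare the durable queue first, because binding a queue that does not exist yet fails
channel.queueDeclare(queue, true, false, false, null)
channel.queueBind(queue, exchange, queue)

As with ActiveMQ, the queue must be persistent (durable, in RabbitMQ terms).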

To test it, just publish a new message with the x-delay header:

channel.basicPublish(
    exchange, 
    queue, 
    new AMQP.BasicProperties.Builder().headers('x-delay': 8000).build(),
    "Message: $currentUuid".bytes
)

The message will be delayed by 8 seconds:

...
Send time: Tue Dec 01 19:04:18 CET 2015
...
Message received at Tue Dec 01 19:04:26 CET 2015
...
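The snippets above are Groovy and rely on its map literals. In plain Java the two maps have to be built explicitly. A minimal sketch, assuming a hypothetical helper class DelayedMessaging (its name and the negative-delay check are illustrative and not part of the RabbitMQ client or the plugin):

```java
import java.util.Map;

// Hypothetical helper class; nothing here comes from the RabbitMQ client itself.
class DelayedMessaging {

    // Arguments for the exchange declaration: the routing behaviour the
    // delayed exchange should use, e.g. "direct".
    static Map<String, Object> exchangeArguments(String delayedType) {
        return Map.of("x-delayed-type", delayedType);
    }

    // Headers for a single message: the delay in milliseconds under the x-delay key.
    static Map<String, Object> delayHeaders(long delayMillis) {
        if (delayMillis < 0) {
            // Illustrative guard chosen for this sketch.
            throw new IllegalArgumentException("delay must not be negative: " + delayMillis);
        }
        return Map.of("x-delay", delayMillis);
    }
}
```

With the RabbitMQ Java client on the classpath, these maps would be passed as the last argument of channel.exchangeDeclare(exchange, "x-delayed-message", true, false, DelayedMessaging.exchangeArguments("direct")) and to new AMQP.BasicProperties.Builder().headers(DelayedMessaging.delayHeaders(8000)).build().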

Conclusion

Why build a similar mechanism for handling scheduled tasks on your own when you could use your message broker and delayed messages to schedule future tasks?

Sources are available here.
