Scheduling tasks using Message Queue

Introduction

How to schedule your task for later execution? You often create table in database, configure job that checks if due time of any task occured and then execute it.

But there is easier way if only you have message broker with your application… You could publish/send your message and tell it that it should be delivered with specified delay.

Scheduling messages using ActiveMQ

ActiveMQ is open source message broker written in Java. It is implementation of JMS (Java Message Service).

You could start its broker with scheduling support by adding flag schedulerSupport to broker configuration:

<beans ...>
    ...
    <broker xmlns="http://activemq.apache.org/schema/core"
            brokerName="localhost"
            dataDirectory="${activemq.data}"
            schedulerSupport="true">
            ...
    </broker>
    ...
</beans>

 

Now, if you want to delay receiving message by few seconds, you could add property during message creation, e.g.:

message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 8000)

Delay unit is miliseconds.

Of course queue must be persisted.

When you listen for message on the same queue, then you will see that message indeed will be received with 8 second delay.

...
Send time: Tue Dec 01 18:51:23 CET 2015
...
Message received at Tue Dec 01 18:51:31 CET 2015
...

Scheduling messages using RabbitMQ

Scheduling tasks is not only the feature of ActiveMQ. It is also available with RabbitMQ.

RabitMQ is message broker written in Erlang. It uses protocol AMQP.

First you have to install plugin rabbitmq_delayed_message_exchange. It could be done via command:

rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange

You have to define exchange in RabbitMQ which will use features from this plugin. Queue for delayed messages should be bound to this exchange. Routing key should be set to queue name.

channel.exchangeDeclare(exchange, 'x-delayed-message', true, false, ['x-delayed-type': 'direct']);
channel.queueBind(queue, exchange, queue);
channel.queueDeclare(queue, true, false, false, null);

Of course queue must be persisted.

To test it just publish new message with property x-delay:

channel.basicPublish(
    exchange, 
    queue, 
    new AMQP.BasicProperties.Builder().headers('x-delay': 8000).build(),
    "Message: $currentUuid".bytes
)

Message will be delayed with 8 seconds:

...
Send time: Tue Dec 01 19:04:18 CET 2015
...
Message received at Tue Dec 01 19:04:26 CET 2015
...

Conclusion

Why you create similar mechanism for handling scheduled tasks on your own, when you could use your message brokers and delayed messages to schedule future tasks?

Sources are available here.

You May Also Like

Log4j and MDC in Grails

Log4j provides very useful feature: MDC - mapped diagnostic context. It can be used to store data in context of current thread. It may sound scary a bit but idea is simple.

My post is based on post http://burtbeckwith.com/blog/?p=521 from Burt Beckwith's excellent blog, it's definitely worth checking if you are interested in Grails.

Short background story...


Suppose we want to do logging our brand new shopping system and we want to have in each log customer's shopping basket number. And our system can be used at once by many users who can perform many transactions, actions like adding items and so on. How can we achieve that? Of course we can add basket number in every place where we do some logging but this task would be boring and error-prone. 

Instead of this we can use MDC to store variable with basket number in map. 

In fact MDC can be treated as map of custom values for current thread that can be used by logger. 


How to do that with Grails?


Using MDC with Grails is quite simple. All we need to do is to create our own custom filter which works for given urls and puts our data in MDC.

Filters in Grails are classes in directory grails-app/conf/* which names end with *Filters.groovy postfix. We can create this class manually or use Grails command: 
grails create-filters info.rnowak.App.Basket

In result class named BasketFilters will be created in grails-app/conf/info/rnowak/UberApp.

Initially filter class looks a little bit empty:
class BasketFilters {
def filters = {
all(controller:'*', action:'*') {
before = {

}
after = { Map model ->

}
afterView = { Exception e ->

}
}
}
}
All we need to do is fill empty closures, modify filter properties and put some data into MDC.

all is the general name of our filter, as class BasketFilters (plural!) can contain many various filters. You can name it whatever you want, for this post let assume it will be named basketFilter

Another thing is change of filter parameters. According to official documentation (link) we can customize our filter in many ways. You can specify controller to be filtered, its actions, filtered urls and so on. In our example you can stay with default option where filter is applied to every action of every controller. If you are interested in filtering only some urls, use uri parameter with expression describing desired urls to be filtered.

Three closures that are already defined in template have their function and they are started in these conditions:

  • before - as name says, it is executed before filtered action takes place
  • after - similarly, it is called after the action
  • afterView - called after rendering of the actions view
Ok, so now we know what are these mysterious methods and when they are called. But what can be done within them? In official Grails docs (link again) under section 7.6.3 there is a list of properties that are available to use in filter.

With that knowledge, we can proceed to implementing filter.

Putting something into MDC in filter


What we want to do is quite easy: we want to retrieve basket number from parameters and put it into MDC in our filter:
class BasketFilters {
def filters = {
basketFilter(controller:'*', action:'*') {
before = {
MDC.put("basketNumber", params.basketNumber ?: "")
}
after = { Map model ->
MDC.remove("basketNumber")
}
}
}
}

We retrieve basket number from Grails params map and then we put in map under specified key ("basketNumber" in this case), which will be later used in logger conversion pattern. It is important to remove custom value after processing of action to avoid leaks.

So we are putting something into MDC. But how make use of it in logs?


We can refer to custom data in MDC in conversion patter using syntax: %X{key}, where key is our key we used in filter to put data, like:
def conversionPattern = "%d{yyyy-MM-dd HH:mm:ss} %-5p %t [%c{1}] %X{basketNumber} - %m%n"


And that's it :) We've put custom data in log4j MDC and successfully used it in logs to display interesting values.

Mock Retrofit using Dagger and Mockito

Retrofit is one of the most popular REST client for Android, if you never use it, it is high time to start. There are a lot of articles and tutorial talking about Retrofit. I just would like to show how to mock a REST server during develop of app and i...Retrofit is one of the most popular REST client for Android, if you never use it, it is high time to start. There are a lot of articles and tutorial talking about Retrofit. I just would like to show how to mock a REST server during develop of app and i...