Scheduling tasks using a Message Queue

Introduction

How do you schedule a task for later execution? A common approach is to create a table in the database and configure a job that periodically checks whether the due time of any task has passed and, if so, executes it.

But there is an easier way if your application already uses a message broker: you can publish a message and tell the broker to deliver it after a specified delay.

Scheduling messages using ActiveMQ

ActiveMQ is an open source message broker written in Java. It is an implementation of JMS (Java Message Service).

You can start the broker with scheduling support by setting the schedulerSupport attribute to true in the broker configuration:

<beans ...>
    ...
    <broker xmlns="http://activemq.apache.org/schema/core"
            brokerName="localhost"
            dataDirectory="${activemq.data}"
            schedulerSupport="true">
            ...
    </broker>
    ...
</beans>

 

Now, if you want to delay delivery of a message by a few seconds, you can set a property on the message when you create it, e.g.:

message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 8000)

The delay is given in milliseconds.
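Tasks are often stored with a due time rather than a delay, so the due time has to be converted to milliseconds before it is passed to AMQ_SCHEDULED_DELAY. Below is a minimal sketch of such a conversion using only the JDK. The class and method names (DelayCalculator, delayMillis) are hypothetical and not part of ActiveMQ, and clamping past due times to zero is an assumption about the desired behaviour.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of ActiveMQ or JMS.
class DelayCalculator {

    // Milliseconds from `now` until `dueTime`.
    // Assumption: a due time in the past means "deliver immediately", so the result is clamped to 0.
    static long delayMillis(Instant now, Instant dueTime) {
        long millis = Duration.between(now, dueTime).toMillis();
        return Math.max(0L, millis);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2015-12-01T17:51:23Z");
        Instant due = now.plusSeconds(8);
        // The value printed here is what would be passed as the delay property.
        System.out.println(delayMillis(now, due)); // prints 8000
    }
}
```

The result could then be used as the second argument of message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, ...).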

Of course, the queue must be persistent.

When you listen for messages on the same queue, you will see that the message is indeed received after an 8 second delay:

...
Send time: Tue Dec 01 18:51:23 CET 2015
...
Message received at Tue Dec 01 18:51:31 CET 2015
...

Scheduling messages using RabbitMQ

Scheduling is not a feature exclusive to ActiveMQ. It is also available in RabbitMQ.

RabbitMQ is a message broker written in Erlang. It uses the AMQP protocol.

First you have to install the rabbitmq_delayed_message_exchange plugin. Once the plugin file is in the broker's plugins directory, it can be enabled with the command:

rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange

Next, you have to declare an exchange in RabbitMQ that uses the features of this plugin. The queue for delayed messages should be bound to this exchange, with the routing key set to the queue name.

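// The exchange type comes from the plugin; 'x-delayed-type' selects the routing behaviour.
channel.exchangeDeclare(exchange, 'x-delayed-message', true, false, ['x-delayed-type': 'direct']);
// Declare the durable queue before binding it; binding a queue that does not exist fails.
channel.queueDeclare(queue, true, false, false, null);
// The routing key is the queue name.
channel.queueBind(queue, exchange, queue);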

Of course, the queue must be durable (the second argument of queueDeclare).

To test it, just publish a new message with the x-delay header:

channel.basicPublish(
    exchange, 
    queue, 
    new AMQP.BasicProperties.Builder().headers('x-delay': 8000).build(),
    "Message: $currentUuid".bytes
)

The message will be delivered after an 8 second delay:

...
Send time: Tue Dec 01 19:04:18 CET 2015
...
Message received at Tue Dec 01 19:04:26 CET 2015
...
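The publishing snippet above relies on Groovy's named-argument syntax to build the headers map. In plain Java the same map has to be built explicitly. Below is a minimal sketch using only the JDK. The class and method names (DelayHeaders, withDelay) are hypothetical, and rejecting negative delays is an assumption rather than documented plugin behaviour.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the RabbitMQ client library.
class DelayHeaders {

    // Builds a headers map with a single "x-delay" entry, in milliseconds.
    // Assumption: a negative delay is a programming error, so it is rejected here.
    static Map<String, Object> withDelay(int delayMillis) {
        if (delayMillis < 0) {
            throw new IllegalArgumentException("delay must not be negative: " + delayMillis);
        }
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-delay", delayMillis);
        return headers;
    }

    public static void main(String[] args) {
        // Same 8 second delay as in the example above.
        System.out.println(withDelay(8000));
    }
}
```

The returned map could be passed to new AMQP.BasicProperties.Builder().headers(...) in place of the Groovy map literal.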

Conclusion

Why build your own mechanism for handling scheduled tasks when you can use your message broker and delayed messages to schedule future tasks?

Sources are available here.
