Scheduling tasks using Message Queue

Introduction

How to schedule your task for later execution? You often create table in database, configure job that checks if due time of any task occured and then execute it.

But there is easier way if only you have message broker with your application... You could publish/send your message and tell it that it should be delivered with specified delay.

Scheduling messages using ActiveMQ

ActiveMQ is open source message broker written in Java. It is implementation of JMS (Java Message Service).

You could start its broker with scheduling support by adding flag schedulerSupport to broker configuration:

<beans ...>
...
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost"
dataDirectory="${activemq.data}"
schedulerSupport="true">
...
</broker>
...
</beans>

Now, if you want to delay receiving message by few seconds, you could add property during message creation, e.g.:

message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 8000)

Delay unit is miliseconds.

Of course queue must be persisted.

When you listen for message on the same queue, then you will see that message indeed will be received with 8 second delay.

...
Send time: Tue Dec 01 18:51:23 CET 2015
...
Message received at Tue Dec 01 18:51:31 CET 2015
...

Scheduling messages using RabbitMQ

Scheduling tasks is not only the feature of ActiveMQ. It is also available with RabbitMQ.

RabitMQ is message broker written in Erlang. It uses protocol AMQP.

First you have to install plugin rabbitmq_delayed_message_exchange. It could be done via command:

rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange

You have to define exchange in RabbitMQ which will use features from this plugin. Queue for delayed messages should be bound to this exchange. Routing key should be set to queue name.

channel.exchangeDeclare(exchange, 'x-delayed-message', true, false, ['x-delayed-type': 'direct']);
channel.queueBind(queue, exchange, queue);
channel.queueDeclare(queue, true, false, false, null);

Of course queue must be persisted.

To test it just publish new message with property x-delay:

channel.basicPublish(exchange,
queue,
new AMQP.BasicProperties.Builder().headers('x-delay': 8000).build(),
"Message: $currentUuid".bytes)

Message will be delayed with 8 seconds:

...
Send time: Tue Dec 01 19:04:18 CET 2015
...
Message received at Tue Dec 01 19:04:26 CET 2015
...

Conclusion

Why you create similar mechanism for handling scheduled tasks on your own, when you could use your message brokers and delayed messages to schedule future tasks?

Sources are available here.

Advisory Messages to the rescue

The most crucial part of software development is testing. It should ensure us, that our code is correct, works according to given specs, etc. There are many kinds of tests: unit tests, integration, functional. In general you should try to test the smallest possible subset of your code and be able to check the state of the objects after the test. This seems as rather easy task, but what if you have an integration end-to-end test to perform? In most cases asserting state in integration test is rather hard due to multiple systems interoperability. Let's focus on a specific situation. What I needed to do the other day was write some integration test for Jms based system. The processing pipeline is easy:
  • fetch object from DB
  • process it
  • publish on JMS
some other system (X-system) polls JMS:
  • if message is found
  • fetch it (message disappears from the JMS queue)
  • do sth with it
  • Looks simple but since I didn't have any sane access to the X-system I wanted to be sure that my object was actually put into the queue. It was not acceptable to subscribe to the queue and fetch that object in my test - it would dusrupt the flow of the whole process. Fortunately I've been using ActiveMQ and since it offers a thing called Advisory Messages I've decided to use just them. What are advisory messages? They are a set of administrative messages that are generated on a specific event, like message consumption, message delivery, topic destruction, and many more. Each type of message is delivered to a separate topic - prefixed with ActiveMQ.Advisory. Since generation of such messages may be an overhead in production systems these features are turned off by default. You need to enable specific type of advisory message for a specific jms destination. You can do this with ths configuration change to activemq.xml   As you can see, I've specified which advisories I want enabled. The full list of available advisories can be found here. Since I wanted to read messages from that topic I've added the following configuration to my spring context - there is one destination bean for inserting messages and one bean for advisory topic.   Thanks to this configuration I've been able to check that my message was actually delivered to the queue. There've been no need to worry about race conditions in consuming the message from original queue - if the X-system read the message, I'd be unable to determine if it has ever been in JMS at all. What's not so nice about that:
    • advisory messages can be thought of as counters rather than debugging information
    • they don't contain any data that would allow us to match advisory message to the original message - thou you could correlate by timestamp
    All in all, it's a good tool to have! But perhaps you have some other thoughts on this subject? How do you test JMS?

JMS redelivery with ActiveMQ and Servicemix

The other day I felt a compelling need to implement a JMS redelivery scenario. The exact scenario I'd been trying to handle was:
  1. my message is in an ActiveMQ queue or topic
  2. its processing fails, because of some exception - ie. database access exception due to server nonavailability
  3. since we get an exception, the message is not handled properly, we may want to retry processing attempt some time later
  4. of course, for the redelivery to happen we need the message to stay in the ActiveMQ queue - fetching messages from the queue will be stopped until the redelivery succeeds or expires
See how this can be done after the jump :)

Easier and nicer JMS

JMS seems like a hostile ground. It has all it's quirks and strange behaviours. A couple of defining standards plus esoteric brokers, queues and topics.

At work, we mainly use open source Jms solutions, namely Apache ActiveMQ. This one is usually bundled with Apache Servicemix, as a message broker for this particular ESB. As there are some minor caveats in this scennerio, I'd like to describe here some guidelines for getting to running JMS queues.

Treat this post as a quick cheat sheet with the most common things about JMS I tend to forget :)

Minor glitches encountered during work with embedded broker led to some thoughts about switching to external broker. This is how I configure SMX and AcviteMQ.

Necessary steps:

  • change apache-servicemix/conf/servicemix.properties activemq.port to sth else than standard, for example 61626
  • change apache-activemq/conf/activemq.xml with this settings:
    • change port, the service listens on:
              
                  
              
      
    • setup separate JMX instance:
              
                  
              
      
  • the nicest tool I found for browsing queues and topics is Hermes JMS. Sample config, that connects Hermes to ActiveMQ instance is on the picture below: HermesJMS to ActiveMQ connection config
  • sending simple messages with Hermes is basic, but what if you need to set some headers, send bulk messages, etc. Easy, just use Hermes xml format. Look like this code snippet below and is rather self-explanatory:
    
        
            
                
                
    
      
        
          105
          1235
        
      
    ]]>
    
            
        
    
    
  • since we use lots of Apache Camel to consume messages, here is a simple way to start broker in your tests:
    • start a broker
              BrokerService broker = new org.apache.activemq.broker.BrokerService();
              broker.setBrokerName("AMQ-1");
              broker.addConnector("tcp://localhost:51616");
              broker.setPersistent(false);
              broker.start();
      

      Notice it has persistance disabled.

    • initialize Camel's JMS component:
          ctx.removeComponent("jms");
          ctx.addComponent("jms", ActiveMQComponent.activeMQComponent("tcp://localhost:51616"));
      
    • if you want to pass messages to reference endpoints, (like ref:input), use this wrapper method:
      private JmsEndpoint createJmsEndpoint(String endpoint) throws JMSException {
              ActiveMQComponent amqc = (ActiveMQComponent) ctx.getComponent("jms");
              JmsEndpoint endp = JmsEndpoint.newInstance(new ActiveMQTopic(endpoint), amqc);
              return endp;
      }
      
      createJmsEndpoint("ESB/XYZ")
      

These are all the tricks I've got for now! But if you know some other good tools that handle JMS, feel free to comment! Got more advices, again, comment!