JMS redelivery with ActiveMQ and Servicemix

The other day I felt a compelling need to implement a JMS redelivery scenario. The exact scenario I’d been trying to handle was:

  1. my message is in an ActiveMQ queue or topic
  2. its processing fails, because of some exception – ie. database access exception due to server nonavailability
  3. since we get an exception, the message is not handled properly, we may want to retry processing attempt some time later
  4. of course, for the redelivery to happen we need the message to stay in the ActiveMQ queue – fetching messages from the queue will be stopped until the redelivery succeeds or expires

See how this can be done after the jump :)

For this to happen, I’ve tried implementing Apache Camel route, but as it turns out, Camel fails to deliver facilities for exact JMS redelivery. It is possible to set JMS connection in transacted mode, but the redeliveries happen one after another and fixed times.

What I’ve ended up doing was implement a servicemix-jms endpoint. I’ve used this configuration for it:


            activemq/connectionFactory

            activemq/resourceAdapter

As you can see, we lookup a couple of things in JNDI registry, so you need to have them configured on the Servicemix side – a sample config presented farther in this entry.

The bean responsible for configuring redelivery settings is activationSpec. You can set various things with it, like:

  • initial redelivery delay
  • maximum number of redeliveries
  • backoff multiplier

What is really important in jms:endpoint config for this to work are:

  • processorName=”jca”
  • rollbackOnError=”true”

Servicemix should have the following entries in its jndi registry:

          

(...) 

       xmlns:jencks="http://jencks.org/2.0"
       xmlns:amqra="http://activemq.apache.org/schema/ra" -->

When the redeliveries are exhausted, message is routed to global Dead Letter Queue called ActiveMQ.DLQ. Since this is a single bag for all the failed messages from all queues, you may want to configure this aspect differently. For example you can tell ActiveMQ to create a single DLQ for each queue. Use this config to achieve it – the changes should be made to Broker configuration.


        

            

  ...

More on the subject of redelivieries in ActiveMQ can be found at http://activemq.apache.org/message-redelivery-and-dlq-handling.html.

You May Also Like

Log4j and MDC in Grails

Log4j provides very useful feature: MDC - mapped diagnostic context. It can be used to store data in context of current thread. It may sound scary a bit but idea is simple.

My post is based on post http://burtbeckwith.com/blog/?p=521 from Burt Beckwith's excellent blog, it's definitely worth checking if you are interested in Grails.

Short background story...


Suppose we want to do logging our brand new shopping system and we want to have in each log customer's shopping basket number. And our system can be used at once by many users who can perform many transactions, actions like adding items and so on. How can we achieve that? Of course we can add basket number in every place where we do some logging but this task would be boring and error-prone. 

Instead of this we can use MDC to store variable with basket number in map. 

In fact MDC can be treated as map of custom values for current thread that can be used by logger. 


How to do that with Grails?


Using MDC with Grails is quite simple. All we need to do is to create our own custom filter which works for given urls and puts our data in MDC.

Filters in Grails are classes in directory grails-app/conf/* which names end with *Filters.groovy postfix. We can create this class manually or use Grails command: 
grails create-filters info.rnowak.App.Basket

In result class named BasketFilters will be created in grails-app/conf/info/rnowak/UberApp.

Initially filter class looks a little bit empty:
class BasketFilters {
def filters = {
all(controller:'*', action:'*') {
before = {

}
after = { Map model ->

}
afterView = { Exception e ->

}
}
}
}
All we need to do is fill empty closures, modify filter properties and put some data into MDC.

all is the general name of our filter, as class BasketFilters (plural!) can contain many various filters. You can name it whatever you want, for this post let assume it will be named basketFilter

Another thing is change of filter parameters. According to official documentation (link) we can customize our filter in many ways. You can specify controller to be filtered, its actions, filtered urls and so on. In our example you can stay with default option where filter is applied to every action of every controller. If you are interested in filtering only some urls, use uri parameter with expression describing desired urls to be filtered.

Three closures that are already defined in template have their function and they are started in these conditions:

  • before - as name says, it is executed before filtered action takes place
  • after - similarly, it is called after the action
  • afterView - called after rendering of the actions view
Ok, so now we know what are these mysterious methods and when they are called. But what can be done within them? In official Grails docs (link again) under section 7.6.3 there is a list of properties that are available to use in filter.

With that knowledge, we can proceed to implementing filter.

Putting something into MDC in filter


What we want to do is quite easy: we want to retrieve basket number from parameters and put it into MDC in our filter:
class BasketFilters {
def filters = {
basketFilter(controller:'*', action:'*') {
before = {
MDC.put("basketNumber", params.basketNumber ?: "")
}
after = { Map model ->
MDC.remove("basketNumber")
}
}
}
}

We retrieve basket number from Grails params map and then we put in map under specified key ("basketNumber" in this case), which will be later used in logger conversion pattern. It is important to remove custom value after processing of action to avoid leaks.

So we are putting something into MDC. But how make use of it in logs?


We can refer to custom data in MDC in conversion patter using syntax: %X{key}, where key is our key we used in filter to put data, like:
def conversionPattern = "%d{yyyy-MM-dd HH:mm:ss} %-5p %t [%c{1}] %X{basketNumber} - %m%n"


And that's it :) We've put custom data in log4j MDC and successfully used it in logs to display interesting values.

Grails with Spock unit test + IntelliJ IDEA = No thread-bound request found

During my work with Grails project using Spock test in IntelliJ IDEA I've encountered this error:

java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet/DispatcherPortlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
at org.springframework.web.context.request.RequestContextHolder.currentRequestAttributes(RequestContextHolder.java:131)
at org.codehaus.groovy.grails.plugins.web.api.CommonWebApi.currentRequestAttributes(CommonWebApi.java:205)
at org.codehaus.groovy.grails.plugins.web.api.CommonWebApi.getParams(CommonWebApi.java:65)
... // and few more lines of stacktrace ;)

It occurred when I tried to debug one of test from IDEA level. What is interesting, this error does not happen when I'm running all test using grails test-app for instance.

So what was the issue? With little of reading and tip from Tomek Kalkosiński (http://refaktor.blogspot.com/) it turned out that our test was missing @TestFor annotation and adding it solved all problems.

This annotation, according to Grails docs (link), indicates Spock what class is being tested and implicitly creates field with given type in test class. It is somehow strange as problematic test had explicitly and "manually" created field with proper controller type. Maybe there is a problem with mocking servlet requests?