JMS redelivery with ActiveMQ and Servicemix

The other day I felt a compelling need to implement a JMS redelivery scenario. The exact scenario I’d been trying to handle was:

  1. my message is in an ActiveMQ queue or topic
  2. its processing fails because of some exception, e.g. a database access exception caused by the server being unavailable
  3. since we got an exception, the message was not handled properly, so we may want to retry processing some time later
  4. of course, for the redelivery to happen the message has to stay in the ActiveMQ queue; fetching messages from the queue stops until the redelivery succeeds or the redelivery attempts are exhausted


To make this happen, I first tried implementing an Apache Camel route, but as it turned out, Camel did not give me the fine-grained JMS redelivery control I was after. It is possible to put the JMS connection in transacted mode, but the redeliveries then happen one after another, at fixed intervals.

What I ended up doing was implementing a servicemix-jms endpoint. I used this configuration for it:


            activemq/connectionFactory

            activemq/resourceAdapter

As you can see, we look up a couple of things in the JNDI registry, so you need to have them configured on the Servicemix side. A sample config is presented further down in this entry.

The bean responsible for configuring redelivery settings is activationSpec. You can set various things with it, like:

  • initial redelivery delay
  • maximum number of redeliveries
  • backoff multiplier
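
To get a feel for how these three settings interact, here is a minimal sketch in plain Java. It is not ActiveMQ code: the class and method names are made up for illustration, and it assumes the simplest possible model, where the delay starts at the initial value and is multiplied by the backoff multiplier after every failed attempt. The real redelivery policy has more knobs (for example a maximum redelivery delay and collision avoidance), which are ignored here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of ActiveMQ or Servicemix.
class RedeliverySchedule {

    /**
     * Returns the delay in milliseconds before each redelivery, assuming the
     * delay starts at initialDelayMs and grows by backOffMultiplier after
     * every failed attempt.
     */
    static List<Long> delays(long initialDelayMs, double backOffMultiplier, int maximumRedeliveries) {
        List<Long> result = new ArrayList<>();
        double delay = initialDelayMs;
        for (int redelivery = 1; redelivery <= maximumRedeliveries; redelivery++) {
            result.add(Math.round(delay));
            delay *= backOffMultiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // 1 s initial delay, doubling each time, at most 5 redeliveries
        System.out.println(delays(1000, 2.0, 5)); // [1000, 2000, 4000, 8000, 16000]
    }
}
```

With a multiplier of 1.0 the same method yields a flat schedule, which is roughly the fixed-interval behaviour described earlier for the plain transacted connection.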

The really important settings in the jms:endpoint config for this to work are:

  • processorName="jca"
  • rollbackOnError="true"

Servicemix should have the following entries in its JNDI registry:

          

(...) 

       xmlns:jencks="http://jencks.org/2.0"
       xmlns:amqra="http://activemq.apache.org/schema/ra" -->

When the redeliveries are exhausted, the message is routed to the global Dead Letter Queue called ActiveMQ.DLQ. Since this is a single bag for the failed messages from all queues, you may want to configure this aspect differently. For example, you can tell ActiveMQ to create a separate DLQ for each queue. Use this config to achieve it; the changes should be made to the broker configuration.


        

            

  ...
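
The difference between the shared DLQ and a per-queue DLQ can be illustrated with a toy model in plain Java. This is only a sketch under stated assumptions, not broker code: the class and method names are hypothetical, a handler returning false stands in for an exception that triggers a rollback, the message is assumed to be tried once plus up to the maximum number of redeliveries, and the "DLQ." prefix is an assumed value for the configurable per-queue prefix.

```java
import java.util.Optional;
import java.util.function.Predicate;

// Toy model of dead letter routing; hypothetical, not ActiveMQ code.
class DeadLetterRouting {

    /** The shared dead letter queue mentioned in the text. */
    static final String SHARED_DLQ = "ActiveMQ.DLQ";

    /** Per-queue DLQ name; the "DLQ." prefix is an assumption. */
    static String individualDlqName(String queueName) {
        return "DLQ." + queueName;
    }

    /**
     * Tries the handler once, then up to maximumRedeliveries more times.
     * Returns an empty Optional when processing succeeded, otherwise the
     * name of the dead letter queue the message would end up in.
     */
    static Optional<String> deliver(String queueName, String message, Predicate<String> handler,
                                    int maximumRedeliveries, boolean individualDlq) {
        for (int attempt = 0; attempt <= maximumRedeliveries; attempt++) {
            if (handler.test(message)) {
                return Optional.empty(); // processed, nothing to dead-letter
            }
        }
        return Optional.of(individualDlq ? individualDlqName(queueName) : SHARED_DLQ);
    }

    public static void main(String[] args) {
        Predicate<String> alwaysFails = message -> false;
        System.out.println(deliver("orders", "msg-1", alwaysFails, 3, false)); // Optional[ActiveMQ.DLQ]
        System.out.println(deliver("orders", "msg-1", alwaysFails, 3, true));  // Optional[DLQ.orders]
    }
}
```

With a DLQ per queue, failed messages from different queues no longer end up mixed together, which makes it easier to inspect or replay them per destination.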

More on the subject of redeliveries in ActiveMQ can be found at http://activemq.apache.org/message-redelivery-and-dlq-handling.html.

You May Also Like

Grails session timeout without XML

This article shows a clean, non-hacky way of configuring full-featured event listeners for the servlet context of a Grails application. The example is an HttpSessionListener registered as a Spring bean, with a session timeout that depends on whether the user account is premium or not.

Common approaches

When it comes to session timeout config in Grails, the default approach is to install the templates with a command. This way we get direct access to the web.xml file, but a number of unnecessary files are created as well. Apart from the fact that unnecessary files are unnecessary, we should also remember another piece of common knowledge: XML is not for humans.

Another, slightly more hacky, way is to create a mysterious scripts/_Events.groovy file. Inside it, using the no less enigmatic closure eventWebXmlEnd = { filename -> ... }, we can parse and hack into web.xml with the help of XmlSlurper.
Even though a lot of Grails plugins do it in a similar way, it’s still not really straightforward, is it? Besides, where’s the IDE support? Hello!?

Examples of both approaches can be found on Stack Overflow.

Simpler and cleaner way

By adding just a single line to the already generated init closure we have it done:
class BootStrap {

    def init = { servletContext ->
        servletContext.addListener(OurListenerClass)
    }
}

Alrighty, this is enough to avoid XML. The sweets are served after the main course, though :)

Listener as a Spring bean

Let us assume we have a requirement: set a longer session timeout for premium user accounts.
Users are authenticated upon session creation through SSO.

To meet the requirement easily, just instantiate the CustomTimeoutSessionListener as a Spring bean in resources.groovy. We are also going to need some source of the user-specific session timeout. Let’s say a ConfigService.
beans = {
    customTimeoutSessionListener(CustomTimeoutSessionListener) {
        configService = ref('configService')
    }
}

With this approach BootStrap.groovy has to be slightly modified. To keep control over listener instantiation, instead of passing the listener class, we let Grails inject the Spring bean and pass the instance:
class BootStrap {

    def customTimeoutSessionListener

    def init = { servletContext ->
        servletContext.addListener(customTimeoutSessionListener)
    }
}

An example CustomTimeoutSessionListener implementation can look like:
import javax.servlet.http.HttpSessionEvent
import javax.servlet.http.HttpSessionListener
import your.app.ConfigService

class CustomTimeoutSessionListener implements HttpSessionListener {

    ConfigService configService

    @Override
    void sessionCreated(HttpSessionEvent httpSessionEvent) {
        httpSessionEvent.session.maxInactiveInterval = configService.sessionTimeoutSeconds
    }

    @Override
    void sessionDestroyed(HttpSessionEvent httpSessionEvent) { /* nothing to implement */ }
}
With all the power of Spring IoC at hand, this is surely a good place to load some of the user’s persisted account data into the session, or to notify any other relevant bean about the user’s presence.

Wait, what about the user context?

The honest answer is: that depends on your case. Still, here’s an example of a getSessionTimeoutSeconds() implementation using Spring Security:
import org.springframework.security.core.context.SecurityContextHolder

class ConfigService {

    // Timeouts in seconds; an identifier cannot start with a digit, hence THREE_HOURS
    static final int THREE_HOURS = 3 * 60 * 60
    static final int QUARTER = 15 * 60

    int getSessionTimeoutSeconds() {
        // Assumes the principal is already set and is the unique username
        String username = SecurityContextHolder.context?.authentication?.principal
        def account = Account.findByUsername(username)

        return account?.premium ? THREE_HOURS : QUARTER
    }
}
This example is simplified and does not contain much defensive programming. It just assumes that the principal is already set and is a String holding the unique username. Thanks to Grails conventions our ConfigService is transactional, so the Account domain class can use a GORM dynamic finder.
OK, the implementation details of config fetching are out of scope here anyway. You can get, load, fetch or obtain it from wherever you like: domain persistence, the principal object, role config, an external file and so on...
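
For comparison, a slightly more defensive version of the same decision could look like the sketch below. It is written in plain Java so it can run without Grails or Spring Security, and everything in it is hypothetical: the class name, the method, and the lookup function that stands in for the GORM finder. It simply falls back to the short timeout whenever the principal is missing, is not a String, or the account is unknown.

```java
import java.util.function.Function;

// Hypothetical, framework-free sketch of the timeout decision.
class SessionTimeoutResolver {

    static final int THREE_HOURS = 3 * 60 * 60; // seconds
    static final int QUARTER = 15 * 60;         // seconds

    /**
     * premiumLookup stands in for the account lookup: it returns TRUE for a
     * premium account, FALSE for a regular one and null for an unknown user.
     */
    static int timeoutSeconds(Object principal, Function<String, Boolean> premiumLookup) {
        if (!(principal instanceof String)) {
            return QUARTER; // anonymous or unexpected principal type
        }
        Boolean premium = premiumLookup.apply((String) principal);
        return Boolean.TRUE.equals(premium) ? THREE_HOURS : QUARTER;
    }

    public static void main(String[] args) {
        Function<String, Boolean> lookup = name -> "alice".equals(name) ? Boolean.TRUE : null;
        System.out.println(timeoutSeconds("alice", lookup)); // 10800
        System.out.println(timeoutSeconds("bob", lookup));   // 900
        System.out.println(timeoutSeconds(null, lookup));    // 900
    }
}
```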

Any gotchas?

There is one. When running the grails test command, servletContext comes as an instance of some mocked class without the addListener method. Thus we are going to get a MissingMethodException when running tests :(

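The solution is the usual one:
import grails.util.Environment

def init = { servletContext ->
    // Skip registration in the test environment, where servletContext is a mock
    if (Environment.current != Environment.TEST) {
        servletContext.addListener(customTimeoutSessionListener)
    }
}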
An unnecessary obstacle if you ask me. Should I submit a Jira issue about that?

TL;DR

Just implement an HttpSessionListener. Create a Spring bean of the listener. Inject it into BootStrap.groovy and call servletContext.addListener(injectedListener).