JMS redelivery with ActiveMQ and Servicemix

The other day I felt a compelling need to implement a JMS redelivery scenario. The exact scenario I’d been trying to handle was:

  1. my message is in an ActiveMQ queue or topic
  2. its processing fails, because of some exception – ie. database access exception due to server nonavailability
  3. since we get an exception, the message is not handled properly, we may want to retry processing attempt some time later
  4. of course, for the redelivery to happen we need the message to stay in the ActiveMQ queue – fetching messages from the queue will be stopped until the redelivery succeeds or expires

See how this can be done after the jump :)

For this to happen, I’ve tried implementing Apache Camel route, but as it turns out, Camel fails to deliver facilities for exact JMS redelivery. It is possible to set JMS connection in transacted mode, but the redeliveries happen one after another and fixed times.

What I’ve ended up doing was implement a servicemix-jms endpoint. I’ve used this configuration for it:


            activemq/connectionFactory

            activemq/resourceAdapter

As you can see, we lookup a couple of things in JNDI registry, so you need to have them configured on the Servicemix side – a sample config presented farther in this entry.

The bean responsible for configuring redelivery settings is activationSpec. You can set various things with it, like:

  • initial redelivery delay
  • maximum number of redeliveries
  • backoff multiplier

What is really important in jms:endpoint config for this to work are:

  • processorName=”jca”
  • rollbackOnError=”true”

Servicemix should have the following entries in its jndi registry:

          

(...) 

       xmlns:jencks="http://jencks.org/2.0"
       xmlns:amqra="http://activemq.apache.org/schema/ra" -->

When the redeliveries are exhausted, message is routed to global Dead Letter Queue called ActiveMQ.DLQ. Since this is a single bag for all the failed messages from all queues, you may want to configure this aspect differently. For example you can tell ActiveMQ to create a single DLQ for each queue. Use this config to achieve it – the changes should be made to Broker configuration.


        

            

  ...

More on the subject of redelivieries in ActiveMQ can be found at http://activemq.apache.org/message-redelivery-and-dlq-handling.html.

You May Also Like

JBoss Envers and Spring transaction managers

I've stumbled upon a bug with my configuration for JBoss Envers today, despite having integration tests all over the application. I have to admit, it casted a dark shadow of doubt about the value of all the tests for a moment. I've been practicing TDD since 2005, and frankly speaking, I should have been smarter than that.

My fault was simple. I've started using Envers the right way, with exploratory tests and a prototype. Then I've deleted the prototype and created some integration tests using in-memory H2 that looked more or less like this example:

@Test
public void savingAndUpdatingPersonShouldCreateTwoHistoricalVersions() {
    //given
    Person person = createAndSavePerson();
    String oldFirstName = person.getFirstName();
    String newFirstName = oldFirstName + "NEW";

    //when
    updatePersonWithNewName(person, newFirstName);

    //then
    verifyTwoHistoricalVersionsWereSaved(oldFirstName, newFirstName);
}

private Person createAndSavePerson() {
    Transaction transaction = session.beginTransaction();
    Person person = PersonFactory.createPerson();
    session.save(person);
    transaction.commit();
    return person;
}    

private void updatePersonWithNewName(Person person, String newName) {
    Transaction transaction = session.beginTransaction();
    person.setFirstName(newName);
    session.update(person);
    transaction.commit();
}

private void verifyTwoHistoricalVersionsWereSaved(String oldFirstName, String newFirstName) {
    List<Object[]> personRevisions = getPersonRevisions();
    assertEquals(2, personRevisions.size());
    assertEquals(oldFirstName, ((Person)personRevisions.get(0)[0]).getFirstName());
    assertEquals(newFirstName, ((Person)personRevisions.get(1)[0]).getFirstName());
}

private List<Object[]> getPersonRevisions() {
    Transaction transaction = session.beginTransaction();
    AuditReader auditReader = AuditReaderFactory.get(session);
    List<Object[]> personRevisions = auditReader.createQuery()
            .forRevisionsOfEntity(Person.class, false, true)
            .getResultList();
    transaction.commit();
    return personRevisions;
}

Because Envers inserts audit data when the transaction is commited (in a new temporary session), I thought I have to create and commit the transaction manually. And that is true to some point.

My fault was that I didn't have an end-to-end integration/acceptance test, that would call to entry point of the application (in this case a service which is called by GWT via RPC), because then I'd notice, that the Spring @Transactional annotation, and calling transaction.commit() are two, very different things.

Spring @Transactional annotation will use a transaction manager configured for the application. Envers on the other hand is used by subscribing a listener to hibernate's SessionFactory like this:

<bean id="sessionFactory" class="org.springframework.orm.hibernate3.annotation.AnnotationSessionFactoryBean" >        
...
 <property name="eventListeners">
     <map key-type="java.lang.String" value-type="org.hibernate.event.EventListeners">
         <entry key="post-insert" value-ref="auditEventListener"/>
         <entry key="post-update" value-ref="auditEventListener"/>
         <entry key="post-delete" value-ref="auditEventListener"/>
         <entry key="pre-collection-update" value-ref="auditEventListener"/>
         <entry key="pre-collection-remove" value-ref="auditEventListener"/>
         <entry key="post-collection-recreate" value-ref="auditEventListener"/>
     </map>
 </property>
</bean>

<bean id="auditEventListener" class="org.hibernate.envers.event.AuditEventListener" />

Envers creates and collects something called AuditWorkUnits whenever you update/delete/insert audited entities, but audit tables are not populated until something calls AuditProcess.beforeCompletion, which makes sense. If you are using org.hibernate.transaction.JDBCTransaction manually, this is called on commit() when notifying all subscribed javax.transaction.Synchronization objects (and enver's AuditProcess is one of them).

The problem was, that I used a wrong transaction manager.

<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager" >
    <property name="dataSource" ref="dataSource"/>
</bean>

This transaction manager doesn't know anything about hibernate and doesn't use org.hibernate.transaction.JDBCTransaction. While Synchronization is an interface from javax.transaction package, DataSourceTransactionManager doesn't use it (maybe because of simplicity, I didn't dig deep enough in org.springframework.jdbc.datasource), and thus Envers works fine except not pushing the data to the database.

Which is the whole point of using Envers.

Use right tools for the task, they say. The whole problem is solved by using a transaction manager that is well aware of hibernate underneath.

<bean id="transactionManager" class="org.springframework.orm.hibernate3.HibernateTransactionManager" >
    <property name="sessionFactory" ref="sessionFactory"/>
</bean>

Lesson learned: always make sure your acceptance tests are testing the right thing. If there is a doubt about the value of your tests, you just don't have enough of them,