Advisory Messages to the rescue

The most crucial part of software development is testing. It should ensure us, that our code is correct, works according to given specs, etc. There are many kinds of tests: unit tests, integration, functional. In general you should try to test the smallest possible subset of your code and be able to check the state of the objects after the test.

This seems as rather easy task, but what if you have an integration end-to-end test to perform? In most cases asserting state in integration test is rather hard due to multiple systems interoperability. Let’s focus on a specific situation.

What I needed to do the other day was write some integration test for Jms based system. The processing pipeline is easy:

  • fetch object from DB
  • process it
  • publish on JMS

some other system (X-system) polls JMS:

  • if message is found
  • fetch it (message disappears from the JMS queue)
  • do sth with it
  • Looks simple but since I didn’t have any sane access to the X-system I wanted to be sure that my object was actually put into the queue. It was not acceptable to subscribe to the queue and fetch that object in my test – it would dusrupt the flow of the whole process.

    Fortunately I’ve been using ActiveMQ and since it offers a thing called Advisory Messages I’ve decided to use just them.

    What are advisory messages? They are a set of administrative messages that are generated on a specific event, like message consumption, message delivery, topic destruction, and many more. Each type of message is delivered to a separate topic – prefixed with ActiveMQ.Advisory. Since generation of such messages may be an overhead in production systems these features are turned off by default. You need to enable specific type of advisory message for a specific jms destination. You can do this with ths configuration change to activemq.xml

    <destinationPolicy>
       <policyMap>
          <policyEntries>
            <policyEntry queue="my/test/queue" advisoryForDelivery="true" advisoryForConsumed="true"/>                                                   
            <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
              <pendingSubscriberPolicy>
                <vmCursor />
              </pendingSubscriberPolicy>
            </policyEntry>
          </policyEntries>
        </policyMap>
    </destinationPolicy>
    

    As you can see, I’ve specified which advisories I want enabled. The full list of available advisories can be found here.

    Since I wanted to read messages from that topic I’ve added the following configuration to my spring context – there is one destination bean for inserting messages and one bean for advisory topic.

    <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue" autowire="constructor">
        <constructor-arg value="my/test/queue" />
    </bean>
    
    <bean id="deliveredToTestQueueAdvisory" class="org.apache.activemq.command.ActiveMQTopic" autowire="constructor">
        <constructor-arg value="ActiveMQ.Advisory.MessageDelivered.Queue.my/test/queue" />
    </bean>
    

    Thanks to this configuration I’ve been able to check that my message was actually delivered to the queue. There’ve been no need to worry about race conditions in consuming the message from original queue – if the X-system read the message, I’d be unable to determine if it has ever been in JMS at all.

    What’s not so nice about that:

    • advisory messages can be thought of as counters rather than debugging information
    • they don’t contain any data that would allow us to match advisory message to the original message – thou you could correlate by timestamp

    All in all, it’s a good tool to have! But perhaps you have some other thoughts on this subject? How do you test JMS?

You May Also Like

JBoss Envers and Spring transaction managers

I've stumbled upon a bug with my configuration for JBoss Envers today, despite having integration tests all over the application. I have to admit, it casted a dark shadow of doubt about the value of all the tests for a moment. I've been practicing TDD since 2005, and frankly speaking, I should have been smarter than that.

My fault was simple. I've started using Envers the right way, with exploratory tests and a prototype. Then I've deleted the prototype and created some integration tests using in-memory H2 that looked more or less like this example:

@Test
public void savingAndUpdatingPersonShouldCreateTwoHistoricalVersions() {
    //given
    Person person = createAndSavePerson();
    String oldFirstName = person.getFirstName();
    String newFirstName = oldFirstName + "NEW";

    //when
    updatePersonWithNewName(person, newFirstName);

    //then
    verifyTwoHistoricalVersionsWereSaved(oldFirstName, newFirstName);
}

private Person createAndSavePerson() {
    Transaction transaction = session.beginTransaction();
    Person person = PersonFactory.createPerson();
    session.save(person);
    transaction.commit();
    return person;
}    

private void updatePersonWithNewName(Person person, String newName) {
    Transaction transaction = session.beginTransaction();
    person.setFirstName(newName);
    session.update(person);
    transaction.commit();
}

private void verifyTwoHistoricalVersionsWereSaved(String oldFirstName, String newFirstName) {
    List<Object[]> personRevisions = getPersonRevisions();
    assertEquals(2, personRevisions.size());
    assertEquals(oldFirstName, ((Person)personRevisions.get(0)[0]).getFirstName());
    assertEquals(newFirstName, ((Person)personRevisions.get(1)[0]).getFirstName());
}

private List<Object[]> getPersonRevisions() {
    Transaction transaction = session.beginTransaction();
    AuditReader auditReader = AuditReaderFactory.get(session);
    List<Object[]> personRevisions = auditReader.createQuery()
            .forRevisionsOfEntity(Person.class, false, true)
            .getResultList();
    transaction.commit();
    return personRevisions;
}

Because Envers inserts audit data when the transaction is commited (in a new temporary session), I thought I have to create and commit the transaction manually. And that is true to some point.

My fault was that I didn't have an end-to-end integration/acceptance test, that would call to entry point of the application (in this case a service which is called by GWT via RPC), because then I'd notice, that the Spring @Transactional annotation, and calling transaction.commit() are two, very different things.

Spring @Transactional annotation will use a transaction manager configured for the application. Envers on the other hand is used by subscribing a listener to hibernate's SessionFactory like this:

<bean id="sessionFactory" class="org.springframework.orm.hibernate3.annotation.AnnotationSessionFactoryBean" >        
...
 <property name="eventListeners">
     <map key-type="java.lang.String" value-type="org.hibernate.event.EventListeners">
         <entry key="post-insert" value-ref="auditEventListener"/>
         <entry key="post-update" value-ref="auditEventListener"/>
         <entry key="post-delete" value-ref="auditEventListener"/>
         <entry key="pre-collection-update" value-ref="auditEventListener"/>
         <entry key="pre-collection-remove" value-ref="auditEventListener"/>
         <entry key="post-collection-recreate" value-ref="auditEventListener"/>
     </map>
 </property>
</bean>

<bean id="auditEventListener" class="org.hibernate.envers.event.AuditEventListener" />

Envers creates and collects something called AuditWorkUnits whenever you update/delete/insert audited entities, but audit tables are not populated until something calls AuditProcess.beforeCompletion, which makes sense. If you are using org.hibernate.transaction.JDBCTransaction manually, this is called on commit() when notifying all subscribed javax.transaction.Synchronization objects (and enver's AuditProcess is one of them).

The problem was, that I used a wrong transaction manager.

<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager" >
    <property name="dataSource" ref="dataSource"/>
</bean>

This transaction manager doesn't know anything about hibernate and doesn't use org.hibernate.transaction.JDBCTransaction. While Synchronization is an interface from javax.transaction package, DataSourceTransactionManager doesn't use it (maybe because of simplicity, I didn't dig deep enough in org.springframework.jdbc.datasource), and thus Envers works fine except not pushing the data to the database.

Which is the whole point of using Envers.

Use right tools for the task, they say. The whole problem is solved by using a transaction manager that is well aware of hibernate underneath.

<bean id="transactionManager" class="org.springframework.orm.hibernate3.HibernateTransactionManager" >
    <property name="sessionFactory" ref="sessionFactory"/>
</bean>

Lesson learned: always make sure your acceptance tests are testing the right thing. If there is a doubt about the value of your tests, you just don't have enough of them,