Complex flows with Apache Camel

At work, we’re mainly integrating services and systems, and since we’re on a constant lookout for new, better technologies, ways to do things easier, make them more sustainable, we’re trying to Usually we use Apache Camel for this task, which is a Swis…At work, we’re mainly integrating services and systems, and since we’re on a constant lookout for new, better technologies, ways to do things easier, make them more sustainable, we’re trying to Usually we use Apache Camel for this task, which is a Swis…

At work, we’re mainly integrating services and systems, and since we’re on a constant lookout for new, better technologies, ways to do things easier, make them more sustainable, we’re trying to Usually we use

Apache Camel for this task, which is a Swiss-knife for integration engineer. What’s more, this tools corresponds well with our approach to integration solutions:
* try to operate on XML messages, so you get the advantage of XPaths, XSL and other benefits,
* don’t convert XML into Java classes back and forth and be worried with problems like XML conversion,
* try to get a simple flow of the process. However, at first sight Apache Camel seems to have some drawbacks mainly in the area of practical solutions ;-). It’s very handy tool if you need to use it as a pipeline with some marginal processing of the data that passes through it. It gets a lot harder to wrap your head around if you consider some branching and intermediate calls to external services. This may be tricky to write properly in Camel’s DSL. Here is a simple pipeline example:

And here the exact scenario we’re discussing: What I’d like to show is the solution to this problem. Well, if you’re using a recent version of Camel this may be easier, a little different, but should still more-or-less work this way. This code is written for Apache Camel 1.4 – a rather antic version, but that’s what we’re forced to use. Oh, well. Ok, enough whining! So, I create a test class to illustrate the case. The route defined in TestRouter class is responsible for:
1. receiving input
2. setting exchange property to a given xpath, which effectively is the name of the first XML element in the input stream
3. than, the input data is sent to three different external services, each of them replies with some fictional data – notice routes a, b and c. The SimpleContentSetter processor is just for responding with a given text.
4. the response from all three services is somehow processed by RequestEnricher bean, which is described below
5. eventually the exchange is logged in specified category Here is some code for this:

public class SimpleTest {
    public void setUp() throws Exception {
        TestRouter tr = new TestRouter();
        ctx.addRoutes(tr);
    }

    @Test
    public void shouldCheck() throws Exception {
        ctx.createProducerTemplate().send("direct:in", getInOut(""));
    }

    class TestRouter extends RouteBuilder {

        public void configure() throws Exception {

            ((ProcessorType) from("direct:in")
                .setProperty("operation").xpath("local-name(/*)", String.class)
                .multicast(new MergeAggregationStrategy())
                .to("direct:a", "direct:b", "direct:c")
                .end()
                .setBody().simple("${in.body}"))
            .bean(RequestEnricher.class, "enrich")
                .to("log:pl.touk.debug");

            from("direct:a").process(new SimpleContentSetter(""));
            from("direct:b").process(new SimpleContentSetter(""));
            from("direct:c").process(new SimpleContentSetter(""));
        }
    }
}

What’s unusual in this code is the fact, that what normally Camel does when you write a piece of DSL like:

.to("direct:a", "direct:b", "direct:c")

is pass input to service

a, than a‘s output gets passed to b, becomes it’s input, than b‘s output becomes c‘s input. The problem being, you loose the output from a and b, not mentioning that you might want to send the same input to all three services. That’s where a little tool called multicast() comes in handy. It offers you the ability to aggregate the outputs of those services. You may even create an AggregationStrategy that will do it the way you like. Below class, MergeAggregationStrategy does exactly that kind of work – it joins outputs from all three services. A lot of info about proper use of AggregationStrategy-ies can be found in this post by Torsten Mielke.

public class MergeAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange.isFailed()) {
            return oldExchange;
        }
        transformMessage(oldExchange.getIn(), newExchange.getIn());
        transformMessage(oldExchange.getOut(), newExchange.getOut());
        return newExchange;
    }

    private void transformMessage(Message oldM, Message newM) {
        String oldBody = oldM.getBody(String.class);
        String newBody = newM.getBody(String.class);
        newM.setBody(oldBody + newBody);
    }

}

However nice this may look (or not), what you’re left with is a mix of multiple XMLs. Normally this won’t do you much good. Better thing to do is to parse this output in some way. What we’re using for this is a Groovy :). Which is great for the task of parsing XML. A lot less verbose than ordinary Java. Let’s assume a scenario, that the aggregated output, currently looking like this:


is to be processed with the following steps in mind:

  • use ** as the result element
  • use attributes param1, param2, param3 from element ** and add it to result element **
public class RequestEnricher {

    public String enrich(@Property(name = "operation") String operation, Exchange ex) {

        use(DOMCategory) {
            def dhl = new groovy.xml.Namespace("http://example.com/common/dhl/schema", 'dhl')
            def pc = new groovy.xml.Namespace("http://example.com/pc/types", 'pc')
            def doc = new XmlParser().parseText(ex.in.body)

            def pcRequest = doc.
            "aaaa" [0]

            ["param1", "param2", "param3"].each() {
                def node = doc.
                '**' [("" + it)][0]
                if (node)
                    pcRequest['@' + it] = node.text()
            }

            gNodeListToString([pcRequest])
        }

    }

    String gNodeListToString(list) {
        StringBuilder sb = new StringBuilder();
        list.each {
            listItem ->
                StringWriter sw = new StringWriter();
            new XmlNodePrinter(new PrintWriter(sw)).print(listItem)
            sb.append(sw.toString());
        }
        return sb.toString();
    }

}

 

What we’re doing here, especially the last line of

enrich method is the conversion to String. There are some problems for Camel if we spit out Groovy objects. The rest is just some Groovy specific ways of manipulating XML. But looking into enrich method’s parameters, there is @Property annotation used, which binds the property assigned earlier in a router code to one of the arguments. That is really cool feature and there are more such annotations:
* @XPath
* @Header
* @Headers and @Properties – gives whole maps of properties or headers This pretty much concludes the subject :) Have fun, and if in doubt, leave a comment with your question!

You May Also Like

JBoss Envers and Spring transaction managers

I've stumbled upon a bug with my configuration for JBoss Envers today, despite having integration tests all over the application. I have to admit, it casted a dark shadow of doubt about the value of all the tests for a moment. I've been practicing TDD since 2005, and frankly speaking, I should have been smarter than that.

My fault was simple. I've started using Envers the right way, with exploratory tests and a prototype. Then I've deleted the prototype and created some integration tests using in-memory H2 that looked more or less like this example:

@Test
public void savingAndUpdatingPersonShouldCreateTwoHistoricalVersions() {
    //given
    Person person = createAndSavePerson();
    String oldFirstName = person.getFirstName();
    String newFirstName = oldFirstName + "NEW";

    //when
    updatePersonWithNewName(person, newFirstName);

    //then
    verifyTwoHistoricalVersionsWereSaved(oldFirstName, newFirstName);
}

private Person createAndSavePerson() {
    Transaction transaction = session.beginTransaction();
    Person person = PersonFactory.createPerson();
    session.save(person);
    transaction.commit();
    return person;
}    

private void updatePersonWithNewName(Person person, String newName) {
    Transaction transaction = session.beginTransaction();
    person.setFirstName(newName);
    session.update(person);
    transaction.commit();
}

private void verifyTwoHistoricalVersionsWereSaved(String oldFirstName, String newFirstName) {
    List<Object[]> personRevisions = getPersonRevisions();
    assertEquals(2, personRevisions.size());
    assertEquals(oldFirstName, ((Person)personRevisions.get(0)[0]).getFirstName());
    assertEquals(newFirstName, ((Person)personRevisions.get(1)[0]).getFirstName());
}

private List<Object[]> getPersonRevisions() {
    Transaction transaction = session.beginTransaction();
    AuditReader auditReader = AuditReaderFactory.get(session);
    List<Object[]> personRevisions = auditReader.createQuery()
            .forRevisionsOfEntity(Person.class, false, true)
            .getResultList();
    transaction.commit();
    return personRevisions;
}

Because Envers inserts audit data when the transaction is commited (in a new temporary session), I thought I have to create and commit the transaction manually. And that is true to some point.

My fault was that I didn't have an end-to-end integration/acceptance test, that would call to entry point of the application (in this case a service which is called by GWT via RPC), because then I'd notice, that the Spring @Transactional annotation, and calling transaction.commit() are two, very different things.

Spring @Transactional annotation will use a transaction manager configured for the application. Envers on the other hand is used by subscribing a listener to hibernate's SessionFactory like this:

<bean id="sessionFactory" class="org.springframework.orm.hibernate3.annotation.AnnotationSessionFactoryBean" >        
...
 <property name="eventListeners">
     <map key-type="java.lang.String" value-type="org.hibernate.event.EventListeners">
         <entry key="post-insert" value-ref="auditEventListener"/>
         <entry key="post-update" value-ref="auditEventListener"/>
         <entry key="post-delete" value-ref="auditEventListener"/>
         <entry key="pre-collection-update" value-ref="auditEventListener"/>
         <entry key="pre-collection-remove" value-ref="auditEventListener"/>
         <entry key="post-collection-recreate" value-ref="auditEventListener"/>
     </map>
 </property>
</bean>

<bean id="auditEventListener" class="org.hibernate.envers.event.AuditEventListener" />

Envers creates and collects something called AuditWorkUnits whenever you update/delete/insert audited entities, but audit tables are not populated until something calls AuditProcess.beforeCompletion, which makes sense. If you are using org.hibernate.transaction.JDBCTransaction manually, this is called on commit() when notifying all subscribed javax.transaction.Synchronization objects (and enver's AuditProcess is one of them).

The problem was, that I used a wrong transaction manager.

<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager" >
    <property name="dataSource" ref="dataSource"/>
</bean>

This transaction manager doesn't know anything about hibernate and doesn't use org.hibernate.transaction.JDBCTransaction. While Synchronization is an interface from javax.transaction package, DataSourceTransactionManager doesn't use it (maybe because of simplicity, I didn't dig deep enough in org.springframework.jdbc.datasource), and thus Envers works fine except not pushing the data to the database.

Which is the whole point of using Envers.

Use right tools for the task, they say. The whole problem is solved by using a transaction manager that is well aware of hibernate underneath.

<bean id="transactionManager" class="org.springframework.orm.hibernate3.HibernateTransactionManager" >
    <property name="sessionFactory" ref="sessionFactory"/>
</bean>

Lesson learned: always make sure your acceptance tests are testing the right thing. If there is a doubt about the value of your tests, you just don't have enough of them,