Complex flows with Apache Camel

At work, we’re mainly integrating services and systems, and since we’re on a constant lookout for new, better technologies, ways to do things easier, make them more sustainable, we’re trying to Usually we use Apache Camel for this task, which is a Swis…At work, we’re mainly integrating services and systems, and since we’re on a constant lookout for new, better technologies, ways to do things easier, make them more sustainable, we’re trying to Usually we use Apache Camel for this task, which is a Swis…

At work, we’re mainly integrating services and systems, and since we’re on a constant lookout for new, better technologies, ways to do things easier, make them more sustainable, we’re trying to Usually we use

Apache Camel for this task, which is a Swiss-knife for integration engineer. What’s more, this tools corresponds well with our approach to integration solutions:
* try to operate on XML messages, so you get the advantage of XPaths, XSL and other benefits,
* don’t convert XML into Java classes back and forth and be worried with problems like XML conversion,
* try to get a simple flow of the process. However, at first sight Apache Camel seems to have some drawbacks mainly in the area of practical solutions ;-). It’s very handy tool if you need to use it as a pipeline with some marginal processing of the data that passes through it. It gets a lot harder to wrap your head around if you consider some branching and intermediate calls to external services. This may be tricky to write properly in Camel’s DSL. Here is a simple pipeline example:

And here the exact scenario we’re discussing: What I’d like to show is the solution to this problem. Well, if you’re using a recent version of Camel this may be easier, a little different, but should still more-or-less work this way. This code is written for Apache Camel 1.4 – a rather antic version, but that’s what we’re forced to use. Oh, well. Ok, enough whining! So, I create a test class to illustrate the case. The route defined in TestRouter class is responsible for:
1. receiving input
2. setting exchange property to a given xpath, which effectively is the name of the first XML element in the input stream
3. than, the input data is sent to three different external services, each of them replies with some fictional data – notice routes a, b and c. The SimpleContentSetter processor is just for responding with a given text.
4. the response from all three services is somehow processed by RequestEnricher bean, which is described below
5. eventually the exchange is logged in specified category Here is some code for this:

public class SimpleTest {
    public void setUp() throws Exception {
        TestRouter tr = new TestRouter();
        ctx.addRoutes(tr);
    }

    @Test
    public void shouldCheck() throws Exception {
        ctx.createProducerTemplate().send("direct:in", getInOut(""));
    }

    class TestRouter extends RouteBuilder {

        public void configure() throws Exception {

            ((ProcessorType) from("direct:in")
                .setProperty("operation").xpath("local-name(/*)", String.class)
                .multicast(new MergeAggregationStrategy())
                .to("direct:a", "direct:b", "direct:c")
                .end()
                .setBody().simple("${in.body}"))
            .bean(RequestEnricher.class, "enrich")
                .to("log:pl.touk.debug");

            from("direct:a").process(new SimpleContentSetter(""));
            from("direct:b").process(new SimpleContentSetter(""));
            from("direct:c").process(new SimpleContentSetter(""));
        }
    }
}

What’s unusual in this code is the fact, that what normally Camel does when you write a piece of DSL like:

.to("direct:a", "direct:b", "direct:c")

is pass input to service

a, than a‘s output gets passed to b, becomes it’s input, than b‘s output becomes c‘s input. The problem being, you loose the output from a and b, not mentioning that you might want to send the same input to all three services. That’s where a little tool called multicast() comes in handy. It offers you the ability to aggregate the outputs of those services. You may even create an AggregationStrategy that will do it the way you like. Below class, MergeAggregationStrategy does exactly that kind of work – it joins outputs from all three services. A lot of info about proper use of AggregationStrategy-ies can be found in this post by Torsten Mielke.

public class MergeAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange.isFailed()) {
            return oldExchange;
        }
        transformMessage(oldExchange.getIn(), newExchange.getIn());
        transformMessage(oldExchange.getOut(), newExchange.getOut());
        return newExchange;
    }

    private void transformMessage(Message oldM, Message newM) {
        String oldBody = oldM.getBody(String.class);
        String newBody = newM.getBody(String.class);
        newM.setBody(oldBody + newBody);
    }

}

However nice this may look (or not), what you’re left with is a mix of multiple XMLs. Normally this won’t do you much good. Better thing to do is to parse this output in some way. What we’re using for this is a Groovy :). Which is great for the task of parsing XML. A lot less verbose than ordinary Java. Let’s assume a scenario, that the aggregated output, currently looking like this:


is to be processed with the following steps in mind:

  • use ** as the result element
  • use attributes param1, param2, param3 from element ** and add it to result element **
public class RequestEnricher {

    public String enrich(@Property(name = "operation") String operation, Exchange ex) {

        use(DOMCategory) {
            def dhl = new groovy.xml.Namespace("http://example.com/common/dhl/schema", 'dhl')
            def pc = new groovy.xml.Namespace("http://example.com/pc/types", 'pc')
            def doc = new XmlParser().parseText(ex.in.body)

            def pcRequest = doc.
            "aaaa" [0]

            ["param1", "param2", "param3"].each() {
                def node = doc.
                '**' [("" + it)][0]
                if (node)
                    pcRequest['@' + it] = node.text()
            }

            gNodeListToString([pcRequest])
        }

    }

    String gNodeListToString(list) {
        StringBuilder sb = new StringBuilder();
        list.each {
            listItem ->
                StringWriter sw = new StringWriter();
            new XmlNodePrinter(new PrintWriter(sw)).print(listItem)
            sb.append(sw.toString());
        }
        return sb.toString();
    }

}

 

What we’re doing here, especially the last line of

enrich method is the conversion to String. There are some problems for Camel if we spit out Groovy objects. The rest is just some Groovy specific ways of manipulating XML. But looking into enrich method’s parameters, there is @Property annotation used, which binds the property assigned earlier in a router code to one of the arguments. That is really cool feature and there are more such annotations:
* @XPath
* @Header
* @Headers and @Properties – gives whole maps of properties or headers This pretty much concludes the subject :) Have fun, and if in doubt, leave a comment with your question!

You May Also Like

TouK na targach pracyTouK at the job fair

Zapraszamy na XI Targi Pracy i Praktyk dla Elektroników i Informatyków. Odwiedź nasze stoisko w dniach 4-5 marca w godz. 9:30-15:30. Politechnika Warszawska Pierwsze piętro budynku Wydziału Elektroniki i Techniki Informacyjnych Nowowiejska 15/19We invite you to 11th Job and Internship Fair for Electronic Engineers and IT Specialists. Come and visit our stand between 4-5 March 9:30 am and 15 :30 pm Warsaw University of Technology the first floor of the Electronics faculty building Nowowiejska 15/19

Need to make a quick json fixes – JSONPath for rescue

From time to time I have a need to do some fixes in my json data. In a world of flat files I do this with grep/sed/awk tool chain. How to handle it for JSON? Searching for a solution I came across the JSONPath. It quite mature tool (from 2007) but I haven't hear about it so I decided to share my experience with others.

First of all you can try it without pain online: http://jsonpath.curiousconcept.com/. Full syntax is described at http://goessner.net/articles/JsonPath/



But also you can download python binding and run it from command line:
$ sudo apt-get install python-jsonpath-rw
$ sudo apt-get install python-setuptools
$ sudo easy_install -U jsonpath

After that you can use inside python or with simple cli wrapper:
#!/usr/bin/python
import sys, json, jsonpath

path = sys.argv[
1]

result = jsonpath.jsonpath(json.load(sys.stdin), path)
print json.dumps(result, indent=2)

… you can use it in your shell e.g. for json:
{
"store": {
"book": [
{
"category": "reference",
"author": "Nigel Rees",
"title": "Sayings of the Century",
"price": 8.95
},
{
"category": "fiction",
"author": "Evelyn Waugh",
"title": "Sword of Honour",
"price": 12.99
},
{
"category": "fiction",
"author": "Herman Melville",
"title": "Moby Dick",
"isbn": "0-553-21311-3",
"price": 8.99
},
{
"category": "fiction",
"author": "J. R. R. Tolkien",
"title": "The Lord of the Rings",
"isbn": "0-395-19395-8",
"price": 22.99
}
],
"bicycle": {
"color": "red",
"price": 19.95
}
}
}

You can print only book nodes with price lower than 10 by:
$ jsonpath '$..book[?(@.price 

Result:
[
{
"category": "reference",
"price": 8.95,
"title": "Sayings of the Century",
"author": "Nigel Rees"
},
{
"category": "fiction",
"price": 8.99,
"title": "Moby Dick",
"isbn": "0-553-21311-3",
"author": "Herman Melville"
}
]

Have a nice JSON hacking!From time to time I have a need to do some fixes in my json data. In a world of flat files I do this with grep/sed/awk tool chain. How to handle it for JSON? Searching for a solution I came across the JSONPath. It quite mature tool (from 2007) but I haven't hear about it so I decided to share my experience with others.

Grails session timeout without XML

This article shows clean, non hacky way of configuring featureful event listeners for Grails application servlet context. Feat. HttpSessionListener as a Spring bean example with session timeout depending on whether user account is premium or not.

Common approaches

Speaking of session timeout config in Grails, a default approach is to install templates with a command. This way we got direct access to web.xml file. Also more unnecessary files are created. Despite that unnecessary files are unnecessary, we should also remember some other common knowledge: XML is not for humans.

Another, a bit more hacky, way is to create mysterious scripts/_Events.groovy file. Inside of which, by using not less enigmatic closure: eventWebXmlEnd = { filename -> ... }we can parse and hack into web.xml with a help of XmlSlurper.
Even though lot of Grails plugins do it similar way, still it’s not really straightforward, is it? Besides, where’s the IDE support? Hello!?

Examples of both above ways can be seen on StackOverflow.

Simpler and cleaner way

By adding just a single line to the already generated init closure we have it done:
class BootStrap {

def init = { servletContext ->
servletContext.addListener(OurListenerClass)
}
}

Allrighty, this is enough to avoid XML. Sweets are served after the main course though :)

Listener as a Spring bean

Let us assume we have a requirement. Set a longer session timeout for premium user account.
Users are authenticated upon session creation through SSO.

To easy meet the requirements just instantiate the CustomTimeoutSessionListener as Spring bean at resources.groovy. We also going to need some source of the user custom session timeout. Let say a ConfigService.
beans = {    
customTimeoutSessionListener(CustomTimeoutSessionListener) {
configService = ref('configService')
}
}

With such approach BootStrap.groovy has to by slightly modified. To keep control on listener instantation, instead of passing listener class type, Spring bean is injected by Grails and the instance passed:
class BootStrap {

def customTimeoutSessionListener

def init = { servletContext ->
servletContext.addListener(customTimeoutSessionListener)
}
}

An example CustomTimeoutSessionListener implementation can look like:
import javax.servlet.http.HttpSessionEvent    
import javax.servlet.http.HttpSessionListener
import your.app.ConfigService

class CustomTimeoutSessionListener implements HttpSessionListener {

ConfigService configService

@Override
void sessionCreated(HttpSessionEvent httpSessionEvent) {
httpSessionEvent.session.maxInactiveInterval = configService.sessionTimeoutSeconds
}

@Override
void sessionDestroyed(HttpSessionEvent httpSessionEvent) { /* nothing to implement */ }
}
Having at hand all power of the Spring IoC this is surely a good place to load some persisted user’s account stuff into the session or to notify any other adequate bean about user presence.

Wait, what about the user context?

Honest answer is: that depends on your case. Yet here’s an example of getSessionTimeoutMinutes() implementation using Spring Security:
import org.springframework.security.core.context.SecurityContextHolder    

class ConfigService {

static final int 3H = 3 * 60 * 60
static final int QUARTER = 15 * 60

int getSessionTimeoutSeconds() {

String username = SecurityContextHolder.context?.authentication?.principal
def account = Account.findByUsername(username)

return account?.premium ? 3H : QUARTER
}
}
This example is simplified. Does not contain much of defensive programming. Just an assumption that principal is already set and is a String - unique username. Thanks to Grails convention our ConfigService is transactional so the Account domain class can use GORM dynamic finder.
OK, config fetching implementation details are out of scope here anyway. You can get, load, fetch, obtain from wherever you like to. Domain persistence, principal object, role config, external file and so on...

Any gotchas?

There is one. When running grails test command, servletContext comes as some mocked class instance without addListener method. Thus we going to have a MissingMethodException when running tests :(

Solution is typical:
def init = { servletContext ->
if (Environment.current != Environment.TEST) {
servletContext.addListener(customTimeoutSessionListener)
}
}
An unnecessary obstacle if you ask me. Should I submit a Jira issue about that?

TL;DR

Just implement a HttpSessionListener. Create a Spring bean of the listener. Inject it into BootStrap.groovy and call servletContext.addListener(injectedListener).