Complex flows with Apache Camel


At work, we’re mainly integrating services and systems, and we’re on a constant lookout for new, better technologies and ways to make things easier and more sustainable. Usually we use Apache Camel for this task, which is a Swiss Army knife for an integration engineer. What’s more, this tool corresponds well with our approach to integration solutions:
* try to operate on XML messages, so you get the advantage of XPath, XSL and other benefits,
* don’t convert XML into Java classes and back, so you don’t have to worry about conversion problems,
* try to keep the flow of the process simple.

However, at first sight Apache Camel seems to have some drawbacks, mainly in the area of practical solutions ;-). It’s a very handy tool if you need to use it as a pipeline with some marginal processing of the data that passes through it. It gets a lot harder to wrap your head around once you add branching and intermediate calls to external services. This may be tricky to write properly in Camel’s DSL. Here is a simple pipeline example:

And here is the exact scenario we’re discussing:

What I’d like to show is the solution to this problem. If you’re using a recent version of Camel, this may be easier or a little different, but it should still work more or less this way. This code is written for Apache Camel 1.4 – a rather antique version, but that’s what we’re forced to use. Oh, well. OK, enough whining! So, I created a test class to illustrate the case. The route defined in the TestRouter class is responsible for:
1. receiving input
2. setting an exchange property to the result of a given XPath expression, which is effectively the name of the root (first) XML element in the input stream
3. then sending the input data to three different external services, each of which replies with some fictional data – notice routes a, b and c. The SimpleContentSetter processor just responds with a given text.
4. processing the responses from all three services in the RequestEnricher bean, which is described below
5. eventually logging the exchange in the specified category

Here is some code for this:

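import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.ProcessorType;
import org.junit.Before;
import org.junit.Test;

public class SimpleTest {

    // Assumption: the post does not show where ctx comes from,
    // so a plain DefaultCamelContext is created here.
    private CamelContext ctx;

    @Before
    public void setUp() throws Exception {
        ctx = new DefaultCamelContext();
        TestRouter tr = new TestRouter();
        ctx.addRoutes(tr);
        ctx.start();
    }

    @Test
    public void shouldCheck() throws Exception {
        // getInOut(...) is a helper that is not shown in this post
        ctx.createProducerTemplate().send("direct:in", getInOut(""));
    }

    class TestRouter extends RouteBuilder {

        public void configure() throws Exception {

            // remember the root element name, multicast the same input
            // to a, b and c, merge the replies, then enrich and log
            ((ProcessorType) from("direct:in")
                .setProperty("operation").xpath("local-name(/*)", String.class)
                .multicast(new MergeAggregationStrategy())
                .to("direct:a", "direct:b", "direct:c")
                .end()
                .setBody().simple("${in.body}"))
            .bean(RequestEnricher.class, "enrich")
                .to("log:pl.touk.debug");

            // stub services: each one replies with fixed content
            from("direct:a").process(new SimpleContentSetter(""));
            from("direct:b").process(new SimpleContentSetter(""));
            from("direct:c").process(new SimpleContentSetter(""));
        }
    }
}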

What’s unusual in this code is the multicast. Normally, when you write a piece of DSL like:

.to("direct:a", "direct:b", "direct:c")

Camel passes the input to service a, then a’s output gets passed to b and becomes its input, then b’s output becomes c’s input. The problem is that you lose the output from a and b, not to mention that you might want to send the same input to all three services. That’s where a little tool called multicast() comes in handy. It sends the same input to every endpoint and offers you the ability to aggregate the outputs of those services. You may even create an AggregationStrategy that will do it the way you like. The class below, MergeAggregationStrategy, does exactly that kind of work – it joins the outputs from all three services. A lot of info about the proper use of AggregationStrategy implementations can be found in this post by Torsten Mielke.
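To make the difference between the two routing styles concrete, here is a minimal plain-Java sketch. It does not use any Camel classes: RoutingSketch, pipeline, multicast and the three tag-wrapping services are hypothetical names invented for illustration, and the code only models the behaviour described above, not how Camel implements it.

```java
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

// Plain-Java model of the two routing styles; no Camel classes are used.
class RoutingSketch {

    // Pipeline: each service receives the previous service's output.
    static String pipeline(String input, List<UnaryOperator<String>> services) {
        String current = input;
        for (UnaryOperator<String> service : services) {
            current = service.apply(current);
        }
        return current;
    }

    // Multicast: every service receives the same original input,
    // and the replies are combined pairwise by the aggregator.
    static String multicast(String input,
                            List<UnaryOperator<String>> services,
                            BinaryOperator<String> aggregator) {
        String result = null;
        for (UnaryOperator<String> service : services) {
            String reply = service.apply(input);
            result = (result == null) ? reply : aggregator.apply(result, reply);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical services: each wraps what it received in its own tag.
        List<UnaryOperator<String>> services = List.of(
                in -> "<a>" + in + "</a>",
                in -> "<b>" + in + "</b>",
                in -> "<c>" + in + "</c>");

        // prints <c><b><a>x</a></b></c>
        System.out.println(pipeline("x", services));

        // prints <a>x</a><b>x</b><c>x</c>
        System.out.println(multicast("x", services, (older, newer) -> older + newer));
    }
}
```

In the pipeline result the replies of a and b only survive because these toy services echo their input; a real service that returns unrelated data would overwrite them, which is the loss described above.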

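import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class MergeAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // Defensive guard: newer Camel versions pass null as oldExchange
        // for the first reply; it should be harmless on Camel 1.4.
        if (oldExchange == null) {
            return newExchange;
        }
        // keep the failed exchange so the failure is not lost
        if (oldExchange.isFailed()) {
            return oldExchange;
        }
        // append the new reply to what has been collected so far
        transformMessage(oldExchange.getIn(), newExchange.getIn());
        transformMessage(oldExchange.getOut(), newExchange.getOut());
        return newExchange;
    }

    // concatenates both bodies as Strings and stores the result in newM
    private void transformMessage(Message oldM, Message newM) {
        String oldBody = oldM.getBody(String.class);
        String newBody = newM.getBody(String.class);
        newM.setBody(oldBody + newBody);
    }

}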

However nice this may look (or not), what you’re left with is a mix of multiple XMLs. Normally this won’t do you much good. A better thing to do is to parse this output in some way. What we’re using for this is Groovy :), which is great for the task of parsing XML and a lot less verbose than ordinary Java. Let’s assume a scenario in which the aggregated output, currently looking like this:


is to be processed with the following steps in mind:

  • use ** as the result element
  • use attributes param1, param2, param3 from element ** and add them to the result element **

The RequestEnricher bean does exactly that:
import groovy.xml.dom.DOMCategory
import org.apache.camel.Exchange
import org.apache.camel.Property

public class RequestEnricher {

    public String enrich(@Property(name = "operation") String operation, Exchange ex) {

        use(DOMCategory) {
            def dhl = new groovy.xml.Namespace("http://example.com/common/dhl/schema", 'dhl')
            def pc = new groovy.xml.Namespace("http://example.com/pc/types", 'pc')
            def doc = new XmlParser().parseText(ex.in.body)

            // the element that becomes the result
            def pcRequest = doc."aaaa"[0]

            // copy each parameter that is present onto the result element as an attribute
            ["param1", "param2", "param3"].each {
                def node = doc.'**'[("" + it)][0]
                if (node)
                    pcRequest['@' + it] = node.text()
            }

            // return a String instead of a Groovy node
            gNodeListToString([pcRequest])
        }

    }

    // serializes a list of Groovy nodes into one XML String
    String gNodeListToString(list) {
        StringBuilder sb = new StringBuilder();
        list.each { listItem ->
            StringWriter sw = new StringWriter();
            new XmlNodePrinter(new PrintWriter(sw)).print(listItem)
            sb.append(sw.toString());
        }
        return sb.toString();
    }

}
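For comparison, the same kind of enrichment can be sketched in plain Java with the JDK's DOM API. This is only an illustration under assumptions: the element names aggregated, request and params are hypothetical (the real names are not visible in this post), the merged replies are assumed to be wrapped in a single root element so that they parse as one document, and, like the Groovy code, the sketch reads the values from child elements.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

class EnrichSketch {

    // Copies param1..param3 (when present) from the hypothetical <params>
    // element onto the hypothetical <request> element as attributes.
    static Element enrich(String aggregatedXml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(aggregatedXml)));
            Element request = (Element) doc.getElementsByTagName("request").item(0);
            Element params = (Element) doc.getElementsByTagName("params").item(0);
            for (String name : new String[] {"param1", "param2", "param3"}) {
                NodeList found = params.getElementsByTagName(name);
                if (found.getLength() > 0) {
                    request.setAttribute(name, found.item(0).getTextContent());
                }
            }
            return request;
        } catch (Exception e) {
            throw new IllegalStateException("could not enrich the request", e);
        }
    }

    // Serializes a single element to a String without the XML declaration.
    static String toXml(Element element) {
        try {
            Transformer transformer = TransformerFactory.newInstance().newTransformer();
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            StringWriter out = new StringWriter();
            transformer.transform(new DOMSource(element), new StreamResult(out));
            return out.toString();
        } catch (Exception e) {
            throw new IllegalStateException("could not serialize the element", e);
        }
    }

    public static void main(String[] args) {
        String aggregated = "<aggregated>"
                + "<request id=\"1\"/>"
                + "<params><param1>a</param1><param3>c</param3></params>"
                + "</aggregated>";
        // param2 is absent in the input, so only param1 and param3 are copied
        System.out.println(toXml(enrich(aggregated)));
    }
}
```

The Java version needs noticeably more ceremony than the Groovy one, which is the reason Groovy was picked for this step.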

 

What we’re doing here, especially in the last line of the enrich method, is converting the result to a String. Camel has some problems if we spit out Groovy objects. The rest is just Groovy-specific ways of manipulating XML. But look at the enrich method’s parameters: the @Property annotation binds the property assigned earlier in the router code to one of the arguments. That is a really cool feature, and there are more such annotations:
* @XPath
* @Header
* @Headers and @Properties – give whole maps of headers or properties

This pretty much concludes the subject :) Have fun, and if in doubt, leave a comment with your question!

You May Also Like

How to use mocks in controller tests

Ever since I started writing tests for my Grails application I couldn't find many articles on using mocks. Everyone is talking about tests and TDD, but if you search for it, there aren't many articles.

Today I want to share with you a test with mocks for a simple and complete scenario. I have a simple application that can fetch Twitter tweets and present them to the user. I use a REST service and a GET request to fetch tweets by id like this: http://api.twitter.com/1/statuses/show/236024636775735296.json. You can copy and paste it into your browser to see the result.

My application uses Grails 2.1 with spock-0.6 for tests. I have a TwitterReaderService that fetches tweets by id and parses the response into my Tweet class.


class TwitterReaderService {
    Tweet readTweet(String id) throws TwitterError {
        try {
            String jsonBody = callTwitter(id)
            Tweet parsedTweet = parseBody(jsonBody)
            return parsedTweet
        } catch (Throwable t) {
            throw new TwitterError(t)
        }
    }

    private String callTwitter(String id) {
        // TODO: implementation
    }

    private Tweet parseBody(String jsonBody) {
        // TODO: implementation
    }
}

class Tweet {
    String id
    String userId
    String username
    String text
    Date createdAt
}

// @InheritConstructors exposes RuntimeException's constructors,
// so both new TwitterError() and new TwitterError(t) work
@groovy.transform.InheritConstructors
class TwitterError extends RuntimeException {}

TwitterController plays the main part here. Users call the show action along with the id of a tweet. This action is my subject under test. I've implemented only some basic functionality, so it's easier to focus on it while writing tests.


class TwitterController {
    def twitterReaderService

    def index() {
    }

    def show() {
        Tweet tweet = twitterReaderService.readTweet(params.id)
        if (tweet == null) {
            flash.message = 'Tweet not found'
            redirect(action: 'index')
            return
        }

        [tweet: tweet]
    }
}

Let's start writing a test from scratch. The most important thing here is that I use a mock for my TwitterReaderService. I do not construct a new TwitterReaderService(), because in this test I test only TwitterController. I am not interested in the injected service. I know how this service is supposed to work and I am not interested in its internals. So before every test I inject a twitterReaderServiceMock into the controller:


import grails.test.mixin.TestFor
import spock.lang.Specification

@TestFor(TwitterController)
class TwitterControllerSpec extends Specification {
    TwitterReaderService twitterReaderServiceMock = Mock(TwitterReaderService)

    def setup() {
        controller.twitterReaderService = twitterReaderServiceMock
    }
}

Now it's time to think about which scenarios I need to test. This line from TwitterReaderService is the most important:


Tweet readTweet(String id) throws TwitterError

You must think of this method as a black box right now. From the controller's point of view you know nothing of its internals. You're only interested in what it can give back to you:

  • a TwitterError can be thrown
  • null can be returned
  • Tweet instance can be returned

This list is your test blueprint. Now answer a simple question for each element: "What do I want my controller to do in this situation?" and you have a test plan:

  • show action should redirect to index if TwitterError is thrown and inform about error
  • show action should redirect to index and inform if tweet is not found
  • show action should show found tweet

That was easy and straightforward! And now comes the best part: we use twitterReaderServiceMock to mock each of these three scenarios!
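Before moving on to Spock, the black-box idea can be sketched in plain Java without any mocking library. Everything here is a hypothetical stand-in invented for illustration: the TweetReader interface plays the role of the service contract, a String plays the role of a Tweet, and outcomeOf only names which of the three outcomes a given stub produced.

```java
class ScenarioSketch {

    // Hypothetical stand-in for the article's exception type.
    static class TwitterError extends RuntimeException {}

    // The caller only sees this contract, never an implementation.
    interface TweetReader {
        String readTweet(String id);
    }

    // Names which of the three blueprint outcomes the black box produced.
    static String outcomeOf(TweetReader reader, String id) {
        try {
            String tweet = reader.readTweet(id);
            return (tweet == null) ? "not found" : "found";
        } catch (TwitterError e) {
            return "error";
        }
    }

    public static void main(String[] args) {
        // One stub per scenario from the blueprint.
        TweetReader failing = id -> { throw new TwitterError(); };
        TweetReader empty = id -> null;
        TweetReader working = id -> "tweet " + id;

        System.out.println(outcomeOf(failing, "1")); // prints error
        System.out.println(outcomeOf(empty, "1"));   // prints not found
        System.out.println(outcomeOf(working, "1")); // prints found
    }
}
```

Each stub is one line, and none of them needs a real HTTP call or JSON parser, which is the whole point of replacing the service in a controller test.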

Spock has good documentation on interactions with mocks. You declare which methods are called, how many times, with what parameters, and what should be returned. Remember the black box? A mock is your black box with detailed instructions, e.g.: I expect that if you receive exactly one call to readTweet with parameter '1', then you should throw a TwitterError at me. Say this sentence out loud and look at this:


1 * twitterReaderServiceMock.readTweet('1') >> { throw new TwitterError() }

This is a valid interaction definition on a mock! It's that easy! Here is a complete test that fails for now:


import grails.test.mixin.TestFor
import spock.lang.Specification

@TestFor(TwitterController)
class TwitterControllerSpec extends Specification {
    TwitterReaderService twitterReaderServiceMock = Mock(TwitterReaderService)

    def setup() {
        controller.twitterReaderService = twitterReaderServiceMock
    }

    def "show should redirect to index if TwitterError is thrown"() {
        given:
        controller.params.id = '1'
        when:
        controller.show()
        then:
        1 * twitterReaderServiceMock.readTweet('1') >> { throw new TwitterError() }
        0 * _._
        flash.message == 'There was an error on fetching your tweet'
        response.redirectUrl == '/twitter/index'
    }
}

| Failure: show should redirect to index if TwitterError is thrown(pl.refaktor.twitter.TwitterControllerSpec)
| pl.refaktor.twitter.TwitterError
at pl.refaktor.twitter.TwitterControllerSpec.show should redirect to index if TwitterError is thrown_closure1(TwitterControllerSpec.groovy:29)

You may notice the 0 * _._ notation. It says: I don't want any other mocks or any other methods called. Fail this test if anything else is called! It's good practice to ensure that there are no more interactions than you expect.
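To see what these two interaction lines check, here is a rough hand-written equivalent in plain Java. It is a sketch, not how Spock works internally: TweetReader, ThrowingReaderMock, calledExactlyOnceWith and the simplified show method (which already handles the error) are all hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

class StrictMockSketch {

    // Hypothetical stand-ins for the article's types.
    static class TwitterError extends RuntimeException {}

    interface TweetReader {
        String readTweet(String id);
    }

    // Hand-written mock: records every call and always throws TwitterError.
    static class ThrowingReaderMock implements TweetReader {
        final List<String> calls = new ArrayList<>();

        @Override
        public String readTweet(String id) {
            calls.add(id);
            throw new TwitterError();
        }

        // Rough equivalent of "1 * readTweet('1')" plus "0 * _._":
        // exactly one recorded call, with exactly this argument, and nothing else.
        boolean calledExactlyOnceWith(String expectedId) {
            return calls.equals(List.of(expectedId));
        }
    }

    // Simplified controller action: returns the view name or a redirect target.
    static String show(TweetReader reader, String id) {
        try {
            reader.readTweet(id);
            return "show";
        } catch (TwitterError e) {
            return "redirect:index";
        }
    }

    public static void main(String[] args) {
        ThrowingReaderMock mock = new ThrowingReaderMock();
        String outcome = show(mock, "1");
        // prints redirect:index true
        System.out.println(outcome + " " + mock.calledExactlyOnceWith("1"));
    }
}
```

Spock generates this kind of bookkeeping for you and reports unexpected calls with a readable failure message, so the one-line interaction syntax replaces the whole mock class.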

OK, now I need to implement the controller logic to handle TwitterError.


class TwitterController {

    def twitterReaderService

    def index() {
    }

    def show() {
        Tweet tweet

        try {
            tweet = twitterReaderService.readTweet(params.id)
        } catch (TwitterError e) {
            log.error(e)
            flash.message = 'There was an error on fetching your tweet'
            redirect(action: 'index')
            return
        }

        [tweet: tweet]
    }
}

My test passes! We have two scenarios left. The rule stays the same: TwitterReaderService returns something and we test against it. So this line is the heart of each test; only the value returned after >> changes:


1 * twitterReaderServiceMock.readTweet('1') >> { throw new TwitterError() }

Here is a complete test for all three scenarios and a controller that passes it.


import grails.test.mixin.TestFor
import spock.lang.Specification

@TestFor(TwitterController)
class TwitterControllerSpec extends Specification {

    TwitterReaderService twitterReaderServiceMock = Mock(TwitterReaderService)

    def setup() {
        controller.twitterReaderService = twitterReaderServiceMock
    }

    def "show should redirect to index if TwitterError is thrown"() {
        given:
        controller.params.id = '1'
        when:
        controller.show()
        then:
        1 * twitterReaderServiceMock.readTweet('1') >> { throw new TwitterError() }
        0 * _._
        flash.message == 'There was an error on fetching your tweet'
        response.redirectUrl == '/twitter/index'
    }

    def "show should inform about not found tweet"() {
        given:
        controller.params.id = '1'
        when:
        controller.show()
        then:
        1 * twitterReaderServiceMock.readTweet('1') >> null
        0 * _._
        flash.message == 'Tweet not found'
        response.redirectUrl == '/twitter/index'
    }

    def "show should show found tweet"() {
        given:
        controller.params.id = '1'
        when:
        controller.show()
        then:
        1 * twitterReaderServiceMock.readTweet('1') >> new Tweet()
        0 * _._
        flash.message == null
        response.status == 200
    }
}

class TwitterController {

    def twitterReaderService

    def index() {
    }

    def show() {
        Tweet tweet

        try {
            tweet = twitterReaderService.readTweet(params.id)
        } catch (TwitterError e) {
            log.error(e)
            flash.message = 'There was an error on fetching your tweet'
            redirect(action: 'index')
            return
        }

        if (tweet == null) {
            flash.message = 'Tweet not found'
            redirect(action: 'index')
            return
        }

        [tweet: tweet]
    }
}

The most important thing here is that we've tested the controller-service interaction without implementing any logic in the service! That's why the mock technique is so useful. It decouples your dependencies and lets you focus on exactly one subject under test. Happy testing!