Tag: camel

Virtual ESB – application integration made painless with Apache Camel

Inspired by a great talk by Stefan Tilkov, we recently debated the concept of a virtual ESB at TouK. The idea is to accept the fact that integrating many applications ends in some kind of spaghetti architecture. Even when you provide an ESB layer that pretends to be a box with clear inputs and outputs, you will see the same spaghetti as before once you look inside it.

The virtual ESB is then a set of common practices and conventions for integrating systems at the application level, even with no bus in between.

OK, so you try to convince your application developers to implement some web-service-based integration with another application. And what is their response? 'Why should we do that? We now have to generate classes with our JAXB tools, write complex mappings to our domain classes, handle all those faults, etc. And what about debugging? With all those messy, unreadable XML messages, you cannot just set a breakpoint and evaluate an expression in the IDE...'

Stefan's suggestion is to use REST instead of WS-* for integration. That is not always possible in the real world, so our answer is to switch from complex JAX-WS frameworks such as CXF to a lightweight but powerful integration framework: Apache Camel. We avoid code generation and JAXB mappings and return to pure XML, which has great processing features. We use templating frameworks to generate XML responses. And we write mocks and unit tests, just as you do for your DAO and service layers.

XML is not evil

XML may be verbose and complicated, but it comes with great processing tools, XPath and XQuery, which are easy and straightforward to evaluate in Camel. They are integrated with Camel's expression concept, so the whole API is adapted to use XPath. Our most common case is binding bean method arguments to XPath expressions evaluated on incoming XML messages:
public List findCustomers(@XPath("//lastName") String lastName) {
    return getJdbcTemplate().queryForList("select * from customer where last_name = ?", new Object[]{lastName});
}
Then you can route your XML message to any bean in the Spring context. Camel will evaluate the XPath expressions for you and inject the results into the method parameters:
from("direct:someXMLService") // XML message on input
    .to("bean:customerService"); // String parameters on input, list on output
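
What the annotation-based binding does can be imitated with the JDK alone. The sketch below is not Camel code: it evaluates the expression with javax.xml.xpath and passes the result to a stand-in findCustomers method. The class name, the sample message and the returned text are assumptions made for illustration.

```java
import java.io.StringReader;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

class XPathBindingSketch {

    // Stand-in for the bean method; the real one queries the database.
    static String findCustomers(String lastName) {
        return "customers with last name " + lastName;
    }

    // Evaluates the expression on the message and returns its string value,
    // which is what a String parameter bound to an XPath expression receives.
    static String evaluate(String expression, String xml) {
        try {
            return XPathFactory.newInstance().newXPath()
                    .evaluate(expression, new InputSource(new StringReader(xml)));
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Cannot evaluate " + expression, e);
        }
    }

    // Binds the method argument to the XPath result and calls the method.
    static String route(String xml) {
        return findCustomers(evaluate("//lastName", xml));
    }

    public static void main(String[] args) {
        // hypothetical request message
        String request = "<findCustomersRequest><lastName>Kowalski</lastName></findCustomersRequest>";
        System.out.println(route(request));
    }
}
```
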
You can create your own XPath annotation (e.g. to add namespace support) by subclassing DefaultAnnotationExpressionFactory (implemented by Maciek Próchniak):
public class XpathExpressionAnnotationFactoryImproved extends DefaultAnnotationExpressionFactory {
    @SuppressWarnings("unchecked")
    @Override
    public Expression createExpression(CamelContext camelContext, Annotation annotation, LanguageAnnotation languageAnnotation, Class expressionReturnType) {
        ... // return an XPathBuilder with injected namespaces
    }
}

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER})
@LanguageAnnotation(language = "xpath", factory = XpathExpressionAnnotationFactoryImproved.class)
public @interface MyXPath {
    String value();
    // You can add the namespaces as the default value of the annotation
    NamespacePrefix[] namespaces() default @NamespacePrefix(prefix = "sch", uri = "http://touk.pl/common/customer/schema");
    Class resultType() default String.class;
}

public List findCustomers(@MyXPath("//sch:lastName") String lastName) {...}
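
Why the namespaces have to be injected can be seen with the plain JDK XPath API, where a prefixed expression only resolves once a NamespaceContext maps the prefix to a URI. This is a sketch of that mechanism, not of the Camel factory above; the class name and the sample message are assumptions.

```java
import java.io.StringReader;
import java.util.Collections;
import java.util.Iterator;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

class NamespaceXPathSketch {

    // Evaluates a prefixed XPath expression, resolving one prefix to one namespace URI.
    static String evaluate(String expression, String xml, String prefix, String uri) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // required for prefixed names to match
            Document doc = factory.newDocumentBuilder().parse(new InputSource(new StringReader(xml)));

            XPath xpath = XPathFactory.newInstance().newXPath();
            xpath.setNamespaceContext(new NamespaceContext() {
                public String getNamespaceURI(String p) {
                    return prefix.equals(p) ? uri : XMLConstants.NULL_NS_URI;
                }
                public String getPrefix(String namespaceUri) {
                    return uri.equals(namespaceUri) ? prefix : null;
                }
                public Iterator<String> getPrefixes(String namespaceUri) {
                    return uri.equals(namespaceUri)
                            ? Collections.singletonList(prefix).iterator()
                            : Collections.<String>emptyIterator();
                }
            });
            return xpath.evaluate(expression, doc);
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot evaluate " + expression, e);
        }
    }

    public static void main(String[] args) {
        String uri = "http://touk.pl/common/customer/schema";
        // hypothetical message; the document may use any prefix for the same URI
        String xml = "<c:findCustomersRequest xmlns:c=\"" + uri + "\">"
                + "<c:lastName>Kowalski</c:lastName></c:findCustomersRequest>";
        System.out.println(evaluate("//sch:lastName", xml, "sch", uri));
    }
}
```
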
Another technique (also 'invented' by Maciek) combines XPath's local-name function with the root element selector (/*), which returns the name of the root element of the XML message. This gives a nice mapping from WSDL operations to Java bean methods without any external tools. Consider the following example:
Namespaces ns = new Namespaces("sch", "http://touk.pl/common/customer");
from("direct:someXMLService") // XML message on input
    ... // some normalization - e.g. removing 'request' suffix
    .setProperty("operation").xpath("local-name(/*)", String.class, ns)
    .recipientList().simple("bean:customerService?method=${property.operation}");
So, Camel reads the name of the root element of the incoming request and sets it as the operation property. Then, thanks to another nice Camel feature, the simple expression language, we can route the message to a particular method of our (Spring) bean. With this technique, we can map the findCustomersRequest element in XML to the findCustomers method in Java, and use XPath parameter binding to extract the method's arguments from the message. All this without JAXB, Axis/CXF, code generation etc. :)
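
The mapping from root element to method can be tried out without Camel. The following sketch, which uses only the JDK, reads the root element name with local-name(/*), strips the Request suffix and calls the method of that name by reflection. CustomerService, its methods and the sample message are hypothetical stand-ins for the Spring bean and the real requests.

```java
import java.io.StringReader;
import java.lang.reflect.Method;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

class OperationDispatchSketch {

    // Hypothetical service bean; each public method is one operation.
    static class CustomerService {
        public String findCustomers(String xml) { return "findCustomers called"; }
        public String addCustomer(String xml) { return "addCustomer called"; }
    }

    // Name of the root element without prefix, e.g. "findCustomersRequest".
    static String rootElementName(String xml) {
        try {
            return XPathFactory.newInstance().newXPath()
                    .evaluate("local-name(/*)", new InputSource(new StringReader(xml)));
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Cannot read root element name", e);
        }
    }

    // Normalization step: findCustomersRequest -> findCustomers.
    static String operationName(String xml) {
        String root = rootElementName(xml);
        String suffix = "Request";
        return root.endsWith(suffix) ? root.substring(0, root.length() - suffix.length()) : root;
    }

    // Calls the bean method named after the operation, passing the message.
    static Object dispatch(Object bean, String xml) {
        try {
            Method method = bean.getClass().getMethod(operationName(xml), String.class);
            return method.invoke(bean, xml);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("No operation for this message", e);
        }
    }

    public static void main(String[] args) {
        String request = "<findCustomersRequest xmlns=\"http://touk.pl/common/customer\"/>";
        System.out.println(operationName(request));
        System.out.println(dispatch(new CustomerService(), request));
    }
}
```
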

Groovy XML features

Another useful set of tools built on XPath-like capabilities is Groovy's XML support. You can take a look at the Groovy documentation or the Groovy in Action book for some self-explanatory examples. Imagine a case we had recently in one of our projects: you have to call two services, each returning a list of results (e.g. customers), and then match them to eliminate duplicates (e.g. by matching both first and last name). With JAXB, you would have to traverse the first list, build a map of names pointing to customer objects, and then add those customers from the second list that are not already in the map. With Groovy's XML support you can do it a bit more simply and much more readably:
def doc = parse(...)
use(DOMCategory) {
    def customersFromSysA = doc[sch.sysAResponse].'*'[sch.customer]
    def customersFromSysB = doc[sch.sysBResponse].'*'[sch.customer]

    // keep only the customers from system B that match nobody in system A
    def notMatchingCustomers = customersFromSysB.findAll { customerB ->
        def matcher = matchingClosure.curry(customerB)
        !customersFromSysA.findAll(matcher)
    }

    notMatchingCustomers.each { customersFromSysA.add(it) }
}
...

// two customers match when both first and last name are equal
def matchingClosure = { customerB, customerA ->
    customerB.'@firstName' == customerA.'@firstName' &&
    customerB.'@lastName' == customerA.'@lastName'
}
(I assume that you know how to merge several responses in Camel; if not, read the article written by Marcin.) So, you can use Groovy's findAll method to iterate over the customers from system B, keeping only those that are not in the results from system A, and you can use a closure to define the customer matching conditions. Finally, you can modify XML messages in Groovy, in our example by adding the unique customers from system B to the results from system A.
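
For comparison, here is roughly what the same matching looks like in plain Java on a DOM tree, using the "set of known names" approach described above. It is only a sketch: the element and attribute names follow the Groovy example, while the class name and the flat message layout (customers directly under each response, no namespaces) are simplifying assumptions.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

class CustomerMergeSketch {

    static Document parse(String xml) {
        try {
            return DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse XML", e);
        }
    }

    // Two customers match when both first and last name are equal.
    static String key(Element customer) {
        return customer.getAttribute("firstName") + "|" + customer.getAttribute("lastName");
    }

    // Copies the live NodeList into a list so the tree can be modified safely.
    static List<Element> customers(Document doc, String responseName) {
        Element response = (Element) doc.getElementsByTagName(responseName).item(0);
        NodeList nodes = response.getElementsByTagName("customer");
        List<Element> result = new ArrayList<>();
        for (int i = 0; i < nodes.getLength(); i++) {
            result.add((Element) nodes.item(i));
        }
        return result;
    }

    // Appends to the system A response every system B customer that A lacks;
    // returns the number of customers in the system A response afterwards.
    static int merge(Document doc) {
        Set<String> known = new HashSet<>();
        for (Element customer : customers(doc, "sysAResponse")) {
            known.add(key(customer));
        }
        Element sysA = (Element) doc.getElementsByTagName("sysAResponse").item(0);
        for (Element customer : customers(doc, "sysBResponse")) {
            if (known.add(key(customer))) {
                sysA.appendChild(customer.cloneNode(true));
            }
        }
        return customers(doc, "sysAResponse").size();
    }

    public static void main(String[] args) {
        String xml = "<responses>"
                + "<sysAResponse><customer firstName=\"Jan\" lastName=\"Kowalski\"/></sysAResponse>"
                + "<sysBResponse><customer firstName=\"Jan\" lastName=\"Kowalski\"/>"
                + "<customer firstName=\"Anna\" lastName=\"Nowak\"/></sysBResponse>"
                + "</responses>";
        System.out.println(merge(parse(xml)));
    }
}
```
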

Templating with Velocity

But how do you generate a whole XML response from scratch without the JAXB mapping that is most commonly used for that? The idea is to use the tools traditionally used for generating HTML responses, i.e. templating. Here is an example of a Velocity template:
<sch:${exchange.properties.operation}Response xmlns:sch="http://touk.pl/common/customer/schema">
    <sch:customers>
    #foreach( $customer in $exchange.properties.customers )
       <sch:customer #if ($customer.firstName) firstName="$customer.firstName" #end
 ...
       />
    #end
    </sch:customers>
</sch:${exchange.properties.operation}Response>

Summary

As you can see, (web) services can be implemented well and easily with the right tools, such as Apache Camel. You can embed it in your existing application or use an integration application platform (which is not a synonym for ESB) such as Apache ServiceMix 4. Web services are just an implementation detail here. You can easily switch to REST if you are a huge REST enthusiast ;)

Complex flows with Apache Camel

At work, we mainly integrate services and systems, and we are constantly on the lookout for new, better technologies and ways to make things easier and more sustainable.

Usually we use Apache Camel for this task, which is a Swiss Army knife for the integration engineer. What's more, this tool fits well with our approach to integration solutions:

  • try to operate on XML messages, so you get the advantages of XPath, XSL and other benefits,
  • don't convert XML into Java classes back and forth, and don't worry about the problems such conversion brings,
  • try to keep the flow of the process simple.

However, at first sight Apache Camel seems to have some drawbacks, mainly in the area of practical solutions ;-). It's a very handy tool if you need a pipeline with some marginal processing of the data that passes through it. It gets a lot harder to wrap your head around once you add branching and intermediate calls to external services. This may be tricky to write properly in Camel's DSL.

Here is a simple pipeline example:

And here is the exact scenario we're discussing:

What I'd like to show is a solution to this problem. If you're using a recent version of Camel, this may be easier or a little different, but it should still work more or less this way. This code is written for Apache Camel 1.4, a rather ancient version, but that's what we're forced to use. Oh, well.

Ok, enough whining!

So, I created a test class to illustrate the case. The route defined in the TestRouter class is responsible for:

  1. receiving the input
  2. setting an exchange property to the result of a given XPath expression, which effectively is the name of the first XML element in the input stream
  3. then, sending the input data to three different external services, each of which replies with some fictional data - notice routes a, b and c. The SimpleContentSetter processor simply responds with a given text.
  4. processing the responses from all three services in the RequestEnricher bean, which is described below
  5. eventually, logging the exchange in the specified category

Here is some code for this:

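public class SimpleTest {
    // context setup is an assumption added for completeness;
    // getInOut() and SimpleContentSetter are helpers not shown here
    CamelContext ctx = new DefaultCamelContext();

    @Before
    public void setUp() throws Exception {
        TestRouter tr = new TestRouter();
        ctx.addRoutes(tr);
        ctx.start();
    }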

    @Test
    public void shouldCheck() throws Exception {
        ctx.createProducerTemplate().send("direct:in", getInOut("<a/>"));
    }


    class TestRouter extends RouteBuilder {

        public void configure() throws Exception {

            ((ProcessorType<ProcessorType>)from("direct:in")
            .setProperty("operation").xpath("local-name(/*)", String.class)
            .multicast(new MergeAggregationStrategy())
                .to("direct:a", "direct:b", "direct:c")
            .end()
            .setBody().simple("${in.body}"))
            .bean(RequestEnricher.class, "enrich")
            .to("log:pl.touk.debug");
            
            from("direct:a").process(new SimpleContentSetter("<aaaa/>"));
            from("direct:b").process(new SimpleContentSetter("<bbbb param1=\"1\" param2=\"2\" param3=\"3\"/>"));
            from("direct:c").process(new SimpleContentSetter("<cccc/>"));
        }
    }
}

What's unusual in this code is that what Camel normally does when you write a piece of DSL like:

        .to("direct:a", "direct:b", "direct:c")

is pass the input to service a, then a's output gets passed to b and becomes its input, then b's output becomes c's input. The problem is that you lose the output from a and b, not to mention that you might want to send the same input to all three services.

That's where a little tool called multicast() comes in handy. It offers you the ability to aggregate the outputs of those services. You may even create an AggregationStrategy that will do it the way you like. The class below, MergeAggregationStrategy, does exactly that kind of work: it joins the outputs from all three services. A lot of information about the proper use of AggregationStrategy implementations can be found in a post by Torsten Mielke.

public class MergeAggregationStrategy implements AggregationStrategy {

    // Called for each reply; oldExchange holds what has been aggregated so far
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange.isFailed()) {
            return oldExchange;
        }
        transformMessage(oldExchange.getIn(), newExchange.getIn());
        transformMessage(oldExchange.getOut(), newExchange.getOut());
        return newExchange;
    }

    // Prepends the already aggregated body to the body of the new message
    private void transformMessage(Message oldM, Message newM) {
        String oldBody = oldM.getBody(String.class);
        String newBody = newM.getBody(String.class);
        newM.setBody(oldBody + newBody);
    }

}
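
The difference between the default pipeline and a multicast with a merging aggregation can be modelled in a few lines of plain Java. This is a conceptual sketch, not Camel code: services are represented as functions on strings, and the class and method names are made up for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

class PipelineVsMulticastSketch {

    // Three stand-in services that ignore their input and reply with fixed text,
    // like the SimpleContentSetter behind routes a, b and c.
    static List<UnaryOperator<String>> services() {
        UnaryOperator<String> a = in -> "<aaaa/>";
        UnaryOperator<String> b = in -> "<bbbb param1=\"1\" param2=\"2\" param3=\"3\"/>";
        UnaryOperator<String> c = in -> "<cccc/>";
        return Arrays.asList(a, b, c);
    }

    // Pipeline: each service receives the output of the previous one,
    // so only the last reply survives.
    static String pipeline(String input, List<UnaryOperator<String>> services) {
        String current = input;
        for (UnaryOperator<String> service : services) {
            current = service.apply(current);
        }
        return current;
    }

    // Multicast: every service receives the same input and the replies
    // are combined pairwise by the aggregation function.
    static String multicast(String input, List<UnaryOperator<String>> services,
                            BinaryOperator<String> aggregate) {
        String result = null;
        for (UnaryOperator<String> service : services) {
            String reply = service.apply(input);
            result = (result == null) ? reply : aggregate.apply(result, reply);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(pipeline("<a/>", services()));
        System.out.println(multicast("<a/>", services(), (oldBody, newBody) -> oldBody + newBody));
    }
}
```

With the concatenating aggregation the multicast keeps all three replies, while the pipeline returns only the reply of the last service.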

However nice this may look (or not), what you're left with is a mix of multiple XML documents. Normally this won't do you much good; it is better to parse this output in some way. What we use for this is Groovy :), which is great for parsing XML and a lot less verbose than ordinary Java.

Let's assume a scenario in which the aggregated output, currently looking like this:

        <aaaa/>
        <bbbb param1="1" param2="2" param3="3"/>
        <cccc/>

is to be processed with the following steps in mind:

  • use <aaaa/> as the result element
  • take the attributes param1, param2 and param3 from the <bbbb/> element and add them to the result element <aaaa/>

The RequestEnricher bean, written in Groovy, takes care of this:
public class RequestEnricher {
        
        public String enrich(@Property(name = "operation") String operation, Exchange ex) {
                
                use(DOMCategory) {
                        def dhl = new groovy.xml.Namespace("http://example.com/common/dhl/schema", 'dhl')
                        def pc = new groovy.xml.Namespace("http://example.com/pc/types", 'pc')
                        def doc = new XmlParser().parseText(ex.in.body)
                        
                        def pcRequest   = doc."aaaa"[0]
                        
                        ["param1", "param2", "param3"].each() {
                                def node = doc.'**'[("" + it)][0]
                                if (node)
                                        pcRequest['@' + it] = node.text()
                        }
                        
                        gNodeListToString([pcRequest])
                }
                
        }
        
        String gNodeListToString(list) {
                StringBuilder sb = new StringBuilder();
                list.each { listItem ->
                        StringWriter sw = new StringWriter();
                        new XmlNodePrinter(new PrintWriter(sw)).print(listItem)
                        sb.append(sw.toString());
                }
                return sb.toString();
        }
        
}
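
The two processing steps listed earlier (use <aaaa/> as the result and copy param1, param2 and param3 onto it from <bbbb/>) can also be sketched in plain Java with the JDK's DOM API, which makes the comparison with the Groovy version easier. The class name and the <responses> wrapper element are assumptions: a parser needs a single root element, and the original messages do not show how the replies are wrapped.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

class EnrichSketch {

    // Copies param1..param3 from <bbbb/> to <aaaa/> and returns the enriched <aaaa/>.
    static Element enrich(String aggregatedXml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(aggregatedXml)));
            Element result = (Element) doc.getElementsByTagName("aaaa").item(0);
            Element source = (Element) doc.getElementsByTagName("bbbb").item(0);
            for (String name : new String[] {"param1", "param2", "param3"}) {
                if (source.hasAttribute(name)) {
                    result.setAttribute(name, source.getAttribute(name));
                }
            }
            return result;
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot enrich message", e);
        }
    }

    // Serializes the element to a String without the XML declaration.
    static String toXml(Element element) {
        try {
            Transformer transformer = TransformerFactory.newInstance().newTransformer();
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            StringWriter out = new StringWriter();
            transformer.transform(new DOMSource(element), new StreamResult(out));
            return out.toString();
        } catch (Exception e) {
            throw new IllegalStateException("Cannot serialize element", e);
        }
    }

    public static void main(String[] args) {
        // hypothetical wrapper around the three aggregated replies
        String aggregated = "<responses><aaaa/>"
                + "<bbbb param1=\"1\" param2=\"2\" param3=\"3\"/><cccc/></responses>";
        System.out.println(toXml(enrich(aggregated)));
    }
}
```
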

What we're doing here, especially in the last line of the enrich method, is converting the result to a String, because Camel has some problems if we spit out Groovy objects. The rest is just Groovy-specific ways of manipulating XML. Note the @Property annotation on one of the enrich method's parameters: it binds the property assigned earlier in the router code to that argument. That is a really cool feature, and there are more such annotations:

  • @XPath
  • @Header
  • @Headers and @Properties - give you the whole map of headers or properties

This pretty much concludes the subject :) Have fun, and if in doubt, leave a comment with your question!


Event processing in camel-drools

In a previous post about camel-drools I introduced the camel-drools component and implemented a simple task-oriented process using rules inside a Camel route. Today I'll show how to extend this example by adding event processing.

So, how do we describe an event? Each event occurs at some time and lasts for some duration, and events happen in a particular order. We then have a 'cloud of events' in which we want to identify those that form interesting correlations. This is where Drools becomes useful: we don't have to react to each event, we just describe a set of rules and consequences for those interesting correlations. The Drools engine will find them and fire the matching rules.

Suppose our system has to monitor the execution of tasks assigned to users. After a task is created, the user has 10 days to complete it. If they don't, an e-mail reminder should be sent.

The rule definition may look like this:
import org.apache.camel.component.drools.stateful.model.*
global org.apache.camel.component.drools.CamelDroolsHelper helper

declare TaskCreated
    @role( event )
    @expires( 365d )
end

declare TaskCompleted
    @role( event )
    @expires( 365d )
end

rule "Task not completed after 10 days"
    when
       $t : TaskCreated()
       not(TaskCompleted(name==$t.name, this after [-*, 10d] $t))
    then
       helper.send("direct:escalation", $t.getName());
end
As you can see, there are two types of events: TaskCreated, when the system assigns a task to a user, and TaskCompleted, when the user finishes the task. We correlate the two by the 'name' property.
First, we need to declare our model classes as events by adding the @role(event) and @expires annotations. Then we describe the rule: 'when there is no TaskCompleted event within 10 days of the TaskCreated event, send the task name to the direct:escalation route'.

Again, this is an example of declarative programming: we don't even have to specify the actual names of tasks, we just correlate TaskCreated with TaskCompleted events by name. In this example, I used the 'after' temporal operator. For a description of the others, see the Drools Fusion documentation.
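
To make the intended behaviour of the rule concrete, here is an imperative model of it in plain Java with a simulated clock. It is only an approximation written for illustration: the class and method names are invented, and a real rule engine evaluates the temporal constraint continuously instead of only when the clock is advanced by hand.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class EscalationSketch {

    static final long DEADLINE_DAYS = 10;

    private long today = 0;
    // task name -> day on which the task was created
    private final Map<String, Long> openTasks = new LinkedHashMap<>();
    private final List<String> escalated = new ArrayList<>();

    void taskCreated(String name) {
        openTasks.put(name, today);
    }

    void taskCompleted(String name) {
        openTasks.remove(name);
    }

    // Moves the clock forward and escalates every task that is still open
    // 10 days (or more) after its creation; each task is escalated once.
    void advanceDays(long days) {
        today += days;
        Iterator<Map.Entry<String, Long>> it = openTasks.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> task = it.next();
            if (today - task.getValue() >= DEADLINE_DAYS) {
                escalated.add(task.getKey());
                it.remove();
            }
        }
    }

    List<String> escalated() {
        return escalated;
    }

    public static void main(String[] args) {
        EscalationSketch monitor = new EscalationSketch();
        monitor.taskCreated("Task1");
        monitor.taskCreated("Task2");
        monitor.advanceDays(5);
        monitor.taskCompleted("Task1");
        monitor.advanceDays(5);
        System.out.println(monitor.escalated());
    }
}
```
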

And finally, here is a JUnit test code snippet:
public class TaskEventsTest extends GenericTest {

    DefaultCamelContext ctx;

    @Test
    public void testCompleted() throws Exception {
        insertAdvanceDays(new TaskCreated("Task1"), 4);
        assertContains(0);
        insertAdvanceDays(new TaskCompleted("Task1"), 4);
        advanceDays(5);
        assertContains(0);
    }

    @Test
    public void testNotCompleted() throws Exception {
        insertAdvanceDays(new TaskCreated("Task1"), 5);
        assertContains(0);
        advanceDays(5);
        assertContains("Task1");
    }

    @Test
    public void testOneNotCompleted() throws Exception {
        ksession.insert(new TaskCreated("Task1"));
        insertAdvanceDays(new TaskCreated("Task2"), 5);
        assertContains(0);
        insertAdvanceDays(new TaskCompleted("Task1"), 4);
        assertContains(0);
        advanceDays(1);
        assertContains("Task2");
        advanceDays(10);
        assertContains("Task2");
    }

    @Override
    protected void setUpResources(KnowledgeBuilder kbuilder) throws Exception {
        kbuilder.add(new ReaderResource(new StringReader(
                IOUtils.toString(getClass()
                 .getResourceAsStream("/stateful/task-event.drl")))),
                 ResourceType.DRL);
    }

    @Override
    public void setUpInternal() throws Exception {
        this.ctx = new DefaultCamelContext();
        CamelDroolsHelper helper = new CamelDroolsHelper(ctx,
                new DefaultExchange(ctx)) {
            public Object send(String uri, Object body) {
                sentStuff.add(body.toString());
                return null;
            };
        };
        ksession.setGlobal("helper", helper);
    }
}

You can find source code for this example here.

Introducing camel-drools component

Introduction

In this post I’ll try to introduce an Apache Camel component for the Drools library, a great and widely used Business Rules Management System. When we decided to use Drools 5 inside ServiceMix for a big project, it turned out that there was no production-ready solution that would meet our requirements. The servicemix-drools component lacks several very important features, e.g.:

  • StatefulSession database persistence for long-running processes,
  • support for Complex Event Processing (event-based rules),
  • Apache Camel based deployment to ease rules consequence processing,
  • support for the Drools unit testing framework.

To satisfy those requirements, Maciek Próchniak created a set of utility classes that helped us run Drools inside a Camel route. Starting from this codebase, we did some refactoring, added a few new features (e.g. pluggable object persistence) and released the camel-drools component on the TouK Open Source Projects forge.

Example

To summarize the key features and show how to use the camel-drools component, let’s implement an example taken from the Drools Flow documentation:
[Figure: Drools Flow example]
There is a kind of ‘process’ in which Task1 and Task2 are created first and can be executed in parallel. Task3 needs to be executed after both Task1 and Task2 are completed.

Implementation

The Task class has two fields, a name and a completed flag. We also need an id for session serialization:

public class Task implements Serializable {
    private static final long serialVersionUID = -2964477958089238715L;
    private String name;
    private boolean completed;

    public Task(String name) {
        this(name, false);
    }

    public Task(String name, boolean completed) {
        this.name = name;
        this.completed = completed;
    }

    public String getName() {
        return name;
    }

    public boolean isCompleted() {
        return completed;
    }

    public long getId() {
        return name.hashCode();
    }
}

We also define another class, State, representing the state of the process, which is needed to fire the rules in the correct order.
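
The State class itself is not shown in this post. Judging from how it is used in the rules and in the test (a name property and a one-argument constructor), a minimal version modelled on Task could look like the sketch below. The serialVersionUID value and the getId method are assumptions copied from the Task pattern, and in a real project the class would be declared public in its own file.

```java
import java.io.Serializable;

// Hypothetical reconstruction: marks the current phase of the process, e.g. "start" or "end".
class State implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String name;

    State(String name) {
        this.name = name;
    }

    // Used by rule conditions such as State(name=="start").
    public String getName() {
        return name;
    }

    // Same id scheme as in Task, assumed to be needed for session serialization.
    public long getId() {
        return name.hashCode();
    }
}
```
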

Using that model, we can now implement our ruleset, defined in the task.drl file:

import org.apache.camel.component.drools.stateful.*

global org.apache.camel.component.drools.CamelDroolsHelper helper

rule "init"
salience 100
    when
        $s : State(name=="start")
    then
        insert(new Task("Task1"));
        insert(new Task("Task2"));
        retract($s);
end

rule "all tasks completed"
    when
        not(exists Task(completed==false))
        not(exists State(name=="end"))
    then
        insert(new Task("Task3"));
end

rule "Task3 completed"
salience 30
    when
        Task(name=="Task3", completed==true)
    then
        insert(new State("end"));
        helper.send("direct:completed", "completed");
end

In the first rule, "init", we insert two tasks and then retract the state object from the session to avoid recursive execution of that rule. The "all tasks completed" rule shows the power of Drools: we just declare that the rule fires when "there are no incomplete tasks" and don’t have to specify which tasks. This is a ‘declarative’ rather than ‘imperative’ way of development: it is much more expressive than listing the step-by-step actions that lead to some situation.
The CamelDroolsHelper is a wrapper around ProducerTemplate and can be used to send a message through another Camel route as a consequence of a rule.

But how are Tasks marked as completed in the Drools session? The idea is to expose the session through a Camel endpoint that allows inserting or updating objects, which are passed as the bodies of exchanges:

public class TaskRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:drools")
            .setHeader("drools.key", constant(new MultiKey(new String[]{"process-1"})))
            .to("drools:task.drl?stateful=true");
        from("direct:completed").to("log:test");
    }
}

The Drools endpoint is described by the "drools:task.drl?stateful=true" URI. It loads the rule definitions from the task.drl file and runs the endpoint in stateful mode (described in the next section). When an object is passed to this endpoint, it is inserted into (or updated in) the session and the fireAllRules() method is called.
Another important thing is the "drools.key" header, which is used to distinguish sessions between "processes". E.g. when we have some customer-oriented rules, we want to group facts and events in a session per customer, by some customer id. When "drools.key" is set to that id, sessions for different customers can be found and saved separately.
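
The idea of one session per key can be pictured with a simple map. The sketch below is not how camel-drools stores sessions; it only illustrates the lookup by a (possibly multi-part) key, with a list of facts standing in for a rule session. All names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class KeyedSessionsSketch {

    // session key (one or more parts) -> facts held by that session
    private final Map<List<String>, List<Object>> sessions = new HashMap<>();

    // Finds the session stored under the key or creates a new, empty one.
    List<Object> sessionFor(String... key) {
        return sessions.computeIfAbsent(Arrays.asList(key.clone()), k -> new ArrayList<>());
    }

    // Inserts a fact into the session selected by the key.
    void insert(Object fact, String... key) {
        sessionFor(key).add(fact);
    }

    int sessionCount() {
        return sessions.size();
    }

    public static void main(String[] args) {
        KeyedSessionsSketch registry = new KeyedSessionsSketch();
        registry.insert("Task1", "customer-1");
        registry.insert("Task2", "customer-1");
        registry.insert("Task1", "customer-2");
        System.out.println(registry.sessionFor("customer-1"));
        System.out.println(registry.sessionFor("customer-2"));
    }
}
```
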

Stateful session persistence

The camel-drools component can be used in two modes: stateful and stateless. The main difference between them is session persistence: only in stateful mode is the session stored in a database. So long-duration event rules are handled correctly only in this mode, and this is what we used in this example. Let’s look at the Spring context definition:

<beans>
  <import resource="classpath:camel-drools-context.xml"></import>
  <util:list id="keyColumns">
    <value>task_id</value>
  </util:list>
  <bean id="sessionDAO" class="org.apache.camel.component.drools.dao.JdbcSessionDAO"
        p:dataSource-ref="dataSource"
        p:sessionTable="sessioninfo"
        p:objectTable="objectinfo"
        p:sequenceName="cameldrools_seq"
        p:discriminatorValue="task"
        p:keyColumns-ref="keyColumns"
        p:marshallingStrategyFactory-ref="marshallingStrategyFactory">
  </bean>
</beans>

As you can see, the database needs some objects for session persistence to work correctly: two tables, one for the StatefulKnowledgeSession and one for the persisted objects (facts and events). You can name them freely, just provide the names in the sessionTable and objectTable properties of sessionDAO. A sequence for id generation is also needed.

Route and rules testing

Here is an example test for TaskRouteBuilder:

@SuppressWarnings("unchecked")
public class TaskRouteBuilderTest extends TaskRouteBuilder {

    DefaultCamelContext ctx;
    ProducerTemplate tpl;
    MockSessionDAO dao;

    @Before
    public void makeContext() throws Exception {
        ctx = new DefaultCamelContext();
        ctx.addComponent("drools", new DroolsComponent(ctx));
        ApplicationContext appCtx = new ClassPathXmlApplicationContext(
                new String[] { "camel-drools-context.xml", "mock-dao-context.xml" });
        dao = (MockSessionDAO) appCtx.getBean("sessionDAO");
        ctx.setRegistry(new ApplicationContextRegistry(appCtx));
        ctx.addRoutes(this);
        ctx.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:completed").to("mock:test");
            }
        });
        ctx.start();
        tpl = ctx.createProducerTemplate();
    }

    @Test
    public void testUpdate() throws Exception {
        Endpoint endpoint = ctx.getEndpoint("direct:drools");
        tpl.requestBody(endpoint, new State("start"));
        SessionWithIdentifier session = dao.getSession();
        Assert.assertEquals(2, session.getSession().getFactHandles().size());
        tpl.requestBody(endpoint, new Task("Task1", true));
        tpl.requestBody(endpoint, new Task("Task2", true));
        Assert.assertEquals(3, session.getSession().getFactHandles().size());
        tpl.requestBody(endpoint, new Task("Task3", true));

        MockEndpoint mock = MockEndpoint.resolve(ctx, "mock:test");
        mock.expectedMessageCount(1);
        mock.setResultWaitTime(5000L);
        mock.assertIsSatisfied();
    }
}

In the setup method some required initialization is done: the camel-drools-context.xml file is loaded and a MockSessionDAO is created.
The test first starts the process by passing a State object named "start" to the Drools session through the Camel route. This should add Task1 and Task2 to the session, which is tested by counting the fact handles in the session.
Next, Task1 and Task2 are updated by marking them as completed, which should result in Task3 being present in the session as another fact handle. The last step is to complete Task3 and check that the last rule is executed, using assertions on the MockEndpoint.

You can download the source code for this example and the whole component from here. This is the branch for the Camel 1.x version, which we use in our project.


    Copyright © 2002-2011 TouK sp. z o.o. s.k.a.