Introducing the camel-drools component

Introduction

In this post I’ll try to introduce an Apache Camel component for the Drools library – a great and widely used Business Rules Management System. When we decided to use Drools 5 inside ServiceMix for a big project, it turned out that there was no production-ready solution that would meet our requirements. The servicemix-drools component lacks several very important features, e.g.:
* StatefulSession database persistence for long-running processes,
* support for Complex Event Processing (event-based rules),
* Apache Camel-based deployment to ease processing of rule consequences,
* support for the Drools unit testing framework.

To satisfy those requirements, Maciek Próchniak created a set of utility classes which helped us run Drools inside a Camel route. Starting from this codebase, we did some refactoring, added a few new features (e.g. pluggable object persistence) and released the camel-drools component on the TouK Open Source Projects forge.

Example

To summarize the key features and show how to use the camel-drools component, let’s implement an example taken from the Drools Flow documentation:

There is a kind of ‘process’ in which Task1 and Task2 are created first and can be executed in parallel. Task3 needs to be executed after completion of both Task1 and Task2.

Implementation

The Task class has two fields, a name and a completed flag; we also need an id for session serialization:

public class Task implements Serializable {
    private static final long serialVersionUID = -2964477958089238715L;    
    private String name;
    private boolean completed;

    public Task(String name) {
        this(name, false);
    }

    public Task(String name, boolean completed) {
        this.name = name;
        this.completed = completed;
    }

    public String getName() {
        return name;
    }

    public boolean isCompleted() {
        return completed;
    }

    public long getId() {
        return name.hashCode();
    }
}

We also define another class, State, representing the state of the process; it is needed to fire the rules in the correct order.
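A minimal sketch of such a State class could look like this (assuming it simply mirrors Task: a Serializable fact identified by its name):

// a sketch only - the actual State class may differ
public class State implements Serializable {

    private String name;

    public State(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public long getId() {
        return name.hashCode();
    }
}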

Using that model, we can now implement our ruleset, defined in the task.drl file:

import org.apache.camel.component.drools.stateful.*

global org.apache.camel.component.drools.CamelDroolsHelper helper

rule "init"
salience 100
    when
        $s : State(name=="start")
    then
        insert(new Task("Task1"));
        insert(new Task("Task2"));
        retract($s);
end

rule "all tasks completed"
    when
        not(exists Task(completed==false))
        not(exists State(name=="end"))
    then
        insert(new Task("Task3"));
end

rule "Task3 completed"
salience 30
    when 
        Task(name=="Task3", completed==true)
    then
        insert(new State("end"));
        helper.send("direct:completed", "completed");
end

In the first rule, “init”, we insert two tasks and then retract the state object from the session to avoid recursive execution of that rule. The rule “all tasks completed” shows the power of Drools – we just declare that the rule fires when “there are no incomplete tasks” and don’t have to specify which tasks. This is a ‘declarative’ rather than ‘imperative’ way of development – we get much more expressiveness than with step-by-step actions leading to some situation.

The CamelDroolsHelper is a wrapper for ProducerTemplate and can be used to send a message through another Camel route as a consequence of a rule. But how are Tasks marked as completed in the Drools session? The idea is to expose the session through a Camel endpoint, so that objects passed as the body of exchanges can be inserted or updated:

public class TaskRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:drools")
            .setHeader("drools.key", constant(new MultiKey(new String[] {
                "process-1"
            })))
            .to("drools:task.drl?stateful=true");
        from("direct:completed").to("log:test");
    }
}

The Drools endpoint is described by the "drools:task.drl?stateful=true" URI. It loads the rule definitions from the task.drl file and runs the endpoint in stateful mode (described in the next paragraph). When an object is passed to this endpoint, it is inserted into (or updated in) the session and the fireAllRules() method is called. Another important thing is the “drools.key” header – it is used to distinguish sessions between “processes”. E.g. when we have some customer-oriented rules, we want to group facts and events in a session per customer – by some customer id. When “drools.key” is set to that id, sessions for different customers can be found and saved separately.

Stateful session persistence

The camel-drools component can be used in two modes: stateful and stateless. The main difference between them is session persistence – only in stateful mode is the session stored in a database. Rules on long-lived events are therefore handled correctly only in this mode, and this is what we use in this example. Let’s look at the Spring context definition:

[Spring context definition: the sessionDAO bean with its sessionTable and objectTable properties and the task_id sequence used for id generation]

As you can see, there are some requirements on the database objects to handle session persistence correctly – two tables: one for the StatefulKnowledgeSession and one for object (facts and events) persistence. You can name them freely, just provide those names in the sessionTable and objectTable properties of sessionDAO. A sequence for id generation is also needed.

Route and rules testing

Here is an example test for TaskRouteBuilder:

@SuppressWarnings("unchecked")
public class TaskRouteBuilderTest extends TaskRouteBuilder {

    DefaultCamelContext ctx;
    ProducerTemplate tpl;
    MockSessionDAO dao;

    @Before
    public void makeContext() throws Exception {
        ctx = new DefaultCamelContext();
        ctx.addComponent("drools", new DroolsComponent(ctx));
        ApplicationContext appCtx = new ClassPathXmlApplicationContext(
            new String[] {
                "camel-drools-context.xml",
                "mock-dao-context.xml"
            });
        dao = (MockSessionDAO) appCtx.getBean("sessionDAO");
        ctx.setRegistry(new ApplicationContextRegistry(appCtx));
        ctx.addRoutes(this);
        ctx.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:completed").to("mock:test");
            }
        });
        ctx.start();
        tpl = ctx.createProducerTemplate();
    }

    @Test
    public void testUpdate() throws Exception {
        Endpoint endpoint = ctx.getEndpoint("direct:drools");
        tpl.requestBody(endpoint, new State("start"));
        SessionWithIdentifier session = dao.getSession();
        Assert.assertEquals(2, session.getSession().getFactHandles().size());
        tpl.requestBody(endpoint, new Task("Task1", true));
        tpl.requestBody(endpoint, new Task("Task2", true));
        Assert.assertEquals(3, session.getSession().getFactHandles().size());
        tpl.requestBody(endpoint, new Task("Task3", true));

        MockEndpoint mock = MockEndpoint.resolve(ctx, "mock:test");
        mock.expectedMessageCount(1);
        mock.setResultWaitTime(5000L);
        mock.assertIsSatisfied();
    }
}

In the setup method some required initialization is done – the camel-drools-context.xml file is loaded and a MockSessionDAO is created. The test first starts the process by passing a State object named “start” to the Drools session through the Camel route. This should add Task1 and Task2 to the session, which is verified by counting the fact handles in the session. Next, Task1 and Task2 are updated by marking them completed, which should result in Task3 being present in the session – another fact handle. The last step is to complete Task3 and check that the last rule is executed, using assertions on the MockEndpoint. You can download the source code for this example and the whole component from here – this is the branch for the Camel 1.x version, which we use in our project.

You May Also Like

Zookeeper + Curator = Distributed sync

An application developed for one of my recent projects at TouK involved multiple servers. There was a requirement to ensure failover for the system’s components. Since I already had a few separate components I didn’t want to add more of them, and since there was already a Zookeeper ensemble running - required by one of the services - I decided to go that way with my solution.

What is Zookeeper?

Just a crude distributed synchronization framework. However, it implements Paxos-style algorithms (http://en.wikipedia.org/wiki/Paxos_(computer_science)) to ensure no split-brain scenarios occur. This is quite an important feature, since I don’t have to care about that kind of problem while using this app. You just need to create an ensemble of a couple of its instances to ensure high availability. It is basically a virtual filesystem, with files, directories and stuff. One could ask: why another filesystem? Well, this one is a rather special one, especially for distributed systems. The reason why creating all the locking algorithms on top of Zookeeper is easy is its Ephemeral Nodes - files that exist only as long as the connection that created them exists. After it disconnects, such a file disappears.

With such paradigms in place it’s fairly easy to create some high level algorithms for synchronization.

With that in place, one can safely integrate multiple services, ensuring loose coupling in a distributed way.

Zookeeper from developer’s POV

With all the base services for Zookeeper started, it seems there is nothing else to do than just connect to it and start implementing the necessary algorithms. Unfortunately, the API is quite basic and offers only file and directory abstractions, with the addition of different node types (file types) - ephemeral and sequence. It is also possible to watch a node for changes.

Using bare Zookeeper is hard!

Creating connections is tedious - and there are lots of things to take care of. Handling an established connection is hard - when establishing a connection to the ensemble, it’s also necessary to negotiate a session. During the whole process a number of exceptions can occur - these are “recoverable” exceptions, which can be gracefully handled without breaking the connection.
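To give a taste of the raw API, opening a connection and creating an ephemeral node looks roughly like this (a sketch; the connect string, timeout and path are example values, and all error and session-expiry handling is omitted):

ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, new Watcher() {
    public void process(WatchedEvent event) {
        // connection state changes and node events arrive here
    }
});
// an ephemeral node lives only as long as this session does
zk.create("/workers/worker-1", "payload".getBytes(),
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);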

    class="c8"><span>So, Zookeeper API is hard.</span></p><p class="c1"><span></span></p><p class="c8"><span>Even if one is proficient with that API, then there come recipes. The reason for using Zookeeper is to be able to implement some more sophisticated algorithms on top of it. Unfortunately those aren&rsquo;t trivial and it is again quite hard to implement them without bugs.</span>

And since distributed systems are hard, why would anyone want another difficult-to-handle tool?

Enter Curator

Happily, guys from Netflix implemented a nice abstraction for dealing with Zookeeper internals. They called it Curator and use it extensively in the company’s environment. Curator offers a consistent API for Zookeeper’s functionality. It even implements a couple of recipes for distributed systems.

File read/write

The basic use of Zookeeper is as a distributed configuration repository. For this scenario I only need read/write capabilities, to be able to write and read files from the Zookeeper filesystem. This code snippet writes a sample json to a file on the ZK filesystem.

<a href="#"
                                                                                                  name="0"></a>

// make sure the parent path exists before writing
EnsurePath ensurePath = new EnsurePath(markerPath);
ensurePath.ensure(client.getZookeeperClient());

String json = "...";
// update the node if it already exists, otherwise create it
if (client.checkExists().forPath(statusFile(core)) != null)
    client.setData().forPath(statusFile(core), json.getBytes());
else
    client.create().forPath(statusFile(core), json.getBytes());
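Reading the file back is just as simple (a sketch, reusing the same client and helper):

byte[] data = client.getData().forPath(statusFile(core));
String status = new String(data);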


Distributed locking

With multiple systems there may be a need for an exclusive lock on some resource, or perhaps a big system requires its components to synchronize based on locks. This “recipe” is an ideal match for those situations.

ref="#"
                                                                                    name="b0329bbbf14b79ffaba1139881914aea887ef6a3"></a>



lock = new InterProcessSemaphoreMutex(client, lockPath);
lock.acquire(5, TimeUnit.MINUTES);
// ... do sth with the protected resource ...
lock.release();


 (from https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/LockingRemotely.java)

Service Advertisement

This is quite an interesting use case. With many small services on different servers it is not wise to exchange IP addresses and ports between them. When some of those services may go down while others try to replace them, the task gets even harder.

That’s why Zookeeper, once in place, can be utilised as a registry of existing services.

When a service starts, it registers with the ServiceRegistry, offering basic information like its purpose, role, address, and port.

Services that want to use a specific kind of service request access to some instance. This way of configuring things easily decouples services from their configuration.

Basically this scenario needs two steps:

1. A service starts and registers its presence (https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/WorkerAdvertiser.java#L44):



ServiceDiscovery discovery = getDiscovery();
discovery.start();
ServiceInstance si = getInstance();
log.info(si);
discovery.registerService(si);



2. Another service - on another host or in another JVM on the same machine - tries to discover who is implementing the service (https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/WorkerFinder.java#L50):

<a href="#"

                                                                                                  name="3"></a>

instances = discovery.queryForInstances(serviceName);

The whole concept here is ridiculously simple - the service advertising its presence just stores a file with its whereabouts. The service that is looking for providers just looks into a specific directory and reads the stored definitions.

In my example, the structure advertised by services looks like this (plus some getters and a constructor - the rest is here: https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/model/WorkerMetadata.java):



public final class WorkerMetadata {
    private final UUID workerId;
    private final String listenAddress;
    private final int listenPort;
}
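The getDiscovery() and getInstance() helpers used earlier could be built with Curator’s discovery builders roughly like this (a sketch; the service name, base path, address and the WorkerMetadata constructor arguments are example values):

ServiceInstance<WorkerMetadata> getInstance() throws Exception {
    // constructor assumed from the fields shown above
    WorkerMetadata payload = new WorkerMetadata(UUID.randomUUID(), "192.168.1.10", 8080);
    return ServiceInstance.<WorkerMetadata>builder()
            .name("worker")
            .address("192.168.1.10")
            .port(8080)
            .payload(payload)
            .build();
}

ServiceDiscovery<WorkerMetadata> getDiscovery() {
    return ServiceDiscoveryBuilder.builder(WorkerMetadata.class)
            .client(client)
            .basePath("/services")
            .serializer(new JsonInstanceSerializer<WorkerMetadata>(WorkerMetadata.class))
            .build();
}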


Source code

The above recipes are available in the Curator library (http://curator.incubator.apache.org/). Recipes’ usage examples are in my github repo at https://github.com/zygm0nt/curator-playground

Conclusion

If you’re in need of a reliable platform for exchanging data and managing synchronization, and you need to do it in a distributed fashion - just choose Zookeeper. Then add Curator for the ease of using it. Enjoy!


  1. image comes from: http://www.flickr.com/photos/jfgallery/2993361148
  2. all source code fragments taken from this repo: https://github.com/zygm0nt/curator-playground
