Zookeeper + Curator = Distributed sync

An application developed for one of my recent projects at TouK involved multiple servers. There was a requirement to ensure failover for the system’s components. Since I had already a few separate components I didn’t want to add more of that, and since there already was a Zookeeper ensemble running – required by one of the services, I’ve decided to go that way with my solution. What is Zookeeper? Just a crude distributed synchronization framework. However, it implements Paxos-style algorithms (http://en.wikipedia.org/wiki/Paxos_(computer_science)) to ensure no split-brain scenarios would occur. This is quite an important feature, since I don’t have to care about that kind of problems while using this app. You just need to create an ensemble of a couple of its instances – to ensure high availability. It is basically a virtual filesystem, with files, directories and stuff. One could ask why another filesystem? Well this one is a rather special one, especially for distributed systems. The reason why creating all the locking algorithms on top of Zookeeper is easy is its Ephemeral Nodes – which are just files that exist as long as connection for them exists. After it disconnects – such file disappears. With such paradigms in place it’s fairly easy to create some high level algorithms for synchronization. Having that in place, it can safely integrate multiple services ensuring loose coupling in a distributed way. Zookeeper from developer’s POV With all the base services for Zookeeper started, it seems there is nothing else, than just connect to it and start implementing necessary algorithms. Unfortunately, the API is quite basic and offers files and directories abstractions with the addition of different node type (file types) – ephemeral and sequence. It is also possible to watch a node for changes. Using bare Zookeeper is hard! Creating connections is tedious – and there is lots of things to take care of. Handling an established connection is hard – when establishing connection to ensemble, it’s necessary to negotiate a session also. During the whole process a number of exceptions can occur – these are “recoverable” exceptions, that can be gracefully handled and not break the connection. class="c8"><span>So, Zookeeper API is hard.</span></p><p class="c1"><span></span></p><p class="c8"><span>Even if one is proficient with that API, then there come recipes. The reason for using Zookeeper is to be able to implement some more sophisticated algorithms on top of it. Unfortunately those aren&rsquo;t trivial and it is again quite hard to implement them without bugs.</span> And since distributed systems are hard, why would anyone want another difficult to handle tool? Enter Curator <p class="c8"><span>Happily, guys from Netflix implemented a nice abstraction for dealing with Zookeeper internals. They called it Curator and use it extensively in the company&rsquo;s environment. Curator offers consistent API for Zookeeper&rsquo;s functionality. It even implements a couple of recipes for distributed systems.</span> File read/write <p class="c8"><span>The basic use of Zookeeper is as a distributed configuration repository. For this scenario I only need read/write capabilities, to be able to write and read files from the Zookeeper filesystem. This code snippet writes a sample json to a file on ZK filesystem.</span> <a href=”#” name="0"></a> EnsurePath ensurePath = new EnsurePath(markerPath); ensurePath.ensure(client.getZookeeperClient()); String json = “...”; if (client.checkExists().forPath(statusFile(core)) != null) client.setData().forPath(statusFile(core), json.getBytes()); else client.create().forPath(statusFile(core), json.getBytes()); Distributed locking Having multiple systems there may be a need of using an exclusive lock for some resource, or perhaps some big system requires it’s components to synchronize based on locks. This “recipe” is an ideal match for those situations. ref="#" name="b0329bbbf14b79ffaba1139881914aea887ef6a3"></a> lock = new InterProcessSemaphoreMutex(client, lockPath); lock.acquire(5, TimeUnit.MINUTES); … do sth … lock.release();  (from https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/LockingRemotely.java) Sevice Advertisement <p class="c8"><span>This is quite an interesting use case. With many small services on different servers it is not wise to exchange ip addresses and ports between them. When some of those services may go down, while other will try to replace them - the task gets even harder. </span> That’s why, with Zookeeper in place, it can be utilised as a registry of existing services. If a service starts, it registers into the ServiceRegistry, offering basic information, like it’s purpose, role, address, and port. Services that want to use a specific kind of service request an access to some instance. This way of configuring easily decouples services from their configuration. Basically this scenario needs ? steps: <span>1. Service starts and registers its presence (</span><span class="c5"><a class="c0" href="https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/WorkerAdvertiser.java#L44">https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/WorkerAdvertiser.java#L44</a></span><span>)</span><span>:</span> ServiceDiscovery discovery = getDiscovery(); discovery.start(); ServiceInstance si = getInstance(); log.info(si); discovery.registerService(si); 2. Another service – on another host or in another JVM on the same machine tries to discover who is implementing the service (https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/WorkerFinder.java#L50): <a href=”#” name="3"></a> instances = discovery.queryForInstances(serviceName); The whole concept here is ridiculously simple – the service advertising its presence just stores a file with its whereabouts. The service that is looking for service providers just look into specific directory and read stored definitions. In my example, the structure advertised by services looks like this (+ some getters and constructor – the rest is here: https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/model/WorkerMetadata.java): public final class WorkerMetadata { private final UUID workerId; private final String listenAddress; private final int listenPort; } Source code <p class="c8"><span>The above recipes are available in Curator library (</span><span class="c5"><a class="c0" href="http://curator.incubator.apache.org/">http://curator.incubator.apache.org/</a></span><span>). Recipes&rsquo; usage examples are in my github repo at </span><span class="c5"><a class="c0" href="https://github.com/zygm0nt/curator-playground">https://github.com/zygm0nt/curator-playground</a></span> Conclusion <p class="c8"><span>If you&rsquo;re in need of a reliable platform for exchanging data and managing synchronization, and you need to do it in a distributed fashion - just choose Zookeeper. Then add Curator for the ease of using it. Enjoy!</span> image comes from: http://www.flickr.com/photos/jfgallery/2993361148 all source code fragments taken from this repo: https://github.com/zygm0nt/curator-playground An application developed for one of my recent projects at TouK involved multiple servers. There was a requirement to ensure failover for the system’s components. Since I had already a few separate components I didn’t want to add more of that, and since there already was a Zookeeper ensemble running – required by one of the services, I’ve decided to go that way with my solution. What is Zookeeper? Just a crude distributed synchronization framework. However, it implements Paxos-style algorithms (http://en.wikipedia.org/wiki/Paxos_(computer_science)) to ensure no split-brain scenarios would occur. This is quite an important feature, since I don’t have to care about that kind of problems while using this app. You just need to create an ensemble of a couple of its instances – to ensure high availability. It is basically a virtual filesystem, with files, directories and stuff. One could ask why another filesystem? Well this one is a rather special one, especially for distributed systems. The reason why creating all the locking algorithms on top of Zookeeper is easy is its Ephemeral Nodes – which are just files that exist as long as connection for them exists. After it disconnects – such file disappears. With such paradigms in place it’s fairly easy to create some high level algorithms for synchronization. Having that in place, it can safely integrate multiple services ensuring loose coupling in a distributed way. Zookeeper from developer’s POV With all the base services for Zookeeper started, it seems there is nothing else, than just connect to it and start implementing necessary algorithms. Unfortunately, the API is quite basic and offers files and directories abstractions with the addition of different node type (file types) – ephemeral and sequence. It is also possible to watch a node for changes. Using bare Zookeeper is hard! Creating connections is tedious – and there is lots of things to take care of. Handling an established connection is hard – when establishing connection to ensemble, it’s necessary to negotiate a session also. During the whole process a number of exceptions can occur – these are “recoverable” exceptions, that can be gracefully handled and not break the connection. class="c8"><span>So, Zookeeper API is hard.</span></p><p class="c1"><span></span></p><p class="c8"><span>Even if one is proficient with that API, then there come recipes. The reason for using Zookeeper is to be able to implement some more sophisticated algorithms on top of it. Unfortunately those aren&rsquo;t trivial and it is again quite hard to implement them without bugs.</span> And since distributed systems are hard, why would anyone want another difficult to handle tool? Enter Curator <p class="c8"><span>Happily, guys from Netflix implemented a nice abstraction for dealing with Zookeeper internals. They called it Curator and use it extensively in the company&rsquo;s environment. Curator offers consistent API for Zookeeper&rsquo;s functionality. It even implements a couple of recipes for distributed systems.</span> File read/write <p class="c8"><span>The basic use of Zookeeper is as a distributed configuration repository. For this scenario I only need read/write capabilities, to be able to write and read files from the Zookeeper filesystem. This code snippet writes a sample json to a file on ZK filesystem.</span> <a href=”#” name="0"></a> EnsurePath ensurePath = new EnsurePath(markerPath); ensurePath.ensure(client.getZookeeperClient()); String json = “...”; if (client.checkExists().forPath(statusFile(core)) != null) client.setData().forPath(statusFile(core), json.getBytes()); else client.create().forPath(statusFile(core), json.getBytes()); Distributed locking Having multiple systems there may be a need of using an exclusive lock for some resource, or perhaps some big system requires it’s components to synchronize based on locks. This “recipe” is an ideal match for those situations. ref="#" name="b0329bbbf14b79ffaba1139881914aea887ef6a3"></a> lock = new InterProcessSemaphoreMutex(client, lockPath); lock.acquire(5, TimeUnit.MINUTES); … do sth … lock.release();  (from https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/LockingRemotely.java) Sevice Advertisement <p class="c8"><span>This is quite an interesting use case. With many small services on different servers it is not wise to exchange ip addresses and ports between them. When some of those services may go down, while other will try to replace them - the task gets even harder. </span> That’s why, with Zookeeper in place, it can be utilised as a registry of existing services. If a service starts, it registers into the ServiceRegistry, offering basic information, like it’s purpose, role, address, and port. Services that want to use a specific kind of service request an access to some instance. This way of configuring easily decouples services from their configuration. Basically this scenario needs ? steps: <span>1. Service starts and registers its presence (</span><span class="c5"><a class="c0" href="https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/WorkerAdvertiser.java#L44">https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/WorkerAdvertiser.java#L44</a></span><span>)</span><span>:</span> ServiceDiscovery discovery = getDiscovery(); discovery.start(); ServiceInstance si = getInstance(); log.info(si); discovery.registerService(si); 2. Another service – on another host or in another JVM on the same machine tries to discover who is implementing the service (https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/WorkerFinder.java#L50): <a href=”#” name="3"></a> instances = discovery.queryForInstances(serviceName); The whole concept here is ridiculously simple – the service advertising its presence just stores a file with its whereabouts. The service that is looking for service providers just look into specific directory and read stored definitions. In my example, the structure advertised by services looks like this (+ some getters and constructor – the rest is here: https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/model/WorkerMetadata.java): public final class WorkerMetadata { private final UUID workerId; private final String listenAddress; private final int listenPort; } Source code <p class="c8"><span>The above recipes are available in Curator library (</span><span class="c5"><a class="c0" href="http://curator.incubator.apache.org/">http://curator.incubator.apache.org/</a></span><span>). Recipes&rsquo; usage examples are in my github repo at </span><span class="c5"><a class="c0" href="https://github.com/zygm0nt/curator-playground">https://github.com/zygm0nt/curator-playground</a></span> Conclusion <p class="c8"><span>If you&rsquo;re in need of a reliable platform for exchanging data and managing synchronization, and you need to do it in a distributed fashion - just choose Zookeeper. Then add Curator for the ease of using it. Enjoy!</span> image comes from: http://www.flickr.com/photos/jfgallery/2993361148 all source code fragments taken from this repo: https://github.com/zygm0nt/curator-playground

An application developed for one of my recent projects at TouK involved multiple servers. There was a requirement to ensure failover for the system’s components. Since I had already a few separate components I didn’t want to add more of that, and since there already was a Zookeeper ensemble running – required by one of the services, I’ve decided to go that way with my solution.

What is Zookeeper?

Just a crude distributed synchronization framework. However, it implements Paxos-style algorithms (http://en.wikipedia.org/wiki/Paxos_(computer_science)) to ensure no split-brain scenarios would occur. This is quite an important feature, since I don’t have to care about that kind of problems while using this app. You just need to create an ensemble of a couple of its instances – to ensure high availability. It is basically a virtual filesystem, with files, directories and stuff. One could ask why another filesystem? Well this one is a rather special one, especially for distributed systems. The reason why creating all the locking algorithms on top of Zookeeper is easy is its Ephemeral Nodes – which are just files that exist as long as connection for them exists. After it disconnects – such file disappears.

With such paradigms in place it’s fairly easy to create some high level algorithms for synchronization.

Having that in place, it can safely integrate multiple services ensuring loose coupling in a distributed way.

Zookeeper from developer’s POV

With all the base services for Zookeeper started, it seems there is nothing else, than just connect to it and start implementing necessary algorithms. Unfortunately, the API is quite basic and offers files and directories abstractions with the addition of different node type (file types) – ephemeral and sequence. It is also possible to watch a node for changes.

Using bare Zookeeper is hard!

Creating connections is tedious – and there is lots of things to take care of. Handling an established connection is hard – when establishing connection to ensemble, it’s necessary to negotiate a session also. During the whole process a number of exceptions can occur – these are “recoverable”
exceptions, that can be gracefully handled and not break the connection.

So, Zookeeper API is hard. Even if one is proficient with that API, then there come recipes. The reason for using Zookeeper is to be able to implement some more sophisticated algorithms on top of it. Unfortunately those aren’t trivial and it is again quite hard to implement them without bugs.

And since distributed systems are hard, why would anyone want another difficult to handle tool?

Enter Curator

Happily, guys from Netflix implemented a nice abstraction for dealing with Zookeeper internals. They called it Curator and use it extensively in the company’s environment. Curator offers consistent API for Zookeeper’s functionality. It even implements a couple of recipes for distributed systems.

File read/write

The basic use of Zookeeper is as a distributed configuration repository. For this scenario I only need read/write capabilities, to be able to write and read files from the Zookeeper filesystem. This code snippet writes a sample json to a file on ZK filesystem.

EnsurePath ensurePath = new EnsurePath(markerPath);
ensurePath.ensure(client.getZookeeperClient());
String json = “...”;
if (client.checkExists().forPath(statusFile(core)) != null)
     client.setData().forPath(statusFile(core), json.getBytes());
else
     client.create().forPath(statusFile(core), json.getBytes());

Distributed locking

Having multiple systems there may be a need of using an exclusive lock for some resource, or perhaps some big system requires it’s components to synchronize based on locks. This “recipe”
is an ideal match for those situations.

lock = new InterProcessSemaphoreMutex(client, lockPath);
lock.acquire(5, TimeUnit.MINUTES);
… do sth …
lock.release();

 (from https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/LockingRemotely.java)

Sevice Advertisement

This is quite an interesting use case. With many small services on different servers it is not wise to exchange ip addresses and ports between them. When some of those services may go down, while other will try to replace them – the task gets even harder.

That’s why, with Zookeeper in place, it can be utilised as a registry of existing services.

If a service starts, it registers into the ServiceRegistry, offering basic information, like it’s purpose, role, address, and port.

Services that want to use a specific kind of service request an access to some instance. This way of configuring easily decouples services from their configuration.

Basically this scenario needs ? steps:

1. Service starts and registers its presence (https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/WorkerAdvertiser.java#L44

ServiceDiscovery discovery = getDiscovery();
            discovery.start();
            ServiceInstance si = getInstance();
            log.info(si);
            discovery.registerService(si);

2. Another service – on another host or in another JVM on the same machine tries to discover who is implementing the service (https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/WorkerFinder.java#L50):

instances = discovery.queryForInstances(serviceName);

The whole concept here is ridiculously simple – the service advertising its presence just stores a file with its whereabouts. The service that is looking for service providers just look into specific directory and read stored definitions.

In my example, the structure advertised by services looks like this (+ some getters and constructor – the rest is here: https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/model/WorkerMetadata.java):

public final class WorkerMetadata {
    private final UUID workerId;
    private final String listenAddress;
    private final int listenPort;
}

Source code

The above recipes are available in Curator library (http://curator.incubator.apache.org/
usage examples are in my github repo at https://github.com/zygm0nt/curator-playground

Conclusion

If you’re in need of a reliable platform for exchanging data and managing synchronization, and you need to do it in a distributed fashion – just choose Zookeeper. Then add Curator for the ease of using it. Enjoy!


  1. image comes from: http://www.flickr.com/photos/jfgallery/2993361148
  2. all source code fragments taken from this repo: https://github.com/zygm0nt/curator-playground
You May Also Like

How to automate tests with Groovy 2.0, Spock and Gradle

This is the launch of the 1st blog in my life, so cheers and have a nice reading!

y u no test?

Couple of years ago I wasn't a big fan of unit testing. It was obvious to me that well prepared unit tests are crucial though. I didn't known why exactly crucial yet then. I just felt they are important. My disliking to write automation tests was mostly related to the effort necessary to prepare them. Also a spaghetti code was easily spotted in test sources.

Some goodies at hand

Now I know! Test are crucial to get a better design and a confidence. Confidence to improve without a hesitation. Moreover, now I have the tool to make test automation easy as Sunday morning... I'm talking about the Spock Framework. If you got here probably already know what the Spock is, so I won't introduce it. Enough to say that Spock is an awesome unit testing tool which, thanks to Groovy AST Transformation, simplifies creation of tests greatly.

An obstacle

The point is, since a new major version of Groovy has been released (2.0), there is no matching version of Spock available yet.

What now?

Well, in a matter of fact there is such a version. It's still under development though. It can be obtained from this Maven repository. We can of course use the Maven to build a project and run tests. But why not to go even more "groovy" way? XML is not for humans, is it? Lets use Gradle.

The build file

Update: at the end of the post is updated version of the build file.
apply plugin: 'groovy'
apply plugin: 'idea'

def langLevel = 1.7

sourceCompatibility = langLevel
targetCompatibility = langLevel

group = 'com.tamashumi.example.testwithspock'
version = '0.1'

repositories {
mavenLocal()
mavenCentral()
maven { url 'http://oss.sonatype.org/content/repositories/snapshots/' }
}

dependencies {
groovy 'org.codehaus.groovy:groovy-all:2.0.1'
testCompile 'org.spockframework:spock-core:0.7-groovy-2.0-SNAPSHOT'
}

idea {
project {
jdkName = langLevel
languageLevel = langLevel
}
}
As you can see the build.gradle file is almost self-explanatory. Groovy plugin is applied to compile groovy code. It needs groovy-all.jar - declared in version 2.0 at dependencies block just next to Spock in version 0.7. What's most important, mentioned Maven repository URL is added at repositories block.

Project structure and execution

Gradle's default project directory structure is similar to Maven's one. Unfortunately there is no 'create project' task and you have to create it by hand. It's not a big obstacle though. The structure you will create will more or less look as follows:
<project root>

├── build.gradle
└── src
├── main
│ ├── groovy
└── test
└── groovy
To build a project now you can type command gradle build or gradle test to only run tests.

How about Java?

You can test native Java code with Spock. Just add src/main/java directory and a following line to the build.gradle:
apply plugin: 'java'
This way if you don't want or just can't deploy Groovy compiled stuff into your production JVM for any reason, still whole goodness of testing with Spock and Groovy is at your hand.

A silly-simple example

Just to show that it works, here you go with a basic example.

Java simple example class:

public class SimpleJavaClass {

public int sumAll(int... args) {

int sum = 0;

for (int arg : args){
sum += arg;
}

return sum;
}
}

Groovy simple example class:

class SimpleGroovyClass {

String concatenateAll(char separator, String... args) {

args.join(separator as String)
}
}

The test, uhm... I mean the Specification:

class JustASpecification extends Specification {

@Unroll('Sums integers #integers into: #expectedResult')
def "Can sum different amount of integers"() {

given:
def instance = new SimpleJavaClass()

when:
def result = instance.sumAll(* integers)

then:
result == expectedResult

where:
expectedResult | integers
11 | [3, 3, 5]
8 | [3, 5]
254 | [2, 4, 8, 16, 32, 64, 128]
22 | [7, 5, 6, 2, 2]
}

@Unroll('Concatenates strings #strings with separator "#separator" into: #expectedResult')
def "Can concatenate different amount of integers with a specified separator"() {

given:
def instance = new SimpleGroovyClass()

when:
def result = instance.concatenateAll(separator, * strings)

then:
result == expectedResult

where:
expectedResult | separator | strings
'Whasup dude?' | ' ' as char | ['Whasup', 'dude?']
'2012/09/15' | '/' as char | ['2012', '09', '15']
'nice-to-meet-you' | '-' as char | ['nice', 'to', 'meet', 'you']
}
}
To run tests with Gradle simply execute command gradle test. Test reports can be found at <project root>/build/reports/tests/index.html and look kind a like this.


Please note that, thanks to @Unroll annotation, test is executed once per each parameters row in the 'table' at specification's where: block. This isn't a Java label, but a AST transformation magic.

IDE integration

Gradle's plugin for Iintellij Idea

I've added also Intellij Idea plugin for IDE project generation and some configuration for it (IDE's JDK name). To generate Idea's project files just run command: gradle idea There are available Eclipse and Netbeans plugins too, however I haven't tested them. Idea's one works well.

Intellij Idea's plugins for Gradle

Idea itself has a light Gradle support built-in on its own. To not get confused: Gradle has plugin for Idea and Idea has plugin for Gradle. To get even more 'pluginated', there is also JetGradle plugin within Idea. However I haven't found good reason for it's existence - well, maybe excluding one. It shows dependency tree. There is a bug though - JetGradle work's fine only for lang level 1.6. Strangely all the plugins together do not conflict each other. They even give complementary, quite useful tool set.

Running tests under IDE

Jest to add something sweet this is how Specification looks when run with jUnit  runner under Intellij Idea (right mouse button on JustASpecification class or whole folder of specification extending classes and select "Run ...". You'll see a nice view like this.

Building web application

If you need to build Java web application and bundle it as war archive just add plugin by typing the line
apply plugin: 'war'
in the build.gradle file and create a directory src/main/webapp.

Want to know more?

If you haven't heard about Spock or Gradle before or just curious, check the following links:

What next?

The last thing left is to write the real production code you are about to test. No matter will it be Groovy or Java, I leave this to your need and invention. Of course, you are welcome to post a comments here. I'll answer or even write some more posts about the subject.

Important update

Spock version 0.7 has been released, so the above build file doesn't work anymore. It's easy to fix it though. Just remove last dash and a word SNAPSHOT from Spock dependency declaration. Other important thing is that now spock-core depends on groovy-all-2.0.5, so to avoid dependency conflict groovy dependency should be changed from version 2.0.1 to 2.0.5.
Besides oss.sonata.org snapshots maven repository can be removed. No obstacles any more and the build file now looks as follows:
apply plugin: 'groovy'
apply plugin: 'idea'

def langLevel = 1.7

sourceCompatibility = langLevel
targetCompatibility = langLevel

group = 'com.tamashumi.example.testwithspock'
version = '0.1'

repositories {
mavenLocal()
mavenCentral()
}

dependencies {
groovy 'org.codehaus.groovy:groovy-all:2.0.5'
testCompile 'org.spockframework:spock-core:0.7-groovy-2.0'
}

idea {
project {
jdkName = langLevel
languageLevel = langLevel
}
}