Zookeeper + Curator = Distributed sync

An application developed for one of my recent projects at TouK involved multiple servers. There was a requirement to ensure failover for the system’s components. Since I already had a few separate components, I didn’t want to add more, and since a Zookeeper ensemble was already running – required by one of the services – I decided to base my solution on it.

What is Zookeeper?

Just a crude distributed synchronization framework. However, it implements Paxos-style algorithms (http://en.wikipedia.org/wiki/Paxos_(computer_science)) to ensure that no split-brain scenarios occur. This is quite an important feature, since I don’t have to care about that kind of problem while using this app. You just need to create an ensemble of a couple of its instances to ensure high availability. It is basically a virtual filesystem, with files, directories and so on. One could ask: why another filesystem? Well, this one is rather special, especially for distributed systems. The reason why building locking algorithms on top of Zookeeper is easy is its ephemeral nodes – files that exist only as long as the connection that created them; once that connection is lost, the file disappears.

With such paradigms in place it’s fairly easy to create some high level algorithms for synchronization.

With that in place, Zookeeper can safely integrate multiple services, ensuring loose coupling in a distributed way.

Zookeeper from developer’s POV

With all the base services of Zookeeper started, it seems there is nothing left to do but connect to it and start implementing the necessary algorithms. Unfortunately, the API is quite basic: it offers file and directory abstractions with the addition of different node types (ephemeral and sequential). It is also possible to watch a node for changes.
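
To give a feel for that bare API, here is a minimal, illustrative sketch (connection string, session timeout and path are my assumptions, and exception handling is omitted) that creates an ephemeral node:

ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, new Watcher() {
    public void process(WatchedEvent event) {
        // session and connection events arrive here
    }
});
// this node exists only as long as the session that created it
zk.create("/worker-1", "payload".getBytes(),
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);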

Using bare Zookeeper is hard!

Creating connections is tedious – there are lots of things to take care of. Handling an established connection is hard as well – when connecting to an ensemble, it is also necessary to negotiate a session. During the whole process a number of “recoverable” exceptions can occur; these can be handled gracefully without breaking the connection.

So, the Zookeeper API is hard. And even if one is proficient with that API, then come the recipes. The reason for using Zookeeper is to be able to implement more sophisticated algorithms on top of it. Unfortunately those aren’t trivial, and it is again quite hard to implement them without bugs.

And since distributed systems are hard, why would anyone want yet another tool that is difficult to handle?

Enter Curator

Happily, the guys from Netflix implemented a nice abstraction for dealing with Zookeeper internals. They called it Curator and use it extensively in the company’s environment. Curator offers a consistent API for Zookeeper’s functionality. It even implements a couple of recipes for distributed systems.

File read/write

The basic use of Zookeeper is as a distributed configuration repository. For this scenario I only need read/write capabilities, to be able to write and read files from the Zookeeper filesystem. This code snippet writes a sample JSON document to a file on the ZK filesystem.

// client is an already started CuratorFramework instance
EnsurePath ensurePath = new EnsurePath(markerPath);
ensurePath.ensure(client.getZookeeperClient());

String json = "...";
if (client.checkExists().forPath(statusFile(core)) != null) {
    client.setData().forPath(statusFile(core), json.getBytes());
} else {
    client.create().forPath(statusFile(core), json.getBytes());
}
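
The snippet above assumes an already started CuratorFramework client. As a minimal, illustrative sketch (the connection string and retry settings are assumptions, not values from the original project), obtaining one could look like this:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
client.start();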

Distributed locking

With multiple systems there may be a need for an exclusive lock on some resource, or perhaps a big system requires its components to synchronize based on locks. This “recipe” is an ideal match for those situations.

InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, lockPath);
if (lock.acquire(5, TimeUnit.MINUTES)) {   // acquire() returns false on timeout
    try {
        // ... do something with the exclusively held resource ...
    } finally {
        lock.release();
    }
}

 (from https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/LockingRemotely.java)

Service Advertisement

This is quite an interesting use case. With many small services on different servers it is not wise to exchange IP addresses and ports between them by hand. When some of those services can go down while others try to replace them, the task gets even harder.

That’s why, with Zookeeper already in place, it can be used as a registry of existing services.

When a service starts, it registers itself in the ServiceRegistry, offering basic information like its purpose, role, address, and port.

Services that want to use a specific kind of service request access to some instance. This way of configuring things easily decouples services from their configuration.

Basically this scenario needs two steps:

1. The service starts and registers its presence (https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/WorkerAdvertiser.java#L44):

ServiceDiscovery<WorkerMetadata> discovery = getDiscovery();
discovery.start();

ServiceInstance<WorkerMetadata> si = getInstance();
log.info(si);
discovery.registerService(si);
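
The getDiscovery() and getInstance() helpers come from the linked WorkerAdvertiser class. As a rough, hedged sketch (the base path, service name and field names here are illustrative assumptions, not the repo’s actual values), they could be built with Curator’s discovery builders like this:

ServiceDiscovery<WorkerMetadata> getDiscovery() {
    return ServiceDiscoveryBuilder.builder(WorkerMetadata.class)
            .client(client)                                              // an already started CuratorFramework
            .basePath("/services")                                       // illustrative registry root
            .serializer(new JsonInstanceSerializer<WorkerMetadata>(WorkerMetadata.class))
            .build();
}

ServiceInstance<WorkerMetadata> getInstance() throws Exception {
    WorkerMetadata payload = new WorkerMetadata(UUID.randomUUID(), listenAddress, listenPort);
    return ServiceInstance.<WorkerMetadata>builder()
            .name("worker")                                              // illustrative service name
            .address(listenAddress)
            .port(listenPort)
            .payload(payload)
            .build();
}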

2. Another service – on another host or in another JVM on the same machine – tries to discover who is implementing the service (https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/WorkerFinder.java#L50):

instances = discovery.queryForInstances(serviceName);

The whole concept here is ridiculously simple – the service advertising its presence just stores a file with its whereabouts. The service that is looking for service providers just looks into a specific directory and reads the stored definitions.
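
On the lookup side, the returned collection can simply be iterated; a minimal sketch (assuming the WorkerMetadata payload described below) might look like this:

Collection<ServiceInstance<WorkerMetadata>> instances = discovery.queryForInstances(serviceName);
for (ServiceInstance<WorkerMetadata> instance : instances) {
    // address and port to connect to, plus the custom payload
    log.info(instance.getAddress() + ":" + instance.getPort() + " -> " + instance.getPayload());
}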

In my example, the structure advertised by services looks like this (+ some getters and constructor – the rest is here: https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/model/WorkerMetadata.java):

public final class WorkerMetadata {
    private final UUID workerId;
    private final String listenAddress;
    private final int listenPort;
}

Source code

The above recipes are available in the Curator library (http://curator.incubator.apache.org/). Recipes’ usage examples are in my GitHub repo at https://github.com/zygm0nt/curator-playground

Conclusion

If you’re in need of a reliable platform for exchanging data and managing synchronization, and you need to do it in a distributed fashion – just choose Zookeeper. Then add Curator for the ease of using it. Enjoy!


  1. Image comes from: http://www.flickr.com/photos/jfgallery/2993361148
  2. All source code fragments are taken from this repo: https://github.com/zygm0nt/curator-playground
You May Also Like

Spock basics

Spock (homepage) is, as its authors say, a 'testing and specification framework'. Spock combines a very elegant and natural syntax with powerful capabilities. And what is most important, it is easy to use.

One note at the very beginning: I assume that you are already familiar with the principles of Test Driven Development and that you know how to use a testing framework such as JUnit.

So how can I start?


Writing Spock specifications is very easy. We need a basic configuration of Spock and Groovy dependencies (if you are using a mavenized project with Eclipse, have a look at my previous post: Spock, Java and Maven). Once we have everything set up and running smoothly, we can write our first specs (a spec, or specification, is the equivalent of a test class in other frameworks like JUnit or TestNG).

What is great about Spock is the fact that we can use it to test both Groovy projects and pure Java projects, or even mixed projects.


Let's go!


Every spec class must extend the spock.lang.Specification class. Only then will the test runner recognize it as a test class and run its tests. We will write a few specs for a simple User class, and a few tests not connected with this particular class.

We start with defining our class:
import spock.lang.*

class UserSpec extends Specification {

}
Now we can proceed to defining test fixtures and test methods.

All activities we want to perform before each test method are to be put in the def setup() {...} method, and everything we want to run after each test should be put in the def cleanup() {...} method (they are the equivalents of JUnit methods annotated with @Before and @After).

It can look like this:
class UserSpec extends Specification {
    User user
    Document document

    def setup() {
        user = new User()
        document = DocumentTestFactory.createDocumentWithTitle("doc1")
    }

    def cleanup() {
    }
}
Of course we can use field initialization for instantiating test objects:
class UserSpec extends Specification {
    User user = new User()
    Document document = DocumentTestFactory.createDocumentWithTitle("doc1")

    def setup() {
    }

    def cleanup() {
    }
}

Which is more readable or preferred? It is just a matter of taste, because according to the Spock docs the behaviour is the same in both cases.

It is worth mentioning that JUnit's @BeforeClass/@AfterClass are also present in Spock as def setupSpec() {...} and def cleanupSpec() {...}. They will be run before the first and after the last test method.
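
As a short illustrative sketch (the method bodies here are just placeholders):

class UserSpec extends Specification {
    def setupSpec() {
        // runs once, before the first feature method
    }

    def cleanupSpec() {
        // runs once, after the last feature method
    }
}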


First tests


In Spock every method in a specification class, except setup/cleanup, is treated by the runner as a test method (unless you annotate it with @Ignore).

A very interesting feature of Spock and Groovy is the ability to name methods with full sentences, just like regular strings:
class UserSpec extends Specification {
    // ...

    def "should assign comment to user"() {
        // ...
    }
}
With such a naming convention we can write a real specification and include details about the specified behaviour in the method name, which is very convenient when reading test reports and analyzing errors.

A test method (also called a feature method) is logically divided into a few blocks, each with its own purpose. Blocks are defined like labels in Java (but they are transformed with Groovy AST transformation features) and some of them must appear in the code in a specific order.

The most basic and common schema for a Spock test is:
class UserSpec extends Specification {
    // ...

    def "should assign comment to user"() {
        given:
        // do initialization of test objects
        when:
        // perform actions to be tested
        then:
        // collect and analyze results
    }
}

But there are more blocks like:
  • setup
  • expect
  • where
  • cleanup
In the next sections I am going to describe each block shortly, with little examples.

given block

This block is used to set up test objects and their state. It has to be the first block in a test and cannot be repeated. Below is a little example of how it can be used:
class UserSpec extends Specification {
    // ...

    def "should add project to user and mark user as project's owner"() {
        given:
        User user = new User()
        Project project = ProjectTestFactory.createProjectWithName("simple project")
        // ...
    }
}

In this code the given block contains initialization of test objects and nothing more. We create a simple user without any specified attributes and a project with a given name. If some of these objects could be reused in more feature methods, it might be worth putting their initialization in the setup method.

when and then blocks

The when block contains the action we want to test (the Spock documentation calls it a 'stimulus'). This block always occurs in a pair with the then block, where we verify that the response satisfies certain conditions. Assume we have this simple test case:
class UserSpec extends Specification {
    // ...

    def "should assign user to comment when adding comment to user"() {
        given:
        User user = new User()
        Comment comment = new Comment()
        when:
        user.addComment(comment)
        then:
        comment.getUserWhoCreatedComment().equals(user)
    }

    // ...
}

In the when block there is a call to the tested method and nothing more. After we are sure our action was performed, we can check for the desired conditions in the then block.

The then block is very well structured and every line in it is treated by Spock as a boolean statement. That means Spock expects us to write instructions containing comparisons and expressions returning true or false, so we can create a then block with statements such as:
user.getName() == "John"
user.getAge() == 40
!user.isEnabled()
Each of these lines will be treated as a single assertion and will be evaluated by Spock.

Sometimes we expect our method to throw an exception under given circumstances. We can write a test for it using the thrown method:
class CommentSpec extends Specification {
    def "should throw exception when adding null document to comment"() {
        given:
        Comment comment = new Comment()
        when:
        comment.setCommentedDocument(null)
        then:
        thrown(RuntimeException)
    }
}

In this test we want to make sure that passing incorrect parameters is correctly handled by the tested method and that the method throws an exception in response. In case you want to be certain that a method does not throw a particular exception, simply use the notThrown method, as sketched below.
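
For illustration, a hedged sketch of the opposite check (reusing the same hypothetical Comment and DocumentTestFactory classes as in the earlier examples):

class CommentSpec extends Specification {
    def "should accept a non-null document without exceptions"() {
        given:
        Comment comment = new Comment()
        Document document = DocumentTestFactory.createDocumentWithTitle("doc1")
        when:
        comment.setCommentedDocument(document)
        then:
        notThrown(RuntimeException)
    }
}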


expect block

The expect block is primarily used when separating the when and then blocks would feel unnatural. It is especially useful for a simple test (and according to TDD rules all tests should be simple and short) with only one condition to check, like in this example (it is simple but should show the idea):
def "should create user with given name"() {
given:
User user = UserTestFactory.createUser("john doe")
expect:
user.getName() == "john doe"
}



More blocks!


Those were very simple tests with the standard Spock test layout and the canonical division into given/when/then parts. But Spock offers more possibilities for writing tests and provides more blocks.


setup/cleanup blocks

These two blocks have the very same functionality as the def setup and def cleanup methods in a specification. They allow us to perform some actions before and after a test. But unlike those methods (which are shared between all tests), the blocks work only in the methods they are defined in, as in the sketch below.
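
A minimal sketch (Document and DocumentTestFactory appear in the earlier examples; the delete() and isDeleted() methods are hypothetical here):

class DocumentSpec extends Specification {
    def "should mark document as deleted"() {
        setup:
        Document document = DocumentTestFactory.createDocumentWithTitle("doc1")
        when:
        document.delete()
        then:
        document.isDeleted()
        cleanup:
        document = null   // release objects used only by this feature method
    }
}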


where - easy way to create readable parameterized tests

Very often when we create unit tests there is a need to “feed” them with sample data to test various cases and boundary values. With Spock this task is very easy and straightforward. To provide test data to a feature method, we need to use the where block. Let's take a look at a little piece of code:

def "should successfully validate emails with valid syntax"() {
expect:
emailValidator.validate(email) == true
where:
email }

In this example, Spock creates a variable called email which is used when calling the method being tested. Internally the feature method is called once, but the framework iterates over the given values and calls the expect/when block as many times as there are values (however, if we use the @Unroll annotation, Spock can create a separate run for each of the given values; more about it in one of the next examples).

Now, let's assume that we want our feature method to test both successful and failed validations. To achieve that goal we can create a few parameterized variables for both the input parameter and the expected result. Here is a little example:

def "should perform validation of email addresses"() {
expect:
emailValidator.validate(email) == result
where:
email result }
Well, it looks nice, but Spock can do much better. It offers a tabular format for defining test parameters, which is much more readable and natural. Let's take a look:
def "should perform validation of email addresses"() {
expect:
emailValidator.validate(email) == result
where:
email | result
"WTF" | false
"@domain" | false
"foo@bar.com" | true
"a@test" | false
}
In this code, each column of our "table" is treated as a separate variable and rows are values for subsequent test iterations.

Another useful feature of Spock when parameterizing tests is its ability to “unroll” each parameterized test. The feature method from the previous example could be defined as (the body stays the same, so I do not repeat it):
@Unroll("should validate email #email")
def "should perform validation of email addresses"() {
// ...
}
With that annotation, Spock generates several methods, each with its own name, and runs them separately. We can use variables from the where block in the @Unroll argument by preceding them with the '#' sign, which is a signal to Spock to use them in the generated method name.


What next?


Well, that was just a quick and short journey through Spock and its capabilities. However, with this basic tutorial you are ready to write many unit tests. In one of my future posts I am going to describe more features of Spock, focusing especially on its mocking abilities.