Hamming Error Correction with Kotlin – part 1

Hamming code is one of the classics of computer science and telecommunications.

In this article, we’ll revisit the topic and implement a stateless Hamming(7,4) encoder using Kotlin.

Hamming Error Correction

Our communication channels and data storage are error-prone – bits can flip due to various factors like electromagnetic interference, background radiation, or simply the low quality of the materials used.

Since the neutron flux is ~300 times higher at around 10 km altitude, particular attention is necessary when dealing with systems operating at high altitudes – the case study of the Cassini-Huygens mission proves it: in space, the number of reported errors was over four times higher than on Earth, hence the need for efficient error correction.

Richard Hamming's code is one of the solutions to the problem. It's a perfect code (at least according to Hamming's definition) that can detect and correct errors in transmitted messages.

Simply put, it adds metadata to the message (in the form of parity bits) that can be used to validate the message and correct errors in it.

A Brief Explanation

I bet you've already wondered what the (7,4) in “Hamming(7,4)” means.

Simply put, N and M in “Hamming(N, M)” represent the block length and the message size – so, (7,4) means that the code encodes four bits into seven bits by adding three parity bits (in general, r parity bits cover a block of 2^r - 1 bits, leaving 2^r - 1 - r bits for data) – as simple as that.

This particular version can detect and correct single-bit errors; extended with one extra overall parity bit (Hamming(8,4)), it can additionally detect – but not correct – double-bit errors.

In a Hamming codeword, parity bits always occupy the indexes that are powers of two (assuming 1-based indexing).

So, if our initial message is 1111, the codeword will look something like [][]1[]111 – with three parity bits for us to fill in.

If we want to calculate the parity bit at position p (where p is a power of two), we start at its position in the codeword, take p elements, skip p elements, take p elements, skip p elements… and so on. If the number of ones among the taken elements is odd, we set the parity bit to one, otherwise to zero.

In our case:

  • For the first parity bit, we check indexes 1, 3, 5, 7  -> (1)()1()111
  • For the second parity bit, we check indexes 2, 3, 6, 7 -> (1)(1)1()111
  • For the third parity bit, we check indexes 4, 5, 6, 7  -> (1)(1)1(1)111

And that’s all – the codeword is 1111111.
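
For the curious, the same take/skip rule can be expressed in a few lines of Kotlin – a standalone sketch for illustration only (parityAt() is a hypothetical helper, not part of the implementation below):

// Naive illustration of the take-p/skip-p rule (p is a 1-based, power-of-two position).
// Position q is "taken" by parity bit p exactly when (q / p) is odd; the parity slot itself is skipped.
fun parityAt(codeword: List<Int>, p: Int): Int =
    codeword.withIndex()
        .filter { (i, _) -> (i + 1) != p && ((i + 1) / p) % 2 == 1 }
        .sumOf { (_, bit) -> bit } % 2

fun main() {
    val cw = listOf(0, 0, 1, 0, 1, 1, 1) // parity slots zeroed out, data bits 1111 in place
    println(listOf(1, 2, 4).map { parityAt(cw, it) }) // [1, 1, 1] – matches the example above
}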

In this case, it might be tempting to think that every sequence containing only ones will be encoded to another sequence comprising only ones – but that's not the case. Every message containing only zeros, however, will always be encoded to zeros exclusively, since the parity of any group of zeros is itself zero.

Encoding

First things first, we can leverage Type-Driven Development to make our lives easier when working with Strings representing raw and encoded messages:

data class EncodedString(val value: String)

data class BinaryString(val value: String)

Using this approach, it’ll be slightly harder to mix them up.
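
For instance, a call site that confuses the two representations simply won't compile (a tiny illustration using a hypothetical process() function):

// A hypothetical helper, just to illustrate the point:
fun process(msg: BinaryString): EncodedString = EncodedString(msg.value) // dummy body

fun main() {
    process(BinaryString("1011"))     // compiles
    // process(EncodedString("1011")) // does not compile – type mismatch
}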

We'll need a method for calculating the codeword size for a given message. We simply find the lowest number of parity bits r that can cover the given message, i.e., the smallest r satisfying msgLength + r + 1 <= 2^r:

fun codewordSize(msgLength: Int) = generateSequence(2) { it + 1 }
  .first { r -> msgLength + r + 1 <= (1 shl r) } + msgLength
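
A quick sanity check against the classic code parameters (a usage sketch; the expected values follow directly from the inequality above):

fun main() {
    println(codewordSize(1))  // 3  -> Hamming(3,1)
    println(codewordSize(4))  // 7  -> Hamming(7,4)
    println(codewordSize(11)) // 15 -> Hamming(15,11)
}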

Next, we'll need methods for calculating parity and data bits at given indexes of a given message:

fun getParityBit(codeWordIndex: Int, msg: BinaryString) =
  parityIndicesSequence(codeWordIndex, codewordSize(msg.value.length))
    .map { getDataBit(it, msg).toInt() }
    .reduce { a, b -> a xor b }
    .toString()

fun getDataBit(ind: Int, input: BinaryString) = input
  .value[ind - Integer.toBinaryString(ind).length].toString()
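
The ind - Integer.toBinaryString(ind).length part maps a 0-based codeword index back to an index in the raw message – for a non-parity index, the bit length of ind equals the number of parity positions at or before it. A quick check for a four-bit message (using the functions above):

fun main() {
    val msg = BinaryString("1011")
    // the four data bits live at 0-based codeword indexes 2, 4, 5, 6 (positions 3, 5, 6, 7)
    println(listOf(2, 4, 5, 6).map { getDataBit(it, msg) }) // [1, 0, 1, 1]
}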

Where parityIndicesSequence() is defined as:

fun parityIndicesSequence(start: Int, endEx: Int) = generateSequence(start) { it + 1 }
  .take(endEx - start)
  .filterIndexed { i, _ -> i % ((2 * (start + 1))) < start + 1 }
  .drop(1) // ignore the parity bit
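
For a 7-bit codeword, this reproduces the index groups from the example in the previous section (0-based here, so position = index + 1):

fun main() {
    println(parityIndicesSequence(0, 7).toList()) // [2, 4, 6] -> positions 3, 5, 7
    println(parityIndicesSequence(1, 7).toList()) // [2, 5, 6] -> positions 3, 6, 7
    println(parityIndicesSequence(3, 7).toList()) // [4, 5, 6] -> positions 5, 6, 7
}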

Now, we can put it all together to form the actual solution, which simply goes through the whole codeword and fills it with parity bits and actual data:

override fun encode(input: BinaryString): EncodedString {
    fun toHammingCodeValue(it: Int, input: BinaryString) =
      when ((it + 1).isPowerOfTwo()) {
          true -> hammingHelper.getParityBit(it, input)
          false -> hammingHelper.getDataBit(it, input)
      }

    return hammingHelper.getHammingCodewordIndices(input.value.length)
      .map { toHammingCodeValue(it, input) }
      .joinToString("")
      .let(::EncodedString)
}
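
The getHammingCodewordIndices() helper isn't shown in the snippet above – a minimal sketch consistent with how it's used (my reconstruction, not necessarily the original implementation) could look like this:

// Hypothetical reconstruction: yields one 0-based index per codeword position
fun getHammingCodewordIndices(msgLength: Int): Sequence<Int> =
    (0 until codewordSize(msgLength)).asSequence()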

Note that isPowerOfTwo() is our custom extension function and is not available out-of-the-box in Kotlin:

internal fun Int.isPowerOfTwo() = this != 0 && this and this - 1 == 0
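
The trick works because a power of two has exactly one bit set, so subtracting one flips that bit and sets all the lower ones, making the AND zero. A quick check:

fun main() {
    println((1..16).filter { it.isPowerOfTwo() }) // [1, 2, 4, 8, 16]
}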

Inlined

The interesting thing is that the whole computation can be inlined into a single Goliath sequence:

override fun encode(input: BinaryString) = generateSequence(0) { it + 1 }
  .take(generateSequence(2) { it + 1 }
    .first { r -> input.value.length + r + 1 <= (1 shl r) } + input.value.length)
  .map {
      when ((it + 1).isPowerOfTwo()) {
          true -> generateSequence(it) { it + 1 }
            .take(generateSequence(2) { it + 1 }
              .first { r -> input.value.length + r + 1 <= (1 shl r) } + input.value.length - it)
            .filterIndexed { i, _ -> i % ((2 * (it + 1))) < it + 1 }
            .drop(1)
            .map {
                input
                  .value[it - Integer.toBinaryString(it).length].toString().toInt()
            }
            .reduce { a, b -> a xor b }
            .toString()
          false -> input
            .value[it - Integer.toBinaryString(it).length].toString()
      }
  }
  .joinToString("")
  .let(::EncodedString)

Not the most readable version, but an interesting one to have a look at.

In Action

We can verify that the implementation works as expected by leveraging JUnit5 and Parameterized Tests:

@ParameterizedTest(name = "{0} should be encoded to {1}")
@CsvSource(
  "1,111",
  "01,10011",
  "11,01111",
  "1001000,00110010000",
  "1100001,10111001001",
  "1101101,11101010101",
  "1101001,01101011001",
  "1101110,01101010110",
  "1100111,01111001111",
  "0100000,10011000000",
  "1100011,11111000011",
  "1101111,10101011111",
  "1100100,11111001100",
  "1100101,00111000101",
  "10011010,011100101010")
fun shouldEncode(first: String, second: String) {
    assertThat(sut.encode(BinaryString(first)))
      .isEqualTo(EncodedString(second))
}

… and by using some home-made property testing:

@Test
@DisplayName("should always encode zeros to zeros")
fun shouldEncodeZeros() {
    generateSequence("0") { it + "0" }
      .take(1000)
      .map { sut.encode(BinaryString(it)).value }
      .forEach {
          assertThat(it).doesNotContain("1")
      }
}

Going Parallel

The most important property of this implementation is statelessness – we achieved it by using only pure functions and avoiding shared mutable state: all necessary data is always passed explicitly as input parameters and never held in any form of internal state.

Unfortunately, this results in some repetition and performance overhead that could have been avoided if we were just modifying a single mutable list and passing it around… but now we can use our resources more wisely by parallelizing the whole operation – which should result in a performance improvement.

Without running the code, that's just wishful thinking, so let's do just that.

We can parallelize the operation (naively) using Java 8’s parallel streams:

override fun encode(input: BinaryString) = hammingHelper.getHammingCodewordIndices(input.value.length)
  .toList().parallelStream()
  .map { toHammingCodeValue(it, input) }
  .reduce("") { t, u -> t + u }
  .let(::EncodedString)

To avoid giving the sequential implementation an unfair advantage (it had no toList() conversion so far), we'll need to change it slightly:

override fun encode(input: BinaryString) = hammingHelper.getHammingCodewordIndices(input.value.length)
  .toList().stream() // to be fair.
  .map { toHammingCodeValue(it, input) }
  .reduce("") { t, u -> t + u }
  .let(::EncodedString)

And now, we can perform some benchmarking using JMH (message.size == 10_000):

Result "com.pivovarit.hamming.benchmarks.SimpleBenchmark.parallel":
 3.690 ±(99.9%) 0.018 ms/op [Average]
 (min, avg, max) = (3.524, 3.690, 3.974), stdev = 0.076
 CI (99.9%): [3.672, 3.708] (assumes normal distribution)

Result "com.pivovarit.hamming.benchmarks.SimpleBenchmark.sequential":
  10.877 ±(99.9%) 0.097 ms/op [Average]
  (min, avg, max) = (10.482, 10.877, 13.498), stdev = 0.410
  CI (99.9%): [10.780, 10.974] (assumes normal distribution)


# Run complete. Total time: 00:15:14

Benchmark                   Mode  Cnt   Score   Error  Units
SimpleBenchmark.parallel    avgt  200   3.690 ± 0.018  ms/op
SimpleBenchmark.sequential  avgt  200  10.877 ± 0.097  ms/op

As we can see, there's a major performance improvement in favor of the parallelized implementation. Of course, the results might change drastically depending on various factors, so do not think we've found a silver bullet – those do not exist.

For example, here are the results for encoding a very short message (message.size == 10):

Benchmark                   Mode  Cnt  Score    Error   Units
SimpleBenchmark.parallel    avgt  200  0.024  ± 0.001  ms/op
SimpleBenchmark.sequential  avgt  200  0.003  ± 0.001  ms/op

In this case, the overhead of splitting the operation among multiple threads makes the parallelized implementation perform eight times slower (sic!).

Here's the full table for reference:

Benchmark             (messageSize)  Mode  Cnt  Score    Error    Units
Benchmark.parallel    10             avgt  200   0.022  ± 0.001  ms/op
Benchmark.sequential  10             avgt  200   0.003  ± 0.001  ms/op

Benchmark.parallel    100            avgt  200   0.038  ± 0.001  ms/op
Benchmark.sequential  100            avgt  200   0.031  ± 0.001  ms/op

Benchmark.parallel    1000           avgt  200   0.273  ± 0.011  ms/op
Benchmark.sequential  1000           avgt  200   0.470  ± 0.008  ms/op

Benchmark.parallel    10000          avgt  200   3.731  ± 0.047  ms/op
Benchmark.sequential  10000          avgt  200  12.425  ± 0.336  ms/op

Conclusion

We saw how to implement a thread-safe Hamming(7,4) encoder using Kotlin and what parallelization can potentially give us.

In the second part of the article, we’ll implement a Hamming decoder and see how we can correct single-bit errors and detect double-bit ones.

Code snippets can be found on GitHub.

You May Also Like

Zookeeper + Curator = Distributed sync

An application developed for one of my recent projects at TouK involved multiple servers. There was a requirement to ensure failover for the system's components. Since I already had a few separate components, I didn't want to add more, and since there was already a Zookeeper ensemble running (required by one of the services), I decided to base my solution on it.

What is Zookeeper?

Just a crude distributed synchronization framework. However, it implements Paxos-style algorithms (http://en.wikipedia.org/wiki/Paxos_(computer_science)) to ensure no split-brain scenarios occur. This is quite an important feature, since I don't have to care about that kind of problem while using the app. You just need to create an ensemble of a couple of its instances to ensure high availability. It is basically a virtual filesystem, with files, directories and so on. One could ask: why another filesystem? Well, this one is a rather special one, especially for distributed systems. The reason why creating locking algorithms on top of Zookeeper is easy is its ephemeral nodes – files that exist only as long as the connection that created them exists. After it disconnects, such a file disappears.

With such paradigms in place, it's fairly easy to create high-level synchronization algorithms.

With that in place, Zookeeper can safely integrate multiple services, ensuring loose coupling in a distributed way.

Zookeeper from developer’s POV

With all the base services for Zookeeper started, it seems there is nothing left but to connect to it and start implementing the necessary algorithms. Unfortunately, the API is quite basic: it offers file and directory abstractions with the addition of different node types (ephemeral and sequence). It is also possible to watch a node for changes.

Using bare Zookeeper is hard!

Creating connections is tedious – there are lots of things to take care of. Handling an established connection is hard, too: when establishing a connection to the ensemble, it's also necessary to negotiate a session. During the whole process a number of exceptions can occur – these are “recoverable” exceptions that can be handled gracefully without breaking the connection.

    class="c8"><span>So, Zookeeper API is hard.</span></p><p class="c1"><span></span></p><p class="c8"><span>Even if one is proficient with that API, then there come recipes. The reason for using Zookeeper is to be able to implement some more sophisticated algorithms on top of it. Unfortunately those aren&rsquo;t trivial and it is again quite hard to implement them without bugs.</span>

And since distributed systems are hard, why would anyone want another difficult to handle tool?

Enter Curator

Happily, the guys from Netflix implemented a nice abstraction for dealing with Zookeeper internals. They called it Curator and use it extensively in the company's environment. Curator offers a consistent API for Zookeeper's functionality. It even implements a couple of recipes for distributed systems.

File read/write

The basic use of Zookeeper is as a distributed configuration repository. For this scenario I only need read/write capabilities, to be able to write and read files from the Zookeeper filesystem. This code snippet writes a sample JSON to a file on the ZK filesystem.

<a href="#"
                                                                                                  name="0"></a>

EnsurePath ensurePath = new EnsurePath(markerPath);
ensurePath.ensure(client.getZookeeperClient());
String json = "...";
if (client.checkExists().forPath(statusFile(core)) != null)
    client.setData().forPath(statusFile(core), json.getBytes());
else
    client.create().forPath(statusFile(core), json.getBytes());


Distributed locking

With multiple systems, there may be a need for an exclusive lock on some resource, or perhaps some big system requires its components to synchronize based on locks. This “recipe” is an ideal match for those situations.

ref="#"
                                                                                    name="b0329bbbf14b79ffaba1139881914aea887ef6a3"></a>



InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, lockPath);
lock.acquire(5, TimeUnit.MINUTES);
// ... do sth ...
lock.release();


 (from https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/LockingRemotely.java)

Service Advertisement

This is quite an interesting use case. With many small services on different servers, it is not wise to exchange IP addresses and ports between them manually. When some of those services may go down while others try to replace them, the task gets even harder.

That's where Zookeeper comes in: it can be utilised as a registry of existing services.

When a service starts, it registers itself in the ServiceRegistry, providing basic information such as its purpose, role, address, and port.

Services that want to use a specific kind of service request access to some instance. This way of configuring easily decouples services from their configuration.

Basically, this scenario needs two steps:

1. A service starts and registers its presence (https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/WorkerAdvertiser.java#L44):



ServiceDiscovery discovery = getDiscovery();
discovery.start();
ServiceInstance si = getInstance();
log.info(si);
discovery.registerService(si);



2. Another service – on another host, or in another JVM on the same machine – tries to discover who is implementing the service (https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/curator/WorkerFinder.java#L50):

<a href="#"

                                                                                                  name="3"></a>

instances = discovery.queryForInstances(serviceName);

The whole concept here is ridiculously simple – the service advertising its presence just stores a file with its whereabouts. The service that is looking for providers just looks into a specific directory and reads the stored definitions.

In my example, the structure advertised by services looks like this (plus some getters and a constructor – the rest is here: https://github.com/zygm0nt/curator-playground/blob/master/src/main/java/pl/touk/model/WorkerMetadata.java):



public final class WorkerMetadata {
    private final UUID workerId;
    private final String listenAddress;
    private final int listenPort;
}


Source code

The above recipes are available in the Curator library (http://curator.incubator.apache.org/). Recipes' usage examples are in my github repo at https://github.com/zygm0nt/curator-playground

Conclusion

If you're in need of a reliable platform for exchanging data and managing synchronization, and you need to do it in a distributed fashion, just choose Zookeeper. Then add Curator for ease of use. Enjoy!


  1. image comes from: http://www.flickr.com/photos/jfgallery/2993361148
  2. all source code fragments taken from this repo: https://github.com/zygm0nt/curator-playground
