Hadoop HA setup

With the advent of Hadoop 2.x there finally is a working
High-Availability solution – two of them, in fact. These solutions are
now genuinely easy to configure and use, and they no longer require
external components such as DRBD. It is all neatly packed into the
Cloudera Hadoop distribution, which pioneered this solution.

Read on to find out how to use it.

The most important weakness of previous Hadoop releases was its
single point of failure, which happened to be the NameNode. The
NameNode, as a key component of every Hadoop cluster, is responsible
for managing the filesystem namespace and block locations. Losing its
data results in losing all the data stored on the DataNodes: HDFS is no
longer able to reach specific files or their blocks. This renders your
cluster inoperable.

So it is crucial to be able to detect and counter problems with the
NameNode. The most desirable behaviour is to have a hot backup that
ensures no-downtime cluster operation. To achieve this, the second
NameNode needs up-to-date filesystem metadata, and it also needs to be
already up and running – starting a NameNode from an existing set of
data may easily take many minutes while it parses the actual filesystem
state.

The previously used solution – deploying a SecondaryNameNode – was
somewhat flawed. It took a long time to recover after a failure, and it
was not a hot-backup solution, which only added to the problem. Some
other solution was required.

So what needed to be made redundant was the contents of the edits
directory, and each DataNode had to send its block location map to both
NameNodes in an HA deployment. This was accomplished in two steps: the
first, with the release of the CDH 4 beta, was a solution based on a
shared edits directory. Then, with CDH 4.1, came a quorum-based
solution.

Here is how to configure each of them on your cluster.

Shared edits directory solution

This kind of setup assumes that a shared storage directory exists in
the cluster. It should be provided by some kind of network-based
filesystem – you could try NFS or GlusterFS. The fs.default.name and
ha.zookeeper.quorum properties below belong in core-site.xml, while the
dfs.* properties go into hdfs-site.xml.

<property>
  <name>fs.default.name</name>
  <value>hdfs://example-cluster</value>
</property>
<!-- common server name -->
<property>
  <name>dfs.nameservices</name>
  <value>example-cluster</value>
</property>

<!-- HA configuration -->
<property>
  <name>dfs.ha.namenodes.example-cluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.example-cluster.nn1</name>
  <value>master1:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.example-cluster.nn2</name>
  <value>master2:8020</value>
</property>
<property>
  <name>dfs.namenode.http-address.example-cluster.nn1</name>
  <value>0.0.0.0:50070</value>
</property>
<property>
  <name>dfs.namenode.http-address.example-cluster.nn2</name>
  <value>0.0.0.0:50070</value>
</property>

<!-- Storage for edits' files -->
<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>file:///mnt/filer1/dfs/ha-name-dir-shared</value>
</property>

<!-- Client failover -->
<property>
  <name>dfs.client.failover.proxy.provider.example-cluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

<!-- Fencing configuration -->
<property>
  <name>dfs.ha.fencing.methods</name>
  <value>sshfence</value>
</property>
<property>
  <name>dfs.ha.fencing.ssh.private-key-files</name>
  <value>/home/user/.ssh/id_dsa</value>
</property>


<!-- Automatic failover configuration -->
<property>
  <name>dfs.ha.automatic-failover.enabled</name>
  <value>true</value>
</property>
<property>
  <name>ha.zookeeper.quorum</name>
  <value>zk1:2181,zk2:2181,zk3:2181</value>
</property>

This setup is quite OK, as long as you're comfortable with maintaining
a separate service (network storage) for handling the HA state. It
seems error-prone to me, because it adds yet another service whose high
availability has to be ensured. NFS seems a bad choice here, because as
far as I know it does not offer HA out of the box.

On the other hand, there is GlusterFS, a distributed filesystem that
you can deploy on multiple bricks with an increased replication level.

Nevertheless, it still brings the additional burden of another service
to maintain.

Quorum-based solution

With the release of CDH 4.1.0 we can use a much better integrated
solution called the JournalNode. All edits are now synchronized through
the JournalNodes: each JournalNode holds the same data, and both
NameNodes can receive filesystem state updates from those daemons.

This solution is much more consistent with the Hadoop ecosystem.

Please note that the config is almost identical to the one needed for
the shared edits directory solution. The only difference is the value
of dfs.namenode.shared.edits.dir, which now points to all the
JournalNodes deployed in our cluster.

<property>
  <name>fs.default.name</name>
  <value>hdfs://example-cluster</value>
</property>
<!-- common server name -->
<property>
  <name>dfs.nameservices</name>
  <value>example-cluster</value>
</property>

<!-- HA configuration -->
<property>
  <name>dfs.ha.namenodes.example-cluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.example-cluster.nn1</name>
  <value>master1:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.example-cluster.nn2</name>
  <value>master2:8020</value>
</property>
<property>
  <name>dfs.namenode.http-address.example-cluster.nn1</name>
  <value>0.0.0.0:50070</value>
</property>
<property>
  <name>dfs.namenode.http-address.example-cluster.nn2</name>
  <value>0.0.0.0:50070</value>
</property>

<!-- Storage for edits' files -->
<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://node1:8485;node2:8485;node3:8485/example-cluster</value>
</property>

<!-- Client failover -->
<property>
  <name>dfs.client.failover.proxy.provider.example-cluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

<!-- Fencing configuration -->
<property>
  <name>dfs.ha.fencing.methods</name>
  <value>sshfence</value>
</property>
<property>
  <name>dfs.ha.fencing.ssh.private-key-files</name>
  <value>/home/user/.ssh/id_dsa</value>
</property>


<!-- Automatic failover configuration -->
<property>
  <name>dfs.ha.automatic-failover.enabled</name>
  <value>true</value>
</property>
<property>
  <name>ha.zookeeper.quorum</name>
  <value>zk1:2181,zk2:2181,zk3:2181</value>
</property>

Infrastructure

In both cases you need to run the ZooKeeper-based failover controller
(hadoop-hdfs-zkfc). This daemon negotiates which NameNode should become
active and which should remain standby.
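
Assuming the ZooKeeper quorum listed in ha.zookeeper.quorum is already
running, the HA state in ZooKeeper has to be formatted once and the
failover controller started on each NameNode host. A rough sketch of
those steps (service name as packaged in CDH – adjust it to your
distribution):

# run once, from one of the NameNodes, to create the HA znode in ZooKeeper
hdfs zkfc -formatZK

# start the failover controller daemon on each NameNode host
service hadoop-hdfs-zkfc start

# optionally, verify which NameNode ended up active
hdfs haadmin -getServiceState nn1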

But that's not all. Depending on the way you've chosen to deploy HA,
you need to do a few other things:

Shared edits dir

With the shared edits dir you need to deploy a networked filesystem and
mount it on both of your NameNodes, as in the sketch below. After that
you can run your cluster and be happy with your new HA.
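
As an illustration only – the NFS server name, export path and mount
options below are assumptions, chosen to match the /mnt/filer1 path
used in the configuration above:

# on both NameNode hosts; filer1:/export/dfs is a hypothetical NFS export
mount -t nfs -o tcp,soft,intr,timeo=60,retrans=10 filer1:/export/dfs /mnt/filer1

If one of the NameNodes already holds the filesystem metadata, the
other one can then be primed with hdfs namenode -bootstrapStandby
before it is started for the first time.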

Quorum-based

For the QJournal setup to operate you need to install one new package,
hadoop-hdfs-journalnode, which provides the startup scripts for the
JournalNode daemons. Choose at least three nodes that will be
responsible for keeping the edits state and deploy JournalNodes on
them, as sketched below.
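
Each JournalNode also needs a local directory in which to keep its copy
of the edits. A minimal sketch of that property for hdfs-site.xml on
the JournalNode hosts (the path itself is just an example):

<!-- Local storage for each JournalNode -->
<property>
  <name>dfs.journalnode.edits.dir</name>
  <value>/data/1/dfs/jn</value>
</property>

Start the hadoop-hdfs-journalnode service on each of those nodes before
bringing up the NameNodes.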

Conclusion

Thanks to the folks at Cloudera we can now use enterprise-grade High
Availability features with Hadoop. Eliminating the single point of
failure in your cluster is essential for keeping your infrastructure
easy to maintain.

Given the above choices, I'd suggest the QJournal setup because of its
relatively small impact on the overall cluster architecture. Its good
performance and fairly simple setup make it easy to start using Hadoop
in an HA configuration.

Are you using Hadoop with HA? What are your impressions?

You May Also Like

Sample for lift-ng: Micro-burn 1.0.0 released

Over the last few evenings, in my free time, I've been working on a mini-application called micro-burn. The idea for it came from working with Agile Jira in our commercial project. Jira is a great tool for agile project management: it has inline task editing, a drag & drop board, reports and much more, but it also has a few drawbacks that were hurting our team's motivation.

Motivation

From time to time our sprint scope changes. That is not a big deal, because we are trying to be agile :-) but in this situation Jira's burndown chart draws a peak, because that chart in fact shows scope changes, not the real burndown. It also means the chart cannot break through the x-axis if we do more than we had planned – it always stops at zero at most.

Also, for better progress monitoring, we've started splitting our user stories into technical tasks and estimating them. The original burndown chart doesn't show points from technical tasks. I can see the motivation for this – an almost-finished user story isn't finished at all until the user can use it. On the other hand, if we know which tasks are problematic, we can do some teamwork to move them along.

So I realized it was a good opportunity to try some new approaches and tools.

Tools

I started with the Lift framework. In the world of Single Page Applications, this framework offers more than a simple interface for serving REST services: it comes with awesome Comet support. Comet is a replacement for WebSockets that runs on all browsers. It supports long polling and transparently falls back to short polling when the limit of client connections is exceeded. On the backend you handle pushes in a CometActor. For further reading, take a look at Roundtrip promises.

But Lift is also a kind of framework of frameworks. You can build your own abstractions on top of CometActors and push JavaScript to the client, which shortens the path from server to client. This was the trigger for the author of lift-ng to create an Angular integration built on top of Lift. It provides AngularActors, from which you can emit/broadcast events to a controller's scope, and NgModelBinders that synchronize your backend model with the client scope in a few lines! I've used them to send the project state (all sprints and their details) to the client and to notify it about scrum board changes. My actor doing all of this hard work looks pretty small:

Lift-ng also provides factories for creating Angular services. Services can respond with futures that are transformed into Angular promises on the fly. This is all that was needed to serve the sprint history:

And on the client side – the use of the service:


In my opinion these two frameworks give a huge boost to web application development. You get the power of strong typing with Scala, you can design your domain around actors, and all of this with the simplicity of node.js – no JSON-transforming boilerplate, plus dynamic application reload.

DDD + Event Sourcing

I've also tried a few fresh approaches to DDD. I've organized the domain objects into actors: there are SprintActors encapsulating the sprint aggregate root. Task changes are stored as events, which are computed as the difference between two board states. When a sprint history has to be provided, the successive board states are computed from the initial state and the sequence of events. I realized that the best way to keep this kind of event sourcing approach tested is to write randomized tests: a test that makes random changes on the board, calculates the events and checks that initial state + events equals the previously created state.
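
The original test itself is not reproduced here, but the idea can be sketched with a deliberately simplified, hypothetical board model (plain Scala, not the real micro-burn types): apply random changes to a board, record each change as the diff between two consecutive states, and check that replaying the events reproduces the final state.

import scala.util.Random

// Hypothetical, simplified model: a board maps task ids to their state,
// an event is the difference between two consecutive board states.
case class Board(tasks: Map[String, Int])
case class Event(changed: Map[String, Int])

def diff(prev: Board, next: Board): Event =
  Event(next.tasks.filter { case (id, state) => prev.tasks.get(id) != Some(state) })

def replay(initial: Board, events: Seq[Event]): Board =
  events.foldLeft(initial)((board, event) => Board(board.tasks ++ event.changed))

// Random walk over the board: mutate it 100 times, record the diffs as
// events, then verify that initial state + events equals the final state.
val initial = Board(Map("task-1" -> 0, "task-2" -> 0))
val states = (1 to 100).scanLeft(initial) { (board, _) =>
  val id = if (Random.nextBoolean()) "task-1" else "task-2"
  Board(board.tasks.updated(id, Random.nextInt(3)))
}
val events = states.sliding(2).map { case Seq(prev, next) => diff(prev, next) }.toSeq
assert(replay(initial, events) == states.last)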



First look

Screenshot of first version:


If you want to take a closer look, check the source code or download the ready-to-run fatjar on GitHub.

Log4j and MDC in Grails

Log4j provides a very useful feature: MDC – mapped diagnostic context. It can be used to store data in the context of the current thread. It may sound a bit scary, but the idea is simple.

My post is based on the post http://burtbeckwith.com/blog/?p=521 from Burt Beckwith's excellent blog, which is definitely worth checking out if you are interested in Grails.

Short background story...


Suppose we want to add logging to our brand new shopping system, and in each log entry we want to have the customer's shopping basket number. Our system can be used by many users at once, who can perform many transactions and actions, like adding items and so on. How can we achieve that? Of course, we could add the basket number in every place where we do some logging, but that would be boring and error-prone.

Instead, we can use MDC to store the basket number as a variable in a map.

In fact, MDC can be treated as a map of custom values for the current thread that can be used by the logger.


How to do that with Grails?


Using MDC with Grails is quite simple. All we need to do is create our own custom filter which works for the given URLs and puts our data into MDC.

Filters in Grails are classes in the grails-app/conf/ directory whose names end with the Filters.groovy suffix. We can create this class manually or use the Grails command:
grails create-filters info.rnowak.App.Basket

As a result, a class named BasketFilters will be created in grails-app/conf/info/rnowak/App.

Initially the filter class looks a little bit empty:
class BasketFilters {
    def filters = {
        all(controller: '*', action: '*') {
            before = {

            }
            after = { Map model ->

            }
            afterView = { Exception e ->

            }
        }
    }
}
All we need to do is fill in the empty closures, modify the filter properties and put some data into MDC.

all is the general name of our filter, as the BasketFilters class (plural!) can contain many different filters. You can name it whatever you want; for this post let's assume it is named basketFilter.

Another thing is changing the filter parameters. According to the official documentation (link) we can customize our filter in many ways: you can specify the controller to be filtered, its actions, the filtered URLs and so on. In our example you can stay with the default option, where the filter is applied to every action of every controller. If you are interested in filtering only some URLs, use the uri parameter with an expression describing the URLs to be filtered, as shown below.
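
For example, a filter limited to basket-related pages only could be declared roughly like this (the /basket/** URL pattern is just an assumption for our imaginary shop):
// inside the filters = { ... } block of BasketFilters
basketFilter(uri: '/basket/**') {
    before = {
        // executed only for requests whose URL matches /basket/**
    }
}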

The three closures that are already defined in the template each have their own function and are invoked under the following conditions:

  • before - as the name says, it is executed before the filtered action takes place
  • after - similarly, it is called after the action
  • afterView - called after the rendering of the action's view
Ok, so now we know what these mysterious methods are and when they are called. But what can be done within them? In the official Grails docs (link again), under section 7.6.3, there is a list of properties that are available for use in a filter.

With that knowledge, we can proceed to implementing the filter.

Putting something into MDC in the filter


What we want to do is quite easy: we want to retrieve the basket number from the request parameters and put it into MDC in our filter:
import org.apache.log4j.MDC

class BasketFilters {
    def filters = {
        basketFilter(controller: '*', action: '*') {
            before = {
                MDC.put("basketNumber", params.basketNumber ?: "")
            }
            after = { Map model ->
                MDC.remove("basketNumber")
            }
        }
    }
}

We retrieve the basket number from the Grails params map and then put it into the MDC map under the specified key ("basketNumber" in this case), which will later be used in the logger conversion pattern. It is important to remove the custom value after the action has been processed, to avoid leaking it to other requests handled by the same thread.

So we are putting something into MDC. But how do we make use of it in the logs?


We can refer to custom data stored in MDC in the conversion pattern using the syntax %X{key}, where key is the key we used in the filter to put the data, like:
def conversionPattern = "%d{yyyy-MM-dd HH:mm:ss} %-5p %t [%c{1}] %X{basketNumber} - %m%n"
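
For completeness, here is a minimal sketch of how this pattern could be wired into a console appender in grails-app/conf/Config.groovy (the appender name and the rest of the log4j block are assumptions, not part of the original post):
log4j = {
    appenders {
        // console appender using the conversion pattern with %X{basketNumber}
        console name: 'stdout',
                layout: pattern(conversionPattern: '%d{yyyy-MM-dd HH:mm:ss} %-5p %t [%c{1}] %X{basketNumber} - %m%n')
    }
    root {
        info 'stdout'
    }
}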


And that's it :) We've put custom data into the log4j MDC and successfully used it in the logs to display interesting values.