Hadoop HA setup

With the advent of Hadoop 2.x, there is finally a working
High-Availability solution. Two of them, in fact. They are easy to
configure and use, and they no longer require external components
such as DRBD.
Everything comes neatly packed in the Cloudera Hadoop distribution,
which pioneered this solution.

Read on to find out how to use it.

The most important weakness of previous Hadoop releases was a
single point of failure, which happened to be the NameNode. The NameNode,
a key component of every Hadoop cluster, is responsible for managing
filesystem namespace information and block locations. Losing its data
means losing all the data stored on the DataNodes, because HDFS can no
longer locate specific files or their blocks. This renders your cluster
inoperable.

So it is crucial to be able to detect and counter problems with the
NameNode. The most desirable behavior is to have a hot backup that
ensures no-downtime cluster operation. To achieve this, the second
NameNode needs to have up-to-date filesystem metadata, and it needs to
be up and running already. Starting a NameNode from an existing set of
data may easily take many minutes, because it has to parse the actual
filesystem state.

The previously used solution, deploying a SecondaryNameNode, was somewhat
flawed. It took a long time to recover after a failure, and it was not a
hot-backup solution, which added to the problem. Some other
solution was required.

So two things needed to be made redundant: the contents of the edits
directory, and the block location maps, which in an HA deployment each
DataNode sends to both NameNodes. This was accomplished in two steps.
The first came with the release of CDH 4 beta: a solution based on
sharing the edits directory. Then, with CDH 4.1, came the quorum based
solution.

Find out how to configure those on your cluster.

Shared edits directory solution

This kind of setup assumes that the cluster has a shared storage
directory. It should be deployed on some kind of network-based
filesystem. You could try NFS or GlusterFS.

<property>
  <name>fs.default.name</name>
  <value>hdfs://example-cluster</value>
</property>
<!-- common server name -->
<property>
  <name>dfs.nameservices</name>
  <value>example-cluster</value>
</property>

<!-- HA configuration -->
<property>
  <name>dfs.ha.namenodes.example-cluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.example-cluster.nn1</name>
  <value>master1:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.example-cluster.nn2</name>
  <value>master2:8020</value>
</property>
<property>
  <name>dfs.namenode.http-address.example-cluster.nn1</name>
  <value>0.0.0.0:50070</value>
</property>
<property>
  <name>dfs.namenode.http-address.example-cluster.nn2</name>
  <value>0.0.0.0:50070</value>
</property>

<!-- Storage for edits' files -->
<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>file:///mnt/filer1/dfs/ha-name-dir-shared</value>
</property>

<!-- Client failover -->
<property>
  <name>dfs.client.failover.proxy.provider.example-cluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

<!-- Fencing configuration -->
<property>
  <name>dfs.ha.fencing.methods</name>
  <value>sshfence</value>
</property>
<property>
  <name>dfs.ha.fencing.ssh.private-key-files</name>
  <value>/home/user/.ssh/id_dsa</value>
</property>


<!-- Automatic failover configuration -->
<property>
  <name>dfs.ha.automatic-failover.enabled</name>
  <value>true</value>
</property>
<property>
  <name>ha.zookeeper.quorum</name>
  <value>zk1:2181,zk2:2181,zk3:2181</value>
</property>

This setup is quite OK, as long as you’re comfortable with maintaining a
separate service (network storage) for handling the HA state. It seems
error prone to me, because it adds another service whose high
availability has to be ensured. NFS seems to be a bad choice here,
because AFAIK it does not offer HA out of the box.

On the other hand, we have GlusterFS, a distributed filesystem that
you can deploy on multiple bricks with an increased replication level.

Nevertheless, it still brings the additional burden of another service
to maintain.

Quorum based solution

With the release of CDH 4.1.0 we are now able to use a much better
integrated solution based on JournalNodes. All the updates are now
synchronized through a group of JournalNode daemons. Each JournalNode
holds the same data, and all the NameNodes are able to receive
filesystem state updates from those daemons.

This solution is much more consistent with the Hadoop ecosystem.

Please note that the config is almost identical to the one needed for
the shared edits directory solution. The only difference is the value of
dfs.namenode.shared.edits.dir, which now points to all the journal
nodes deployed in our cluster.

<property>
  <name>fs.default.name</name>
  <value>hdfs://example-cluster</value>
</property>
<!-- common server name -->
<property>
  <name>dfs.nameservices</name>
  <value>example-cluster</value>
</property>

<!-- HA configuration -->
<property>
  <name>dfs.ha.namenodes.example-cluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.example-cluster.nn1</name>
  <value>master1:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.example-cluster.nn2</name>
  <value>master2:8020</value>
</property>
<property>
  <name>dfs.namenode.http-address.example-cluster.nn1</name>
  <value>0.0.0.0:50070</value>
</property>
<property>
  <name>dfs.namenode.http-address.example-cluster.nn2</name>
  <value>0.0.0.0:50070</value>
</property>

<!-- Storage for edits' files -->
<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://node1:8485;node2:8485;node3:8485/example-cluster</value>
</property>

<!-- Client failover -->
<property>
  <name>dfs.client.failover.proxy.provider.example-cluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

<!-- Fencing configuration -->
<property>
  <name>dfs.ha.fencing.methods</name>
  <value>sshfence</value>
</property>
<property>
  <name>dfs.ha.fencing.ssh.private-key-files</name>
  <value>/home/user/.ssh/id_dsa</value>
</property>


<!-- Automatic failover configuration -->
<property>
  <name>dfs.ha.automatic-failover.enabled</name>
  <value>true</value>
</property>
<property>
  <name>ha.zookeeper.quorum</name>
  <value>zk1:2181,zk2:2181,zk3:2181</value>
</property>
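
A note on the fencing section above: sshfence has to log in to the old
active NameNode, so it cannot succeed when that whole machine is down,
and failover then stalls. A common workaround, sketched here as an
assumption rather than as part of the setup above, is to list a second
method that always succeeds. Methods are tried in order, one per line:

<!-- Fencing with a fallback (sketch; shell(/bin/true) always reports success) -->
<property>
  <name>dfs.ha.fencing.methods</name>
  <value>sshfence
shell(/bin/true)</value>
</property>

This is easier to justify with the quorum based setup, where the
JournalNodes accept writes from only one NameNode at a time. With a
shared edits directory, a fallback like this weakens the protection
against two active NameNodes.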

Infrastructure

In both cases you need to run the ZooKeeper-based Failover Controller
(hadoop-hdfs-zkfc). This daemon negotiates which NameNode should
become active and which one stays on standby.

But that’s not all. Depending on the way you’ve chosen to deploy HA, you
need to do some other things:

Shared edits dir

With the shared edits dir you need to deploy a networked filesystem and
mount it on your NameNodes. After that you can run your cluster and be
happy with your new HA.

Quorum based

For QJournal to operate you need to install one new package called
hadoop-hdfs-journalnode. It provides startup scripts for the
JournalNode daemons. Choose at least three nodes that will be
responsible for handling edits state and deploy JournalNodes on them.
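
Each JournalNode also needs a local directory for the edits it stores.
A minimal sketch for hdfs-site.xml on the JournalNode hosts, assuming
the stock dfs.journalnode.edits.dir property; the path below is only a
placeholder:

<!-- JournalNode local storage (the path is a hypothetical example) -->
<property>
  <name>dfs.journalnode.edits.dir</name>
  <value>/data/1/dfs/jn</value>
</property>

The directory should exist and be writable by the user that runs the
JournalNode daemon.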

Conclusion

Thanks to the guys from Cloudera we can now use enterprise-grade High
Availability features for Hadoop. Eliminating the single point of
failure in your cluster is essential for easy maintainability of your
infrastructure.

Given the above choices, I’d suggest the QJournal setup, because of
its relatively small impact on the overall cluster architecture. Its
good performance and fairly simple setup let users easily start running
Hadoop in an HA setup.

Are you using Hadoop with HA? What are your impressions?
