Hadoop HA setup

With the advent of Hadoop’s 2.x version, there finally is a working
High-Availability solution. Two of them, in fact. These solutions are now
genuinely easy to configure and use, and they no longer require external
components such as
DRBD.
It is all neatly packed into the Cloudera Hadoop distribution, which
pioneered this solution.

Read on to find out how to use it.

The most important weakness of previous Hadoop releases was the
single point of failure, which happened to be the NameNode. The NameNode, as
the key component of every Hadoop cluster, is responsible for managing the
filesystem namespace information and block locations. Losing its data results
in losing all the data stored on the DataNodes: HDFS is no longer able to
locate specific files or their blocks, which renders your cluster inoperable.

So it is crucial to be able to detect and counter problems with the NameNode.
The most desirable behavior is to have a hot backup that would ensure
no-downtime cluster operation. To achieve this, the second NameNode
needs to have up-to-date filesystem metadata and it also needs to be up and
running, because starting a NameNode from an existing set of data may easily
take many minutes while it parses the actual filesystem state.

The previously used solution – deploying a SecondaryNameNode – was somewhat
flawed. It took a long time to recover after a failure, and it was not a
hot-backup solution, which added to the problem. Some other solution was
required.

So, what needed to be made redundant were the contents of the edits directory,
and each DataNode had to send its block location reports to both NameNodes in
an HA deployment. This was accomplished in two steps: first, with the release
of the CDH 4 beta, came a solution based on a shared edits directory; then,
with CDH 4.1, came a quorum-based solution.

Find out how to configure those on your cluster.

Shared edits directory solution

This kind of setup assumes that a shared storage directory exists in the
cluster. It should be provided by some kind of network-based filesystem;
you could try NFS or GlusterFS.
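
For example, mounting such an NFS export on both NameNode hosts might look like the sketch below (the filer1:/exports/dfs export path and the mount options are just assumptions – adjust them to your environment):

# create the mount point used later by dfs.namenode.shared.edits.dir
sudo mkdir -p /mnt/filer1/dfs
# mount the hypothetical NFS export from the filer
sudo mount -t nfs -o tcp,hard,intr filer1:/exports/dfs /mnt/filer1/dfs

With the shared directory mounted, the relevant configuration looks like this: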

<property>
  <name>fs.default.name</name>
  <value>hdfs://example-cluster</value>
</property>
<!-- common server name -->
<property>
  <name>dfs.nameservices</name>
  <value>example-cluster</value>
</property>

<!-- HA configuration -->
<property>
  <name>dfs.ha.namenodes.example-cluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.example-cluster.nn1</name>
  <value>master1:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.example-cluster.nn2</name>
  <value>master2:8020</value>
</property>
<property>
  <name>dfs.namenode.http-address.example-cluster.nn1</name>
  <value>0.0.0.0:50070</value>
</property>
<property>
  <name>dfs.namenode.http-address.example-cluster.nn2</name>
  <value>0.0.0.0:50070</value>
</property>

<!-- Storage for edits' files -->
<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>file:///mnt/filer1/dfs/ha-name-dir-shared</value>
</property>

<!-- Client failover -->
<property>
  <name>dfs.client.failover.proxy.provider.example-cluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

<!-- Fencing configuration -->
<property>
  <name>dfs.ha.fencing.methods</name>
  <value>sshfence</value>
</property>
 <property>
  <name>dfs.ha.fencing.ssh.private-key-files</name>
  <value>/home/user/.ssh/id_dsa</value>
</property>


<!-- Automatic failover configuration -->
<property>
  <name>dfs.ha.automatic-failover.enabled</name>
  <value>true</value>
</property>
<property>
  <name>ha.zookeeper.quorum</name>
  <value>zk1:2181,zk2:2181,zk3:2181</value>
</property>

This setup is quite OK, as long as you’re comfortable with maintaining a
separate service (network storage) for handling the HA state. It seems
error-prone to me, because it adds another service whose high availability
has to be ensured. NFS seems to be a bad choice here, because as far as I
know it does not offer HA out of the box.

On the other hand, we have GlusterFS, which is a distributed filesystem that
you can deploy on multiple bricks with an increased replication level.

Nevertheless, it still brings additional burden of another service to
maintain.

Quorum based solution

With the release of CDH 4.1.0 we are now able to use a much better
integrated solution called the JournalNode. All the updates are now
synchronized through JournalNodes. Each JournalNode holds the same data,
and all the NameNodes are able to receive filesystem state updates from
those daemons.

This solution is much more consistent with Hadoop ecosystem.

Please note that the config is almost identical to the one needed for the
shared edits directory solution. The only difference is the value of
dfs.namenode.shared.edits.dir, which now points to all the JournalNodes
deployed in our cluster.

<property>
  <name>fs.default.name</name>
  <value>hdfs://example-cluster</value>
</property>
<!-- common server name -->
<property>
  <name>dfs.nameservices</name>
  <value>example-cluster</value>
</property>

<!-- HA configuration -->
<property>
  <name>dfs.ha.namenodes.example-cluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.example-cluster.nn1</name>
  <value>master1:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.example-cluster.nn2</name>
  <value>master2:8020</value>
</property>
<property>
  <name>dfs.namenode.http-address.example-cluster.nn1</name>
  <value>0.0.0.0:50070</value>
</property>
<property>
  <name>dfs.namenode.http-address.example-cluster.nn2</name>
  <value>0.0.0.0:50070</value>
</property>

<!-- Storage for edits' files -->
<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://node1:8485;node2:8485;node3:8485/example-cluster</value>
</property>

<!-- Client failover -->
<property>
  <name>dfs.client.failover.proxy.provider.example-cluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

<!-- Fencing configuration -->
<property>
  <name>dfs.ha.fencing.methods</name>
  <value>sshfence</value>
</property>
 <property>
  <name>dfs.ha.fencing.ssh.private-key-files</name>
  <value>/home/user/.ssh/id_dsa</value>
</property>


<!-- Automatic failover configuration -->
<property>
  <name>dfs.ha.automatic-failover.enabled</name>
  <value>true</value>
</property>
<property>
  <name>ha.zookeeper.quorum</name>
  <value>zk1:2181,zk2:2181,zk3:2181</value>
</property>

Infrastructure

In both cases you need to run the ZooKeeper-based Failover Controller
(hadoop-hdfs-zkfc). This daemon negotiates which NameNode should
become active and which should remain standby.
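
A minimal sketch of what this involves, assuming CDH-style packaging (the package and service names below are assumptions and may differ in your distribution):

# on one of the NameNodes, once: create the znode used by the failover controllers
hdfs zkfc -formatZK

# on both NameNode hosts: install and start the failover controller
sudo apt-get install hadoop-hdfs-zkfc
sudo service hadoop-hdfs-zkfc start

# check which NameNode ended up active and which is standby
hdfs haadmin -getServiceState nn1
hdfs haadmin -getServiceState nn2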

But that’s not all. Depending on the way you’ve chosen to deploy HA, you
need to do some other things:

Shared edits dir

With the shared edits dir you need to deploy a networked filesystem and mount
it on both of your NameNodes. After that you can run your cluster and be happy
with your new HA setup.
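
If you are converting an existing, non-HA NameNode, a rough sketch of the remaining steps (assuming the shared directory is already mounted on both masters) could look like this:

# on the NameNode that currently holds the namespace: copy its edits into the shared dir
hdfs namenode -initializeSharedEdits

# on the second NameNode: pull the initial namespace from the running one
hdfs namenode -bootstrapStandby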

Quorum based

For QJournal to operate you need to install one new package, called
hadoop-hdfs-journalnode, which provides the startup scripts for the
JournalNode daemons. Choose at least three nodes that will be responsible for
handling the edits state and deploy JournalNodes on them.
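
A rough sketch of that, assuming CDH-style packages and the three JournalNode hosts used in the config above (node1, node2, node3):

# on each of the three chosen nodes
sudo apt-get install hadoop-hdfs-journalnode
sudo service hadoop-hdfs-journalnode start

Once the JournalNodes are up, converting an existing NameNode works just like in the shared edits dir case: run hdfs namenode -initializeSharedEdits on the current NameNode and hdfs namenode -bootstrapStandby on the new standby.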

Conclusion

Thanks to the guys from Cloudera, we can now use enterprise-grade High
Availability features for Hadoop. Eliminating the single point of
failure in your cluster is essential for easy maintainability of your
infrastructure.

Given the above choices, I’d suggest using the QJournal setup, because of
its relatively small impact on the overall cluster architecture. Its
good performance and fairly simple setup let users easily start running
Hadoop in an HA configuration.

Are you using Hadoop with HA? What are your impressions?

You May Also Like

Grails session timeout without XML

This article shows a clean, non-hacky way of configuring featureful event listeners for the Grails application servlet context. It features an HttpSessionListener as a Spring bean, with a session timeout that depends on whether the user account is premium or not.

Common approaches

Speaking of session timeout config in Grails, the default approach is to install templates with the grails install-templates command. This way we get direct access to the web.xml file, but more unnecessary files are created as well. Apart from the fact that unnecessary files are, well, unnecessary, we should also remember some other common knowledge: XML is not for humans.

Another, a bit more hacky, way is to create the mysterious scripts/_Events.groovy file. Inside of it, using the no less enigmatic closure eventWebXmlEnd = { filename -> ... }, we can parse and hack into web.xml with the help of XmlSlurper.
Even though a lot of Grails plugins do it in a similar way, it’s still not really straightforward, is it? Besides, where’s the IDE support? Hello!?

Examples of both of the above ways can be seen on StackOverflow.

Simpler and cleaner way

By adding just a single line to the already generated init closure, we are done:
class BootStrap {

    def init = { servletContext ->
        servletContext.addListener(OurListenerClass)
    }
}

Alrighty, this is enough to avoid XML. Sweets are served after the main course though :)

Listener as a Spring bean

Let us assume we have a requirement: set a longer session timeout for premium user accounts.
Users are authenticated upon session creation through SSO.

To easily meet this requirement, just instantiate the CustomTimeoutSessionListener as a Spring bean in resources.groovy. We are also going to need some source of the user’s custom session timeout, let’s say a ConfigService.
beans = {
    customTimeoutSessionListener(CustomTimeoutSessionListener) {
        configService = ref('configService')
    }
}

With such an approach, BootStrap.groovy has to be slightly modified. To keep control over listener instantiation, instead of passing the listener class type, the Spring bean is injected by Grails and the instance is passed:
class BootStrap {

    def customTimeoutSessionListener

    def init = { servletContext ->
        servletContext.addListener(customTimeoutSessionListener)
    }
}

An example CustomTimeoutSessionListener implementation can look like this:
import javax.servlet.http.HttpSessionEvent
import javax.servlet.http.HttpSessionListener
import your.app.ConfigService

class CustomTimeoutSessionListener implements HttpSessionListener {

    ConfigService configService

    @Override
    void sessionCreated(HttpSessionEvent httpSessionEvent) {
        httpSessionEvent.session.maxInactiveInterval = configService.sessionTimeoutSeconds
    }

    @Override
    void sessionDestroyed(HttpSessionEvent httpSessionEvent) { /* nothing to implement */ }
}
Having at hand all the power of Spring IoC, this is surely a good place to load some of the user’s persisted account data into the session or to notify any other relevant bean about the user’s presence.

Wait, what about the user context?

The honest answer is: that depends on your case. Yet here’s an example getSessionTimeoutSeconds() implementation using Spring Security:
import org.springframework.security.core.context.SecurityContextHolder

class ConfigService {

    static final int THREE_HOURS = 3 * 60 * 60
    static final int QUARTER = 15 * 60

    int getSessionTimeoutSeconds() {

        String username = SecurityContextHolder.context?.authentication?.principal
        def account = Account.findByUsername(username)

        return account?.premium ? THREE_HOURS : QUARTER
    }
}
This example is simplified and does not contain much defensive programming; it just assumes that the principal is already set and is a String – a unique username. Thanks to Grails conventions our ConfigService is transactional, so the Account domain class can use a GORM dynamic finder.
OK, the config-fetching implementation details are out of scope here anyway. You can get, load, fetch or obtain the value from wherever you like: domain persistence, the principal object, role config, an external file and so on...

Any gotchas?

There is one. When running the grails test command, servletContext comes in as some mocked class instance without the addListener method. Thus we are going to get a MissingMethodException when running tests :(

The solution is typical:
import grails.util.Environment

def init = { servletContext ->
    if (Environment.current != Environment.TEST) {
        servletContext.addListener(customTimeoutSessionListener)
    }
}
An unnecessary obstacle if you ask me. Should I submit a Jira issue about that?

TL;DR

Just implement an HttpSessionListener and register it as a Spring bean. Inject it into BootStrap.groovy and call servletContext.addListener(injectedListener).