Distributed scans with HBase

HBase is by design a columnar store that is optimized for random reads.
You just ask for a row using its rowId as an identifier and you get your
data almost instantly.

Performing a scan over part of a table, or the whole table, is a
completely different thing. First of all, it is sequential, which means
it is rather slow, because it doesn’t use all the RegionServers at the
same time. It is implemented that way to honor the contract of the Scan
command, which has to return results sorted by key.

So, how to do this efficiently?

The usual way of getting data from HBase is with the help of its API,
mainly Scan objects, and they are all I’ll use for this task. I’ll
specify startRow and stopRow, so that each Scan request looks through
only part of the key space.

It is crucial to note that this whole solution works because of how
HBase sorts keys: a scan walks the table in ascending key order. HBase
compares row keys byte by byte, and my keys are strings, so the key “1”
is smaller than “2” because they are sorted lexicographically. For the
same reason the key “12345” is also smaller than “2”. I’ve leveraged
this property to partition the whole key space by the first character
of the key. In my case keys contain only digits, so I have 10 ranges:

null-1
1-2
2-3
3-4
4-5
5-6
6-7
7-8
8-9
9-null
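
To make the ordering argument concrete, here is a minimal sketch that builds the ten ranges above and checks which one a key falls into. It uses plain Java strings instead of HBase (for ASCII digits, String comparison gives the same order as comparing bytes), and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class PrefixRanges {

    /** One scan range: start is inclusive, stop is exclusive, null means "open ended". */
    static final class Range {
        final String start;
        final String stop;

        Range(String start, String stop) {
            this.start = start;
            this.stop = stop;
        }

        @Override
        public String toString() {
            return start + "-" + stop;
        }
    }

    /** Builds null-1, 1-2, ..., 8-9, 9-null for keys that start with a digit. */
    static List<Range> digitRanges() {
        List<Range> ranges = new ArrayList<>();
        String previous = null;
        for (char c = '1'; c <= '9'; c++) {
            String boundary = String.valueOf(c);
            ranges.add(new Range(previous, boundary));
            previous = boundary;
        }
        ranges.add(new Range(previous, null));
        return ranges;
    }

    /** True when the key lies in [start, stop) under lexicographic comparison. */
    static boolean contains(Range range, String key) {
        boolean afterStart = range.start == null || key.compareTo(range.start) >= 0;
        boolean beforeStop = range.stop == null || key.compareTo(range.stop) < 0;
        return afterStart && beforeStop;
    }

    public static void main(String[] args) {
        // "12345" sorts before "2", so it belongs to the range 1-2.
        for (Range range : digitRanges()) {
            if (contains(range, "12345")) {
                System.out.println("12345 falls into " + range);
            }
        }
    }
}
```

Nine boundaries give ten ranges, and every key lands in exactly one of them, which is what lets the scans be split up without overlaps or gaps.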

The speedup comes from the fact that each range resides in its own
partition (a region, in HBase terms). That’s right, I’ve presplit the
table into 10 partitions. This matches my cluster’s setup rather nicely,
because I have more than 10 RegionServers, so every partition should end
up on a different RegionServer. That allows the code to run the
requested scan operations in parallel, which is exactly where the
performance boost comes from.

How I’ve created the input table:

$ create 'tariff_changes', { NAME => 'cf', SPLITS_FILE => 'splits.txt', VERSIONS => 50, MAX_FILESIZE => 1073741824 }

$ alter 'tariff_changes', { NAME => 'cf', SPLITS_FILE => 'splits.txt', VERSIONS => 50, MAX_FILESIZE => 1073741824 }

The split file is just something along these lines:

1
2
3
4
5
6
7
8
9
0

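Each line is a split point: the row key at which one partition ends and
the next one begins. With digit-only keys, the split points 1 through 9
are already enough to produce the ten ranges listed above; a split point
at 0 only adds one more partition in front of them, which stays empty.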

OK, so after this rather lengthy introduction, what does the actual
code do? It just spins off a few threads, one for each partition, and
runs a Scan request tailored to that partition’s key space. This way I
got a 10x speedup for this particular scan: the execution time went
down from 30 minutes to 3 for my use case.
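
The threading pattern can be sketched without a cluster. The example below is not the code from the repository: it is a simplified simulation in which a sorted in-memory map stands in for the HBase table and a sub-map view stands in for a Scan with startRow and stopRow. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelRangeScan {

    /** Stand-in for one Scan: returns the keys in [start, stop); null means open ended. */
    static List<String> scanRange(NavigableMap<String, String> table, String start, String stop) {
        NavigableMap<String, String> view = table;
        if (start != null) {
            view = view.tailMap(start, true);
        }
        if (stop != null) {
            view = view.headMap(stop, false);
        }
        return new ArrayList<>(view.keySet());
    }

    /** Runs one range scan per thread and concatenates the results in range order. */
    static List<String> parallelScan(NavigableMap<String, String> table, String[] splitPoints) {
        ExecutorService pool = Executors.newFixedThreadPool(splitPoints.length + 1);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (int i = 0; i <= splitPoints.length; i++) {
                final String start = (i == 0) ? null : splitPoints[i - 1];
                final String stop = (i == splitPoints.length) ? null : splitPoints[i];
                futures.add(pool.submit(() -> scanRange(table, start, stop)));
            }
            // Ranges are disjoint and ordered, so appending them in order keeps keys sorted.
            List<String> all = new ArrayList<>();
            for (Future<List<String>> future : futures) {
                all.addAll(future.get());
            }
            return all;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Range scan failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        NavigableMap<String, String> table = new TreeMap<>();
        for (String key : new String[] {"0001", "12345", "2", "250", "31", "77", "9"}) {
            table.put(key, "value-" + key);
        }
        String[] splitPoints = {"1", "2", "3", "4", "5", "6", "7", "8", "9"};
        System.out.println(parallelScan(table, splitPoints));
    }
}
```

Because the ranges do not overlap and are collected in ascending order, the combined result is still sorted by key, so the caller keeps the ordering guarantee of a single sequential scan.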

I’ve created an example implementation of this idea. You can find it on
GitHub:
https://github.com/zygm0nt/hbase-distributed-search.

Any ideas on how to speed things up even more with HBase?


Using WsLite in practice

TL;DR

There is a working example project on GitHub that covers unit testing and request/response logging when using WsLite.

Why Groovy WsLite?

I’m a huge fan of the Groovy WsLite project for calling SOAP web services. Yes, in the real world you have to deal with those: big companies have a huge amount of “legacy” code and are crazy about homogeneous architecture - only SOAP, Java, Oracle, AIX…

I have also never been comfortable with the XFire/CXF approach of generating web service client code. I wrote a bit about other possibilities in this post. With JAXB you can also experience some freaky classloading errors - as Tomek described on his blog. In a large commercial project the “less code the better” principle is significant. And the code generated from XSD can look kinda ugly - especially for more complicated structures like sequences, choices, anys etc.

Using WsLite with native Groovy concepts like XmlSlurper can be a great choice. But since it’s a dynamic approach you have to be really careful: write good unit tests and log requests. Below are a few of my hints for using WsLite in practice.

Unit testing

Suppose you have an invocation of the WsLite SOAPClient (the original WsLite example):

def getMothersDay(long _year) {
    def response = client.send(SOAPAction: action) {
       body {
           GetMothersDay('xmlns':'http://www.27seconds.com/Holidays/US/Dates/') {
              year(_year)
           }
       }
    }
    response.GetMothersDayResponse.GetMothersDayResult.text()
}

What can the unit test look like? My suggestion is to mock SOAPClient and write a simple helper to test that the built XML is correct. An example using the great Spock framework:

void setup() {
   client = Mock(SOAPClient)
   service.client = client
}

def "should pass year to GetMothersDay and return date"() {
  given:
      def year = 2013
  when:
      def date = service.getMothersDay(year)
  then:
      1 * client.send(_, _) >> { Map params, Closure requestBuilder ->
            Document doc = buildAndParseXml(requestBuilder)
            assertXpathEvaluatesTo("$year", '//ns:GetMothersDay/ns:year', doc)
            return mockResponse(Responses.mothersDay)
      }
      date == "2013-05-12T00:00:00"
}

This uses a really cool feature of Spock: even when you mock the invocation with the “any” wildcard (_), you are still able to get the actual arguments. So we can build the XML that would be passed to SOAPClient's send method and check that specific XPaths are correct:

void setup() {
    engine = XMLUnit.newXpathEngine()
    engine.setNamespaceContext(new SimpleNamespaceContext(namespaces()))
}

protected Document buildAndParseXml(Closure xmlBuilder) {
    def writer = new StringWriter()
    def builder = new MarkupBuilder(writer)
    builder.xml(xmlBuilder)
    return XMLUnit.buildControlDocument(writer.toString())
}

protected void assertXpathEvaluatesTo(String expectedValue,
                                      String xpathExpression, Document doc) throws XpathException {
    Assert.assertEquals(expectedValue,
            engine.evaluate(xpathExpression, doc))
}

protected Map namespaces() {
    return [ns: 'http://www.27seconds.com/Holidays/US/Dates/']
}

The XMLUnit library is used here just for its XpathEngine, but it is much more powerful when it comes to comparing XML documents. The NamespaceContext is needed so that prefixes (e.g. ns:GetMothersDay) in your XPath expressions resolve to the right namespace.

Finally, the mock returns a SOAPResponse instance filled with an envelope parsed from some constant XML:
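
To show why the namespace context matters, here is a small sketch of the same kind of check using only the JDK's javax.xml classes instead of XMLUnit. The class name NamespacedXPath and the helper evaluate are made up for this illustration; the namespace URI and the ns prefix are the ones from the example above.

```java
import java.io.StringReader;
import java.util.Collections;
import java.util.Iterator;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

public class NamespacedXPath {

    static final String NS = "http://www.27seconds.com/Holidays/US/Dates/";

    /** Parses the XML namespace-aware and evaluates the expression with "ns" bound to NS. */
    static String evaluate(String xml, String expression) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // without this, namespaced elements cannot be matched
            Document doc = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));

            XPath xpath = XPathFactory.newInstance().newXPath();
            xpath.setNamespaceContext(new NamespaceContext() {
                @Override
                public String getNamespaceURI(String prefix) {
                    return "ns".equals(prefix) ? NS : XMLConstants.NULL_NS_URI;
                }

                @Override
                public String getPrefix(String uri) {
                    return NS.equals(uri) ? "ns" : null;
                }

                @Override
                public Iterator<String> getPrefixes(String uri) {
                    return NS.equals(uri)
                            ? Collections.singletonList("ns").iterator()
                            : Collections.<String>emptyIterator();
                }
            });
            return xpath.evaluate(expression, doc);
        } catch (Exception e) {
            throw new IllegalStateException("XPath evaluation failed", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<GetMothersDay xmlns=\"" + NS + "\"><year>2013</year></GetMothersDay>";
        System.out.println(evaluate(xml, "//ns:GetMothersDay/ns:year"));
        // The unprefixed expression matches nothing and evaluates to an empty string.
        System.out.println("[" + evaluate(xml, "//GetMothersDay/year") + "]");
    }
}
```

In XPath 1.0 an unprefixed name always means "no namespace", so elements that live in a default namespace can only be reached through a prefix that the namespace context maps to that URI.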

protected SOAPResponse mockResponse(String resp) {
    def envelope = new XmlSlurper().parseText(resp)
    new SOAPResponse(envelope: envelope)
}

Request and response logging

WsLite itself doesn’t use any logging framework. We usually handle that by adding our own sendWithLogging method:

private SOAPResponse sendWithLogging(String action, Closure cl) {
    SOAPResponse response = client.send(SOAPAction: action, cl)
    log(response?.httpRequest, response?.httpResponse)
    return response
}

private void log(HTTPRequest request, HTTPResponse response) {
    log.debug("HTTPRequest $request with content:\n${request?.contentAsString}")
    log.debug("HTTPResponse $response with content:\n${response?.contentAsString}")
}

This logs the actual request and response sent through SOAPClient. But it logs only when the invocation is successful, and errors are much more interesting… So here goes the withExceptionHandler method:

private SOAPResponse withExceptionHandler(Closure cl) {
    try {
        cl.call()
    } catch (SOAPFaultException soapEx) {
        log(soapEx.httpRequest, soapEx.httpResponse)
        def message = soapEx.hasFault() ? soapEx.fault.text() : soapEx.message
        throw new InfrastructureException(message)
    } catch (HTTPClientException httpEx) {
        log(httpEx.request, httpEx.response)
        throw new InfrastructureException(httpEx.message)
    }
}

def send(String action, Closure cl) {
    withExceptionHandler {
        sendWithLogging(action, cl)
    }
}

XmlSlurper gotchas

Working with an XML document using XmlSlurper is generally great fun, but in some cases it can introduce problems. A trivial example is parsing a numeric id into a Long value:

def id = Long.valueOf(edit.'@id' as String)

The Attribute class (which edit.'@id' evaluates to) can be converted to a String using the as operator, but converting it to a Long requires using valueOf.

The second example is a bit more complicated. Consider the following XML fragment:

<edit id="3">
   <params>
      <param value="label1" name="label"/>
      <param value="2" name="param2"/>
   </params>
   <value>123</value>
</edit>
<edit id="6">
   <params>
      <param value="label2" name="label"/>
      <param value="2" name="param2"/>
   </params>
   <value>456</value>
</edit>

We want to find the id of the edit whose label is label1. The simplest solution seems to be:

def param = doc.edit.params.param.find { it['@value'] == 'label1' }
def edit = param.parent().parent()

But it doesn’t work! The parent method returns multiple edits, not only the one that is the parent of the given param.

Here’s the correct solution:

doc.edit.find { edit ->
    edit.params.param.find { it['@value'] == 'label1' }
}
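
The same "start from the edit, then look inside it" idea can be sketched with the plain JDK DOM API, which may help when comparing against the XmlSlurper behaviour. This is only an illustration: the class name EditLookup, the method findEditId and the wrapping doc root element are assumptions, since the fragment above has no single root.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

public class EditLookup {

    /** Returns the id of the first edit containing a param with the given value, or null. */
    static Long findEditId(String xml, String paramValue) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
        } catch (Exception e) {
            throw new IllegalStateException("Could not parse XML", e);
        }
        NodeList edits = doc.getElementsByTagName("edit");
        for (int i = 0; i < edits.getLength(); i++) {
            Element edit = (Element) edits.item(i);
            // Only params nested in this particular edit are inspected.
            NodeList params = edit.getElementsByTagName("param");
            for (int j = 0; j < params.getLength(); j++) {
                Element param = (Element) params.item(j);
                if (paramValue.equals(param.getAttribute("value"))) {
                    return Long.valueOf(edit.getAttribute("id"));
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String xml = "<doc>"
                + "<edit id=\"3\"><params>"
                + "<param value=\"label1\" name=\"label\"/><param value=\"2\" name=\"param2\"/>"
                + "</params><value>123</value></edit>"
                + "<edit id=\"6\"><params>"
                + "<param value=\"label2\" name=\"label\"/><param value=\"2\" name=\"param2\"/>"
                + "</params><value>456</value></edit>"
                + "</doc>";
        System.out.println(findEditId(xml, "label1"));
    }
}
```

Iterating over the edits first means the matching element is already in hand when the param is found, so there is no need to navigate back up through parent nodes.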

Example

A working example project covering these hints can be found on GitHub.