Distributed scans with HBase

HBase is by design a columnar store that is optimized for random reads.
You just ask for a row using rowId as an identifier and you get your
data almost instantaneously.

Performing a scan on part of a table, or on the whole of it, is a
completely different thing. First of all, it is sequential, which makes
it rather slow, because it doesn’t use all the RegionServers at the same
time. It is implemented that way to fulfil the contract of the Scan
command, which has to return results sorted by key.

So, how to do this efficiently?

The usual way of getting data from HBase is with the help of its API,
mainly Scan objects, and they are all I’ll use to accomplish the task.
I’ll specify startRow and stopRow, so that each Scan request looks
through only part of the key space.

It is crucial to note that this whole solution works because of the
key-sorting properties of HBase: it scans a table in ascending key
order. Since my keys are strings, they are sorted lexicographically, so
the key “1” is smaller than “2”, and so is the key “12345”. I’ve
leveraged this property to partition my whole key space according to
the first character of the key. In my case keys contain only digits, so
I have 10 ranges:

null-1
1-2
2-3
3-4
4-5
5-6
6-7
7-8
8-9
9-null
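
To make the ordering concrete, here is a minimal sketch in plain Java
that builds the same ten ranges and checks which one a key falls into.
The class and method names (KeyRanges, byFirstDigit, contains) are made
up for this illustration and are not part of the HBase API. It also
assumes keys are plain ASCII digits, for which Java String comparison
agrees with the byte-wise comparison HBase uses.

```java
import java.util.ArrayList;
import java.util.List;

public class KeyRanges {
    /** One scan range: start is inclusive, stop is exclusive, null means unbounded. */
    public static final class Range {
        public final String start;
        public final String stop;

        Range(String start, String stop) {
            this.start = start;
            this.stop = stop;
        }

        /** True when key lies in [start, stop) under lexicographic ordering. */
        public boolean contains(String key) {
            boolean afterStart = start == null || key.compareTo(start) >= 0;
            boolean beforeStop = stop == null || key.compareTo(stop) < 0;
            return afterStart && beforeStop;
        }

        @Override
        public String toString() {
            return start + "-" + stop;
        }
    }

    /** Builds the ten ranges null-1, 1-2, ..., 8-9, 9-null. */
    public static List<Range> byFirstDigit() {
        List<Range> ranges = new ArrayList<>();
        for (int i = 0; i <= 9; i++) {
            String start = (i == 0) ? null : String.valueOf(i);
            String stop = (i == 9) ? null : String.valueOf(i + 1);
            ranges.add(new Range(start, stop));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Print every range and say whether the sample key belongs to it.
        for (Range r : byFirstDigit()) {
            System.out.println(r + " contains 12345? " + r.contains("12345"));
        }
    }
}
```

Only the 1-2 range reports the key “12345” as its own, which is exactly
the property the partitioning relies on.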

The speedup comes from the fact that each range resides in its own
partition (a region, in HBase terms). That’s right, I’ve presplit the
table to have 10 partitions. This corresponds rather nicely with my
cluster’s setup, because I have more than 10 RegionServers, so every
partition should end up on a different RegionServer. That allows the
code to run the requested scan operations in parallel, which is where
the performance boost comes from.

How I’ve created the input table:

$ create 'tariff_changes', { NAME => 'cf', SPLITS_FILE => 'splits.txt', VERSIONS => 50, MAX_FILESIZE => 1073741824 }

$ alter 'tariff_changes', { NAME => 'cf', SPLITS_FILE => 'splits.txt', VERSIONS => 50, MAX_FILESIZE => 1073741824 }

The split file is just something along these lines:

1
2
3
4
5
6
7
8
9
0

This tells HBase which rowKeys start and end each of the partitions.

OK, so after this rather lengthy introduction, what does the actual code
do? It just spins up a few threads, one for each partition, and runs a
Scan request tailored to that partition’s key space. This way I got a
10x speedup for this particular scan: the execution time went down from
30 minutes to 3 for my use case.
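
The threading pattern itself can be sketched without a cluster. The
example below is not the code from the repository: it uses an in-memory
sorted map as a stand-in for the table, and the names ParallelRangeScan,
scanRange and scanAll are invented for the illustration. In real code
each task would instead issue an HBase Scan bounded by the same start
and stop rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelRangeScan {
    /** Stand-in for one Scan: start inclusive, stop exclusive, null means unbounded. */
    static List<String> scanRange(NavigableMap<String, String> table, String start, String stop) {
        NavigableMap<String, String> view = table;
        if (start != null) view = view.tailMap(start, true);
        if (stop != null) view = view.headMap(stop, false);
        return new ArrayList<>(view.values());
    }

    /** Runs one scan per range on its own thread and joins the results in range order. */
    public static List<String> scanAll(NavigableMap<String, String> table, String[] splits) {
        // N split points give N + 1 ranges, hence N + 1 threads.
        ExecutorService pool = Executors.newFixedThreadPool(splits.length + 1);
        try {
            List<Callable<List<String>>> tasks = new ArrayList<>();
            for (int i = 0; i <= splits.length; i++) {
                final String start = (i == 0) ? null : splits[i - 1];
                final String stop = (i == splits.length) ? null : splits[i];
                tasks.add(() -> scanRange(table, start, stop));
            }
            List<String> results = new ArrayList<>();
            // invokeAll returns futures in task order, so the output stays sorted by key.
            for (Future<List<String>> future : pool.invokeAll(tasks)) {
                results.addAll(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("scan interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("scan failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        NavigableMap<String, String> table = new TreeMap<>();
        table.put("12345", "a");
        table.put("2", "b");
        table.put("97", "c");
        String[] splits = {"1", "2", "3", "4", "5", "6", "7", "8", "9"};
        System.out.println(scanAll(table, splits));
    }
}
```

Because the ranges do not overlap and the futures are read back in
range order, the combined result is still sorted by key, even though
the individual scans run concurrently.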

I’ve created an example implementation of this idea. You can find it on
GitHub:
https://github.com/zygm0nt/hbase-distributed-search.

Any ideas on how to speed things up even more with HBase?

You May Also Like

Atom Feeds with Spring MVC

How to add feeds (Atom) to your web application with just two classes?
How about Spring MVC?

Here are my assumptions:
  • you are using Spring framework
  • you have some entity, say “News”, that you want to publish in your feeds
  • your "News" entity has creationDate, title, and shortDescription
  • you have some repository/dao, say "NewsRepository", that will return the news from your database
  • you want to write as little as possible
  • you don't want to format Atom (xml) by hand
You actually do NOT need to use Spring MVC in your application already. If you do, skip to step 3.


Step 1: add Spring MVC dependency to your application
With maven that will be:
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webmvc</artifactId>
    <version>3.1.0.RELEASE</version>
</dependency>

Step 2: add Spring MVC DispatcherServlet
With web.xml that would be:
<servlet>
    <servlet-name>dispatcher</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>classpath:spring-mvc.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
    <servlet-name>dispatcher</servlet-name>
    <url-pattern>/feed</url-pattern>
</servlet-mapping>
Notice, I set the url-pattern to “/feed” which means I don't want Spring MVC to handle any other urls in my app (I'm using a different web framework for the rest of the app). I also give it a brand new contextConfigLocation, where only the mvc configuration is kept.

Remember that when you add a DispatcherServlet to an app that already has Spring (from ContextLoaderListener, for example), your context inherits from the global one, so you should not create beans that already exist there, or include xml that defines them. Watch out for the Spring context being started twice, and refer to the Spring or servlet documentation to understand what's happening.

Step 3. add ROME – a library to handle Atom format
With maven that is:
<dependency>
    <groupId>net.java.dev.rome</groupId>
    <artifactId>rome</artifactId>
    <version>1.0.0</version>
</dependency>

Step 4. write your very simple controller
@Controller
public class FeedController {
    static final String LAST_UPDATE_VIEW_KEY = "lastUpdate";
    static final String NEWS_VIEW_KEY = "news";
    private NewsRepository newsRepository;
    private String viewName;

    protected FeedController() {} //required by cglib

    public FeedController(NewsRepository newsRepository, String viewName) {
        notNull(newsRepository); hasText(viewName);
        this.newsRepository = newsRepository;
        this.viewName = viewName;
    }

    @RequestMapping(value = "/feed", method = RequestMethod.GET)        
    @Transactional
    public ModelAndView feed() {
        ModelAndView modelAndView = new ModelAndView();
        modelAndView.setViewName(viewName);
        List<News> news = newsRepository.fetchPublished();
        modelAndView.addObject(NEWS_VIEW_KEY, news);
        modelAndView.addObject(LAST_UPDATE_VIEW_KEY, getCreationDateOfTheLast(news));
        return modelAndView;
    }

    private Date getCreationDateOfTheLast(List<News> news) {
        if(news.size() > 0) {
            return news.get(0).getCreationDate();
        }
        return new Date(0);
    }
}
And here's a test for it, in case you want to copy&paste (who doesn't?):
@RunWith(MockitoJUnitRunner.class)
public class FeedControllerShould {
    @Mock private NewsRepository newsRepository;
    private Date FORMER_ENTRY_CREATION_DATE = new Date(1);
    private Date LATTER_ENTRY_CREATION_DATE = new Date(2);
    private ArrayList<News> newsList;
    private FeedController feedController;

    @Before
    public void prepareNewsList() {
        News news1 = new News().title("title1").creationDate(FORMER_ENTRY_CREATION_DATE);
        News news2 = new News().title("title2").creationDate(LATTER_ENTRY_CREATION_DATE);
        newsList = newArrayList(news2, news1);
    }

    @Before
    public void prepareFeedController() {
        feedController = new FeedController(newsRepository, "viewName");
    }

    @Test
    public void returnViewWithNews() {
        //given
        given(newsRepository.fetchPublished()).willReturn(newsList);
        
        //when
        ModelAndView modelAndView = feedController.feed();
        
        //then
        assertThat(modelAndView.getModel())
                .includes(entry(FeedController.NEWS_VIEW_KEY, newsList));
    }

    @Test
    public void returnViewWithLastUpdateTime() {
        //given
        given(newsRepository.fetchPublished()).willReturn(newsList);

        //when
        ModelAndView modelAndView = feedController.feed();

        //then
        assertThat(modelAndView.getModel())
                .includes(entry(FeedController.LAST_UPDATE_VIEW_KEY, LATTER_ENTRY_CREATION_DATE));
    }

    @Test
    public void returnTheBeginningOfTimeAsLastUpdateInViewWhenListIsEmpty() {
        //given
        given(newsRepository.fetchPublished()).willReturn(new ArrayList<News>());

        //when
        ModelAndView modelAndView = feedController.feed();

        //then
        assertThat(modelAndView.getModel())
                .includes(entry(FeedController.LAST_UPDATE_VIEW_KEY, new Date(0)));
    }
}
Notice: here, I'm using fest-assert and mockito. The dependencies are:
<dependency>
 <groupId>org.easytesting</groupId>
 <artifactId>fest-assert</artifactId>
 <version>1.4</version>
 <scope>test</scope>
</dependency>
<dependency>
 <groupId>org.mockito</groupId>
 <artifactId>mockito-all</artifactId>
 <version>1.8.5</version>
 <scope>test</scope>
</dependency>

Step 5. write your very simple view
Here's where all the magic formatting happens. Be sure to take a look at all the methods of the Entry class, as there is quite a lot you may want to use/fill.
import org.springframework.web.servlet.view.feed.AbstractAtomFeedView;
[...]

public class AtomFeedView extends AbstractAtomFeedView {
    private String feedId = "tag:yourFantastiSiteName";
    private String title = "yourFantastiSiteName: news";
    private String newsAbsoluteUrl = "http://yourfanstasticsiteUrl.com/news/"; 

    @Override
    protected void buildFeedMetadata(Map<String, Object> model, Feed feed, HttpServletRequest request) {
        feed.setId(feedId);
        feed.setTitle(title);
        setUpdatedIfNeeded(model, feed);
    }

    private void setUpdatedIfNeeded(Map<String, Object> model, Feed feed) {
        Date lastUpdate = (Date) model.get(FeedController.LAST_UPDATE_VIEW_KEY);
        // Only move the feed's timestamp forward; a null lastUpdate leaves it untouched.
        if (lastUpdate != null
                && (feed.getUpdated() == null || lastUpdate.compareTo(feed.getUpdated()) > 0)) {
            feed.setUpdated(lastUpdate);
        }
    }

    @Override
    protected List<Entry> buildFeedEntries(Map<String, Object> model, HttpServletRequest request, HttpServletResponse response) throws Exception {
        @SuppressWarnings("unchecked")
        List<News> newsList = (List<News>)model.get(FeedController.NEWS_VIEW_KEY);
        List<Entry> entries = new ArrayList<Entry>();
        for (News news : newsList) {
            addEntry(entries, news);
        }
        return entries;
    }

    private void addEntry(List<Entry> entries, News news) {
        Entry entry = new Entry();
        entry.setId(feedId + ", " + news.getId());
        entry.setTitle(news.getTitle());
        entry.setUpdated(news.getCreationDate());
        entry = setSummary(news, entry);
        entry = setLink(news, entry);
        entries.add(entry);
    }

    private Entry setSummary(News news, Entry entry) {
        Content summary = new Content();
        summary.setValue(news.getShortDescription());
        entry.setSummary(summary);
        return entry;
    }

    private Entry setLink(News news, Entry entry) {
        Link link = new Link();
        link.setType("text/html");
        link.setHref(newsAbsoluteUrl + news.getId()); //because I have a different controller to show news at http://yourfanstasticsiteUrl.com/news/ID
        entry.setAlternateLinks(newArrayList(link));
        return entry;
    }

}

Step 6. add your classes to your Spring context
I'm using the xml approach, because I'm old and I love xml. No, seriously, I use xml because I may want to declare FeedController a few times with different views (RSS 1.0, RSS 2.0, etc.).

So this is the aforementioned spring-mvc.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean class="org.springframework.web.servlet.view.ContentNegotiatingViewResolver">
        <property name="mediaTypes">
            <map>
                <entry key="atom" value="application/atom+xml"/>
                <entry key="html" value="text/html"/>
            </map>
        </property>
        <property name="viewResolvers">
            <list>
                <bean class="org.springframework.web.servlet.view.BeanNameViewResolver"/>
            </list>
        </property>
    </bean>

    <bean class="eu.margiel.pages.confitura.feed.FeedController">
        <constructor-arg index="0" ref="newsRepository"/>
        <constructor-arg index="1" value="atomFeedView"/>
    </bean>

    <bean id="atomFeedView" class="eu.margiel.pages.confitura.feed.AtomFeedView"/>
</beans>

And you are done.

I've been asked a few times before to put all the working code in some public repo, so this time it's the other way around. I've described things that I had already published, and you can grab the commit from Bitbucket.

Hope that helps.

Apache HISE + Apache Camel

Check out this SlideShare presentation: Apache HISE + Apache Camel, by Rafal Rusin.