Deferring Processing Updates to Increase HBase Write Performance

In this post we’ll examine, via an example, how deferring the processing of updates can increase the write throughput of a data store. At the end we’ll introduce our recently open-sourced HBaseHUT, a tool for automating deferred updates in HBase.

Please note that “deferred updates” here does not mean that updates are not “visible” immediately after a write operation. The system still serves add/update/delete and read operations in real time.

Plug: if this sort of stuff interests you, we are hiring people who know about Hadoop, HBase, MapReduce…

Constraints & Assumptions

The decisions and ideas in this post are based on facts about HBase, but they can be applied to many other data stores with similar characteristics:

  • Fetching a set of records as a range (scan operation) is much cheaper than fetching the same records one-by-one
  • Adding new records (simple write operation) is very fast
  • Storage space is cheap
  • “In-place” updates of records are not generally feasible

Well, the last assumption is a bit tricky: HBase provides an incrementColumnValue() operation, but it is very limited: one can only increment a specified cell that contains a long value by some delta. Future versions of HBase will allow incrementing multiple cells at once (this is already in HBase trunk). However, not all update operations are that simple, as we’ll see in the example. Moreover, incrementing cells in a record is slower than writing a new record (we’ll use this fact later in the post).

Idea Explained

The idea behind deferred updates processing is to postpone updating the existing record and to store incoming deltas as new records instead. Record update operations thus become simple write operations, with the corresponding performance. The deferred updates technique elaborated here fits well when a system handles a lot of updates to stored data and write performance is the main concern, while read speed requirements are not that strict. The following cases (each of them separately or any combination of them) may indicate that one can benefit from using the technique (the list is not complete):

  • updates are well spread over the whole and large dataset
  • a low rate (among write operations) of “true updates” (i.e. a high percentage of writes are for completely new data, not really updates of existing data)
  • good portion of data stored/updated may never be accessed
  • system should be able to handle high write peaks without major performance degradation

Next, let’s look at an example use-case to illustrate the idea better.

Example Use-Case

Imagine a simple presentation slides sharing system that gives one the ability to browse presentations on-line (not as downloadable PDF or whatever format files, since that would not be interesting for us here). Suppose we want to provide the presentation’s author with details on user behaviour when a user navigates through slides so that the author can use this data to infer the quality of the presentation (hey, slide sharing systems maintainers: free hint!). Let’s limit statistics data we want to share to simple metrics for the sake of keeping the example simple. Here’s what we can provide:

  • number of times a particular slide was viewed
  • avg/min/max time spent by user on a particular slide

Using these stats the presentation author can find out e.g.:

  • what are the most difficult to understand slides (they probably need to be divided into multiple ones, or better preparation before showing them should be done by giving more/better info on previous slides)
  • what are the most trivial slides (they can be merged/removed)
  • what the typical users are like (e.g., if users spend a lot of time thinking about the slides, the author can add more details to the presentation; on the other hand, if users usually walk through it quickly, then some very deep details can be removed)

Such a system could provide more stats to authors so that they can work on other sorts of presentation improvements. For example, the system could provide information about user navigation actions (e.g., if users often go back and forth between two slides, then the schemes/diagrams on them would probably be easier to understand if they were on the same slide and could be easily compared), but that’s not the focus of this post.

To track the previously mentioned stats we need to capture how much time a user spent on a particular slide and send this data to our data store. Let’s assume our records store the following info: {(key)presentationId_slideId, number of views, total viewing time, max viewing time, min viewing time}. Using this data we can provide useful information to the author as needed.

Every time a user leaves a slide, the tracking logic sends the following data to the data store: {(key)presentationId_slideId, time spent on viewing}.
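The record layout and the per-view delta described above can be sketched in plain Java. This is only an illustration under our own assumptions: the class SlideStats and its methods are hypothetical names and are not part of HBase or HBaseHUT. The point it shows is that a single view is itself a tiny aggregate, and that two aggregates can be merged in any order, which is what makes postponing the merge possible.

```java
/** Aggregated viewing statistics for one slide (hypothetical model for illustration). */
final class SlideStats {
    final long views;
    final long totalTime;
    final long minTime;
    final long maxTime;

    SlideStats(long views, long totalTime, long minTime, long maxTime) {
        this.views = views;
        this.totalTime = totalTime;
        this.minTime = minTime;
        this.maxTime = maxTime;
    }

    /** The delta produced by a single slide view: one view, with min = max = total. */
    static SlideStats ofView(long timeSpent) {
        return new SlideStats(1, timeSpent, timeSpent, timeSpent);
    }

    /** Combines two partial aggregates; the result does not depend on merge order. */
    SlideStats merge(SlideStats other) {
        return new SlideStats(
            views + other.views,
            totalTime + other.totalTime,
            Math.min(minTime, other.minTime),
            Math.max(maxTime, other.maxTime));
    }

    /** The average is not stored; it is derived from the totals at read time. */
    double averageTime() {
        return views == 0 ? 0.0 : (double) totalTime / views;
    }
}
```

Storing the total time and the number of views, rather than the average itself, is what keeps the merge simple: averages of averages would need the counts anyway.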

Direct Solution

The straightforward approach would be to update the presentationId_slideId record (fetch, modify and store the updated data) as new data comes in. Here are some drawbacks of this naive approach:

  • for each update operation we have to perform fetch and write operations (Get and Put in case of HBase)
  • most authors won’t really care about all these stats and the majority of them will never look at them, so many processing operations can simply be omitted (until there’s interest in the stats)
  • for the first ever viewing of a slide a redundant fetch operation is performed (the data is not there yet), which is OK if slides are viewed many times, but when many presentations are viewed just once (e.g. by their creators) this may result in a larger share of redundant operations (this assumption is sort of made up, yes)

The very first point is the major one and is the obvious write performance killer. This point is really not made up: we have first-hand experience with that particular situation.
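To make that cost concrete, here is a minimal sketch of the direct approach, with an in-memory map standing in for the data store. Everything in it is an assumption made for illustration: the class DirectStatsStore, its counters, and the array layout {views, totalTime, minTime, maxTime} are hypothetical. With HBase, the fetch would be a Get and the store would be a Put.

```java
import java.util.HashMap;
import java.util.Map;

/** Direct read-modify-write updates against an in-memory stand-in for the data store. */
final class DirectStatsStore {
    // key -> {views, totalTime, minTime, maxTime}
    private final Map<String, long[]> table = new HashMap<>();

    // Operation counters for the write path only
    int reads = 0;
    int writes = 0;

    void trackSlideView(String presentationId, String slideId, long timeSpent) {
        String key = presentationId + "_" + slideId;

        // Fetch the current record; for a first view this read finds nothing
        long[] current = table.get(key);
        reads++;

        long[] updated = (current == null)
            ? new long[] {1, timeSpent, timeSpent, timeSpent}
            : new long[] {
                current[0] + 1,
                current[1] + timeSpent,
                Math.min(current[2], timeSpent),
                Math.max(current[3], timeSpent)};

        // Store the modified record
        table.put(key, updated);
        writes++;
    }

    long[] getStats(String presentationId, String slideId) {
        return table.get(presentationId + "_" + slideId);
    }
}
```

Every tracked view costs one read and one write here, regardless of whether anyone ever looks at the resulting stats.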

Deferring Updates Processing

One alternative solution is to defer updates processing until the data is requested by the presentation author, or until a scheduled/periodic updates processing run is performed (ideally when the data store write load is minimal). In this deferred updates approach all new deltas (new user action data) are written as new records.

When an author requests the stats, updates processing takes place. This time it is done in a much more efficient way than in the direct approach: by using range scan operations rather than individual record reads and writes. Given that a presentation usually has tens of slides and is viewed by hundreds or thousands of people, such range scans are fast enough that the user will barely notice them. Also, it should be quite OK to ask a person to wait several seconds during their first view of the stats page (processed updates are written back into the data store, so the next time the user fetches the data it is available right away).

To keep request-time processing from taking too long, the data can also be processed periodically. Since (updated) statistics are available to the user immediately either way, the periodic processing can be postponed for a long time without affecting the “real-time” visibility aspect of the system. It can be scheduled for off-peak hours, when it won’t compromise write performance. This adds a “cushion”: from then on, the maximum amount of data that needs to be processed “on-the-fly” is one day’s worth of updates (assuming daily processing), which is sufficient for the majority of cases (e.g. if the estimated max presentation views per day is in the tens of thousands, that is a very quick scan operation).
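The deferred approach can be sketched the same way, with a sorted in-memory map standing in for HBase's sorted rows. This is a simplified illustration and not how HBaseHUT is implemented: the class DeferredStatsStore, the "#" key delimiter, and the sequence-number suffix are all hypothetical, and the sketch assumes that IDs contain neither "_" nor "#".

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Deferred updates: writes append deltas, reads merge them with one range scan. */
final class DeferredStatsStore {
    // Sorted by key, like rows in HBase; value is {views, totalTime, minTime, maxTime}
    private final TreeMap<String, long[]> table = new TreeMap<>();
    private long sequence = 0;

    /** Write path: store the delta as a new row. Nothing is read. */
    void trackSlideView(String presentationId, String slideId, long timeSpent) {
        String key = presentationId + "_" + slideId + "#" + String.format("%019d", sequence++);
        table.put(key, new long[] {1, timeSpent, timeSpent, timeSpent});
    }

    /** Read path: scan all rows of the slide, merge them, write the merged row back. */
    long[] getStats(String presentationId, String slideId) {
        String prefix = presentationId + "_" + slideId + "#";
        // All keys that start with the prefix sort inside this range
        NavigableMap<String, long[]> range =
            table.subMap(prefix, true, prefix + Character.MAX_VALUE, false);
        if (range.isEmpty()) {
            return null;
        }
        long[] merged = {0, 0, Long.MAX_VALUE, Long.MIN_VALUE};
        for (long[] row : range.values()) {
            merged[0] += row[0];
            merged[1] += row[1];
            merged[2] = Math.min(merged[2], row[2]);
            merged[3] = Math.max(merged[3], row[3]);
        }
        // The sub-map is a view, so clearing it removes the scanned rows from the table
        range.clear();
        // Write back one compacted row so the next read has less to merge
        table.put(prefix, merged);
        return merged;
    }

    int rowCount() {
        return table.size();
    }
}
```

The write path never reads, and the merge work is only done for slides whose stats are actually requested. A periodic job could call the same read path for all slides during off-peak hours to bound the number of rows a request-time scan has to merge.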

The remainder of this post provides details of our implementation of this approach.

HBaseHUT

HBaseHUT is a tool that automates deferred processing of updates for HBase. It hides many details of updates processing from the client code, which makes it an easy-to-use solution. There are Put and ResultScanner wrappers that encapsulate all the needed logic.

To write new data you need to use the HutPut implementation of Put. Since a HutPut is a standard HBase Put, only the instance creation code needs to be changed:

public class SlideStatsTracker {
  byte[] SLIDE_CF = Bytes.toBytes("slide");
  byte[] VIEWS_NUM_C = Bytes.toBytes("views");
  byte[] TOTAL_VTIME_C = Bytes.toBytes("total_time");
  byte[] MIN_VTIME_C = Bytes.toBytes("min_time");
  byte[] MAX_VTIME_C = Bytes.toBytes("max_time");
  ...
  void trackSlideView(byte[] presentationId, byte[] slideId, long timeSpent)
                               throws InterruptedException, IOException {
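    // Each view is written as a new record (a delta); no read is needed
    Put put = new HutPut(Bytes.add(presentationId, slideId));
    put.add(SLIDE_CF, VIEWS_NUM_C, Bytes.toBytes(1));
    put.add(SLIDE_CF, TOTAL_VTIME_C, Bytes.toBytes(timeSpent));
    // For a single view, min and max both equal the time spent; the update
    // processor reads these columns from every record
    put.add(SLIDE_CF, MIN_VTIME_C, Bytes.toBytes(timeSpent));
    put.add(SLIDE_CF, MAX_VTIME_C, Bytes.toBytes(timeSpent));
    hTable.put(put);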
  }
  …
}

Fetching stats is performed using HutResultScanner, which implements ResultScanner and hence can also be very easily integrated into code that uses the normal HBase API:

  Scan scan = new Scan(presentationId);
  ResultScanner resultScanner =
      new HutResultScanner(hTable.getScanner(scan), updateProcessor);
  Result result = resultScanner.next();
  while (result != null) {
    // fetch data from result object
    result = resultScanner.next();
  }

The updateProcessor passed as a parameter encapsulates the update operation logic:

  class SlideStatsUpdateProcessor implements UpdateProcessor {
    @Override
    public void process(Iterable<Result> records, UpdateProcessingResult processingResult) {
      int viewsNumber = 0;
      long totalViewTime = 0;
      long minViewTime = Long.MAX_VALUE;
      long maxViewTime = Long.MIN_VALUE;
      // Processing records
      for (Result record : records) {
        int views = Bytes.toInt(record.getValue(SLIDE_CF, VIEWS_NUM_C));
        long totalTime = Bytes.toLong(record.getValue(SLIDE_CF, TOTAL_VTIME_C));
        long minTime = Bytes.toLong(record.getValue(SLIDE_CF, MIN_VTIME_C));
        long maxTime = Bytes.toLong(record.getValue(SLIDE_CF, MAX_VTIME_C));

        // Fold this record into the running aggregate
        viewsNumber += views;
        totalViewTime += totalTime;
        minViewTime = minTime < minViewTime ? minTime : minViewTime;
        maxViewTime = maxTime > maxViewTime ? maxTime : maxViewTime;
      }

      processingResult.add(SLIDE_CF, VIEWS_NUM_C, Bytes.toBytes(viewsNumber));
      processingResult.add(SLIDE_CF, TOTAL_VTIME_C, Bytes.toBytes(totalViewTime));
      processingResult.add(SLIDE_CF, MIN_VTIME_C, Bytes.toBytes(minViewTime));
      processingResult.add(SLIDE_CF, MAX_VTIME_C, Bytes.toBytes(maxViewTime));
    }
  }

As you can see, the HBaseHUT abstraction doesn’t change the HBase API, which makes it very easy to use in new code or to integrate into old code.

You can find more details about HBaseHUT features on the project’s wiki. HBaseHUT is a new, recently emerged project – your feedback/questions/ideas/feature requests/forks/pull requests, etc. are all very welcome. Please start new discussions on HBaseHUT’s mailing list.

If you read this far, we’d like to talk to you – we are hiring smart people who know about Hadoop, HBase, MapReduce, want to do large scale data (stream) processing, and build tools like HBaseHUT.

Solr Digest, November 2010

It is time for the last Solr Digest of 2010; the next Digest will be published some time in January 2011. This was not a month with too many interesting developments, so here we bring to your attention only the few more interesting bits. Here we go…

Already committed features

  • Anyone working with the Polish language will be happy to hear that a factory for the Polish stemmer has been committed to 3_x and trunk.

Interesting features in development

Miscellaneous

  • A fix for a feature that was committed earlier this year – Enable sorting by Function Query – is close to being committed. This is a big one! There were some problems with it: functions weren’t weighted, the function query wasn’t being properly parsed, some deprecated bits of code were used, etc. A patch is already posted, so if you are eager to use this functionality you can start by applying the patch yourself.
  • Many people are using the Spatial Search features recently introduced in Solr. If you’re considering that too, be careful about one limitation: there is no Spatial support for multi-valued fields. So, if you have multi-valued spatial fields and you’d like to do some sorting on them, you’ll end up with incorrect results. The feature we’re describing here can be found in some other search tools, though, like Elastic Search, so Solr might be getting it too some day. You can check if there is some progress with this in JIRA issues like SOLR-2154.
  • There is a major bug in DataImportHandler – it doesn’t release JDBC connections. It appears that this issue isn’t related to any particular database, so this is an obvious bug in DIH. Check this JIRA issue for updates.
  • If you prefer git over svn, you might be interested in Solr’s recently set up git repository. Check this ML thread to learn more about it.

So long until 2011, Solr Digest readers! Follow @sematext on Twitter for other stuff from Sematext.