Sneak Peek: Hadoop Monitoring comes to SPM

When it comes to Hadoop, they say you’ve got to monitor it and then monitor it some more.  Since our own Performance Monitoring and Search Analytics services run on top of Hadoop, we figured it was time to add Hadoop performance monitoring to SPM.  So here is a sneak peek at SPM for Hadoop.  If you’d like to try it on your Hadoop cluster, we’ll be sending invitations soon and you can get on the private beta list starting today!

In the meantime, here is a small sample of pretty self-explanatory reports from SPM for Hadoop, so you can get a sense of what’s available.  There are, of course, a number of other Hadoop-specific reports included, as well as server reports, filtering, alerting, multi-user support, report sharing, and so on.

Please don’t forget to tell us what else you would like us to monitor – select your candidates – and if you like what you see and want a good monitoring tool for your Hadoop cluster, please sign up for the private beta now.

Click on any graph to see it in its full size and high quality.

Hadoop NameNode Files

Hadoop DataNode Read-Write

Hadoop JobTracker MapReduce Runtime

Hadoop TaskTracker Tasks

What else would you like us to monitor with SPM?  Please select your candidates!

For announcements, promotions, discounts, service status, milk, cookies, and other goodies follow @sematext.

HBase FuzzyRowFilter: Alternative to Secondary Indexes

In this post we’ll explain the usage of FuzzyRowFilter, which can help in many situations where secondary index solutions seem to be the only way to avoid full table scans.

Background

When it comes to HBase, the way you design your row key affects everything. It is a common pattern to have a composite row key which consists of several parts, e.g. userId_actionId_timestamp. This allows for fast fetching of rows (or a single row) based on start/stop row keys, which have to be a prefix of the row keys you want to select. E.g. one may select the last time userX logged in by specifying the row key prefix “userX_login_”, or the last action of userX by fetching the first row with prefix “userX_”. These partial row key scans work very fast and do not require scanning the whole table: HBase storage is optimized to make them fast.
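For illustration, here is a minimal sketch of such a partial key scan using the plain client API (the table name, configuration object, and key values are made up for the example):

HTable table = new HTable(conf, "user_actions");
byte[] prefix = Bytes.toBytes("userX_login_");
Scan scan = new Scan();
scan.setStartRow(prefix);
// stop row = prefix with its last byte incremented, so the scan ends right
// after the last row that starts with the prefix
byte[] stopRow = Arrays.copyOf(prefix, prefix.length);
stopRow[stopRow.length - 1]++;
scan.setStopRow(stopRow);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
  // process the user's login records here
}
scanner.close();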

Problem

However, there are cases when you need to fetch data based on key parts which happen to be in the middle of the row key. In the example above you may want to find the last logged-in users. When you don’t know the first parts of the key, a partial row key scan turns into a full table scan, which might be very slow and resource-intensive.

Possible Solution #1: Data Redundancy

One possible way around it would be to use secondary indexes by creating redundant rows which contain the same data as the original ones but whose keys have a different sequence of parts (e.g. actionId_timestamp); a minimal sketch of this approach follows the list below. This solution may not be suitable for everyone because of its cons:

  • storing extra indexes (usually N indexes require storing N times more data) results in storing a lot more data on disk
  • storing (and serving) extra rows brings additional load on the cluster during writing and reading (extra blocks fighting to be in cache, etc.)
  • writing/updating/deleting several rows is not an atomic operation in HBase
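Here is a minimal sketch of the redundancy approach (the key layout, column family, and qualifier names are made up for the example); note the last point above: the two puts land in separate rows and are not atomic.

Put primary = new Put(Bytes.toBytes(userId + "_" + actionId + "_" + timestamp));
primary.add(FAMILY, QUALIFIER, value);
// redundant "index" row: same data, key parts reordered
Put index = new Put(Bytes.toBytes(actionId + "_" + timestamp));
index.add(FAMILY, QUALIFIER, value);
table.put(Arrays.asList(primary, index));  // two separate rows, not an atomic write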

Possible Solution #2: Integrated Secondary Indexes

Another way to attack the problem is to use a smart secondary index mechanism integrated into HBase which doesn’t rely on data redundancy, e.g. something like IHBase. The problem here is that there’s no out-of-the-box solution to be used. This may change with the addition of the newer Coprocessors functionality (see e.g. HBASE-2038 or this). But as of now, existing solutions have their own limitations and drawbacks, while new solutions are yet to be completed.

Suggested Solution

First of all, I have to say that the solution suggested below is not a silver bullet. Moreover, its performance may be very bad and even close to a full table scan in some cases. It also cannot be applied in every situation described in the Background and Problem sections. But in many cases, depending on your data, this simple solution can be used to avoid the secondary index burden and still allow for very fast scans. In many other cases it can be used to significantly speed up your full table scans.

The suggested solution is not new and is quite simple, but it is usually overlooked by HBase users, though it shouldn’t be.

Fast-Forwarding in Server-side Filter

In recent HBase versions (I believe 0.90+) there’s a mechanism that allows skipping whole ranges of rows when scanning with a server-side filter. The data of the skipped rows may not even be read from disk. Based on the current row key, the filter can tell the scanner to advance to a row with a specific key and, by doing that, jump over many rows which are simply skipped. For example, this makes it possible to perform fast full-table scans (or large partial key scans) when there’s enough information about the key and the data to provide efficient hints for skipping a lot of rows during the scan.

Most of the time you’ll have to implement your own custom filter that performs fast-forwarding. A hint for these cases: refer to org.apache.hadoop.hbase.filter.Filter.ReturnCode.SEEK_NEXT_USING_HINT in the HBase sources.
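To give an idea of the shape of such a filter, here is a rough sketch (not taken from HBase itself, and written against the 0.92/0.94-era filter API) of a hypothetical filter that returns only the first stored row of each distinct fixed-length key prefix and fast-forwards over the rest. With the userId_actionId_timestamp keys above (fixed-length userId), that would mean one row per user. The Writable methods are needed so the filter can be shipped to the region servers.

public class FirstRowPerPrefixFilter extends FilterBase {
  private int prefixLength;
  private byte[] lastIncludedPrefix;
  private byte[] currentIncludedRow;
  private byte[] nextRowHint;

  public FirstRowPerPrefixFilter() {}                  // needed for deserialization

  public FirstRowPerPrefixFilter(int prefixLength) {
    this.prefixLength = prefixLength;
  }

  @Override
  public ReturnCode filterKeyValue(KeyValue kv) {
    byte[] row = kv.getRow();
    if (currentIncludedRow != null && Arrays.equals(row, currentIncludedRow)) {
      return ReturnCode.INCLUDE;                       // more cells of the already included row
    }
    byte[] prefix = Arrays.copyOf(row, prefixLength);
    if (lastIncludedPrefix == null || !Arrays.equals(prefix, lastIncludedPrefix)) {
      lastIncludedPrefix = prefix;                     // first row of a new prefix: include it
      currentIncludedRow = row;
      return ReturnCode.INCLUDE;
    }
    // another row with an already seen prefix: compute the next possible prefix and
    // ask the scanner to jump straight to it (overflow of an all-0xFF prefix ignored)
    nextRowHint = Arrays.copyOf(prefix, prefixLength);
    for (int i = prefixLength - 1; i >= 0; i--) {
      if (++nextRowHint[i] != 0) {
        break;                                         // no carry needed
      }
    }
    return ReturnCode.SEEK_NEXT_USING_HINT;
  }

  @Override
  public KeyValue getNextKeyHint(KeyValue currentKV) {
    return KeyValue.createFirstOnRow(nextRowHint);     // row the scanner fast-forwards to
  }

  public void write(DataOutput out) throws IOException { out.writeInt(prefixLength); }

  public void readFields(DataInput in) throws IOException { prefixLength = in.readInt(); }
}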

FuzzyRowFilter

FuzzyRowFilter is one of the handy filters that is available and that performs fast-forwarding based on a fuzzy row key mask provided by the user. It will be available out of the box in the next HBase release, but you can already download its sources from HBASE-6509 (use the latest patch) and use it like any other custom filter (there’s no need to patch HBase; it relies on existing functionality).

FuzzyRowFilter takes a row key and mask info as parameters. In the example above, in case we want to find the last logged-in users and the row key format is userId_actionId_timestamp (where userId has a fixed length of, say, 4 chars), the fuzzy row key we are looking for is “????_login_”. This translates into the following params for FuzzyRowFilter:

FuzzyRowFilter rowFilter = new FuzzyRowFilter(
 Arrays.asList(
  new Pair<byte[], byte[]>(
    Bytes.toBytesBinary("\\x00\\x00\\x00\\x00_login_"),
    new byte[] {1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0})));

I.e. the row key to compare with is provided as the first byte array (at the byte positions where any value is allowed, “\x00” is set, which is translated into (byte) 0). To tell which positions are fixed and which are not, a second byte array (the mask info) is provided, with zeroes at the positions whose values are “fixed” and ones at the “non-fixed” positions.
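Once constructed, the filter is attached to a regular Scan like any other filter (a minimal usage sketch; the table name is made up for the example):

Scan scan = new Scan();
scan.setFilter(rowFilter);
HTable table = new HTable(conf, "user_actions");
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
  // each result is a "????_login_..." row, i.e. a user's login record
}
scanner.close();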

Thus one can define different fuzzy row key masks, including those with “non-fixed” positions anywhere in the middle of the key. E.g.: “hb?se” or “??pred?ce”.

Note that FuzzyRowFilter accepts more than one mask: if a row key satisfies at least one of them, the row will be included in the result. E.g. by providing the masks “????_login_” and “????_register_” we can find the last logged-in and registered users, as sketched below.
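For instance, the two-mask variant could be constructed like this (same key layout as above):

FuzzyRowFilter rowFilter = new FuzzyRowFilter(
 Arrays.asList(
  new Pair<byte[], byte[]>(
    Bytes.toBytesBinary("\\x00\\x00\\x00\\x00_login_"),
    new byte[] {1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0}),
  new Pair<byte[], byte[]>(
    Bytes.toBytesBinary("\\x00\\x00\\x00\\x00_register_"),
    new byte[] {1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0})));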

How It Works

In the example above, with the mask “????_login_” the scan initially navigates to the first row of the table. It is likely to be a user-action record (let the userId be “0001”), but the action may not be “login”. In this case, since the filter knows the “current” user (“0001”) and the action it is looking for, it tells the scan to jump to the row with the key “0001_login_”. By doing that, many rows may be skipped (if we track other user actions apart from “login”, there are likely many more other user-action records than user logins). The scan then reads the user’s login records until it hits a record with an action that is not login, say “0001_logout”. At that point the filter knows there’s no point in scanning the rest of this user’s records and tells the scanner to jump to the next user, “0002_login_”, and continue scanning from there. Note: there might be no “0002” user; the filter knows nothing about users, it simply suggests the next user id by incrementing the current one. In that case the scan automatically jumps to the next existing user, and the steps above are repeated.

Limitations & Performance Considerations

As you have probably already figured out from the example above, FuzzyRowFilter can be applied only if userId has a fixed length. While it is usually not hard to design the row key format so that its parts have fixed lengths (at least those parts that we need to mask with “???”), in many situations it may be problematic.

The efficiency of using FuzzyRowFilter (and any other fast-forwarding filter) is determined by how many records the filter can actually skip and how many jumps it has to do to skip them.

Performance of a scan based on FuzzyRowFilter usually depends on the cardinality of the fuzzy part. E.g. in the example above, if the number of users is in the several hundreds to several thousands, the scan should be very fast: there will only be several hundred or thousand “jumps” and a huge number of rows may be skipped. If the cardinality is high, the scan can take a lot of time. The worst-case scenario is when you have N records and N users, i.e. one record per user; in this case there’s simply nothing to skip.

Even when the performance of a full-table scan with the help of FuzzyRowFilter is not suitable for serving online data, it has still proven to be very efficient when feeding data from HBase into a MapReduce job. Don’t overlook this!

Summary

There are times when you design the row key for the data to be stored in HBase and feel the need for secondary indexes because of very different data access patterns. In such situations, consider relying on FuzzyRowFilter for some of the data-reading use-cases. Depending on your data, with small adjustments to the row key format (sometimes none are even needed), you can benefit from very fast fetching of records where before you needed to perform full table scans or very large partial key scans.

Plug: if this sort of stuff interests you, we are hiring people who know and love to work with Hadoop, HBase, MapReduce…

@abaranau

Mahout Digest, October 2010

We’ve been very busy here at Sematext, so we haven’t covered Mahout during the last few months.  We are pleased with what’s been keeping us busy, but are not happy about our irregular Mahout Digests.  We covered the last (0.3) release with all of its features, and we are not going to miss covering a very important milestone for Mahout: release 0.4 is out! In this digest we’ll summarize the most important changes in Mahout since the last digest and add some perspective.

Before we dive into Mahout, please note that we are looking for people with Machine Learning skills and Mahout experience (as well as good Lucene/Solr search people).  See our Hiring Search and Data Analytics Engineers post.

This Mahout release brings overall changes to model refactoring and the command line interface, aimed at improving integration and consistency (easier access to Mahout operations via the command line). The command line interface is now pretty much standardized for working with all the various options, which makes it easier to run and use. Interfaces are better and more consistent across algorithms, and there have been many small fixes, improvements, refactorings, and clean-ups. Details on what’s included can be found in the release notes, and the download is available from the Apache Mirrors.

Now let’s add some context to various changes and new features.

GSoC projects

Mahout wrapped up its Google Summer of Code projects, and two of them completed successfully:

  • EigenCuts spectral clustering implementation on Map-Reduce for Apache Mahout (addresses issue MAHOUT-328), proposal and implementation details can be found in MAHOUT-363
  • Hidden Markov Models based sequence classification (proposal for a summer-term university project), proposal and implementation details in MAHOUT-396

Two projects did not complete due to lack of student participation and one remains in progress.

Clustering

The biggest additions in the clustering department are the EigenCuts clustering algorithm (a GSoC project) and MinHash-based clustering, which we covered as one of the possible GSoC suggestions in one of our previous digests. MinHash clustering was implemented, but not as a GSoC project. In the first digest from the Mahout series we covered problems related to the evaluation of clustering results (an unsupervised learning issue), so a big addition to Mahout’s clustering is the Cluster Evaluation Tools, featuring the new ClusterEvaluator (uses Mahout in Action code for inter-cluster density and similar code for intra-cluster density over a set of representative points, not the entire clustered data set) and CDbwEvaluator, which offers new ways to evaluate clustering effectiveness.

Logistic Regression

Online learning capabilities such as a Stochastic Gradient Descent (SGD) algorithm implementation are now part of Mahout. Logistic regression is a model used for predicting the probability of the occurrence of an event. It makes use of several predictor variables that may be either numerical or categorical. For example, the probability that a person will have a heart attack within a specified time period might be predicted from knowledge of the person’s age, sex, and body mass index. Logistic regression is used extensively in the medical and social sciences, as well as in marketing applications such as predicting a customer’s propensity to purchase a product or cancel a subscription. The Mahout implementation uses Stochastic Gradient Descent (SGD); see MAHOUT-228 for the initial request and development. The new sequential logistic regression training framework supports a feature vector encoding framework for high-speed vectorization without a pre-built dictionary. You can find more details on Mahout’s logistic regression wiki page.
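As a rough sketch of what this looks like in code (based on our reading of the 0.4-era org.apache.mahout.classifier.sgd API; exact class and method names should be double-checked against the release), training and using a binary SGD classifier on the heart-attack example above might look like this:

// 2 target categories (heart attack: yes/no), 3 features (age, sex, BMI), L1 prior
OnlineLogisticRegression learner = new OnlineLogisticRegression(2, 3, new L1());

// train on one labeled example: the feature vector and the observed category (1 = heart attack)
learner.train(1, new DenseVector(new double[] {63.0, 1.0, 31.2}));

// ... train on many more examples ...

// estimate the probability of a heart attack for a new person
double p = learner.classifyScalar(new DenseVector(new double[] {45.0, 0.0, 24.5}));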

Math

There has been a lot of cleanup done in the math module (you can check the details in the Cleanup Math discussion on the ML), lots of it related to the untested Colt framework integration (and deprecated code in the Colt framework). The discussion resulted in several pieces of the Colt framework getting promoted to tested status (QRDecomposition, in particular).

Classification

In addition to speedups and bug fixes, the main new features in classification are new classifiers (new classification algorithms) and more open/uniform input data formats (vectors). The most important changes are:

  • New SGD classifier
  • An experimental new type of Naive Bayes classifier (using vectors) and feature reduction options for the existing Naive Bayes classifier (variable-length coding of vectors)
  • New VectorModelClassifier allows any set of clusters to be used for classification (clustering as input for classification)
  • Random forests can now be saved and used to classify new data. Read more about how to build a random forest and how to use it to classify new cases on this dedicated wiki page.

Recommendation Engine

The most important changes in this area are related to distributed similarity computations, which can be used in Collaborative Filtering (or in other areas like clustering, for example). An implementation of a Map-Reduce job, based on the algorithm suggested in Elsayed et al: Pairwise Document Similarity in Large Collections with MapReduce, which computes item-item similarities for item-based Collaborative Filtering, can be found in MAHOUT-362. A generalization of the algorithm based on the mailing list discussion led to an implementation of a Map-Reduce job which computes pairwise similarities of the rows of a matrix using a customizable similarity measure (with implementations already provided for Cooccurrence, Euclidean Distance, Loglikelihood, Pearson Correlation, Tanimoto coefficient, and Cosine). More on the distributed version of any item similarity function (which was previously available only in a non-distributed implementation) can be found in MAHOUT-393. With pairwise similarity computation defined, RecommenderJob has evolved into a fully distributed item-based recommender (the implementation depends on how the pairwise similarities are computed). You can read more about the distributed item-based recommender in MAHOUT-420.

Implementations of distributed operations on very large matrices are very important for a scalable machine learning library that supports large data sets. For example, when term vectors are built from textual documents/content, they tend to have high dimensionality. If we consider a term-document matrix where each row represents the terms from a document (or documents) while each column represents a document, we obviously end up with a high-dimensional matrix. The same thing occurs in Collaborative Filtering: it uses a user-item matrix containing ratings as the matrix values, where each row corresponds to a user and each column represents an item. Again we have a large-dimensional matrix that is sparse.

In both cases (the term-document matrix and the user-item matrix) we are dealing with high matrix dimensionality which needs to be reduced while preserving most of the information as well as possible. Obviously we need some sort of matrix operation which will produce a lower-dimensional matrix with the important information preserved. For example, a large-dimensional matrix may be approximated in lower dimensions using Singular Value Decomposition (SVD).

It’s obvious that we need a (Java) matrix framework capable of fundamental matrix decompositions. JAMA is a great example of a widely used linear algebra package for matrix operations, capable of SVD and other fundamental matrix decompositions (WEKA, for example, uses JAMA for matrix operations). Operations on high-dimensional matrices always require heavy computation, and this puts high hardware requirements on any ML production system. This is where Mahout, which features distributed operations on large matrices, should be the production choice for Machine Learning algorithms over frameworks like JAMA, which, although great, cannot distribute its operations.

In a typical recommendation setup, users have often used or interacted with only a few items from the whole item set (which can be very large), which leads to sparse user-item matrices. Mahout’s (0.4) distributed Lanczos SVD implementation is particularly useful for finding decompositions of very large sparse matrices.

News and Roadmap

All of the new distributed similarity/recommender implementations we analyzed in the previous paragraphs were contributed by Sebastian Schelter, and in recognition of this important work he was elected a new Mahout committer.

The book “Mahout in Action”, published by Manning, has reached 15/16 chapters complete and will soon enter final review.

This is all from us for now.  Any comments/questions/suggestions are more than welcome, and until the next Mahout digest keep an eye on Mahout’s road map for 0.5 or the discussion about what Mahout is missing to become a production-stable (1.0) framework.  We’ll see you next month – @sematext.

Hiring Search and Data Analytics Engineers

We are growing and looking for smart people to join us either in an “elastic”, on-demand, per-project, or more permanent role:

Lucene/Solr expert who…

  • Has built non-trivial applications with Lucene or Solr or Elastic Search, knows how to tune them, and can design systems for large volumes of data and queries
  • Is familiar with (some of the) internals of Lucene or Solr or Elastic Search, at least on the high level (yeah, a bit of an oxymoron)
  • Has a systems/ops bent or knows how to use performance-related UNIX and JVM tools for analyzing disk IO, CPU, GC, etc.

Data Analytics expert who…

  • Has used or built tools to process and analyze large volumes of data
  • Has experience using HDFS and MapReduce, and has ideally also worked with HBase, or Pig, or Hive, or Cassandra, or Voldemort, or Cascading or…
  • Has experience using Mahout or other similar tools
  • Has interest or background in Statistics, or Machine Learning, or Data Mining, or Text Analytics or…
  • Has interest in growing into a Lead role for the Data Analytics team

We like to dream that we can find a person who gets both Search and Data Analytics, and ideally wants or knows how to marry them.

Ideal candidates also have the ability to:

  • Write articles on interesting technical topics (that may or may not relate to Lucene/Solr) on Sematext Blog or elsewhere
  • Create and give technical talks/presentations (at conferences, local user groups, etc.)

Additional personal and professional traits we really like:

  • Proactive and analytical: takes initiative, doesn’t wait to be asked or told what to do and how to do it
  • Self-improving and motivated: acquires new knowledge and skills, reads books, follows relevant projects, keeps up with changes in the industry…
  • Self-managing and organized: knows how to parcel work into digestible tasks, organizes them into Sprints, updates and closes them, keeps team members in the loop…
  • Realistic: good estimator of time and effort (i.e. knows how to multiply by 2)
  • Active in OSS projects: participates in open source community (e.g. mailing list participation, patch contribution…) or at least keeps up with relevant projects via mailing list or some other means
  • Follows good development practices: from code style to code design to architecture
  • Productive, gets stuff done: minimal philosophizing and over-designing

Here are some of the Search things we do (i.e. that you will do if you join us):

  • Work with external clients on their Lucene/Solr projects.  This may involve anything from performance troubleshooting to development of custom components, to designing highly scalable, high performance, fault-tolerant architectures.  See our services page for common requests.
  • Provide Lucene/Solr technical support to our tech support customers
  • Work on search-related products and services

A few words about us:

We work with search and big data (Lucene, Solr, Nutch, Hadoop, MapReduce, HBase, etc.) on a daily basis.  Our projects with external clients range from 1 week to several months.  Some clients are small startups, some are large international organizations.  Some are top secret.  New customers knock on our door regularly and this keeps us busy at pretty much all times.  When we are not busy with clients we work on our products.  We run search-lucene.com and search-hadoop.com.  We participate in open-source projects and publish monthly Digest posts that cover Lucene, Solr, Nutch, Mahout, Hadoop, and HBase.  We don’t write huge spec docs, we work in sprints, we multitask, and try our best to be agile. We send people to conferences, trainings (Hadoop, HBase, Cassandra), and certifications (2 of our team members are Cloudera Certified Hadoop Developers).

We are a small and mostly office-free, highly distributed team that communicates via email, Skype voice/IM, and BaseCamp.  Some of our developers are in Eastern Europe, so we are especially open to new team members from that area, but we are also interested in good people world-wide, from South America to the Far East.

Interested? Please send your resume to jobs @ sematext.com.

HBase Case-Study: Using HBaseTestingUtility for Local Testing & Development

Motivation

As HBase becomes more mature there is a growing demand for tools and methods for making the development process easier – here at Sematext (@sematext) we’ve gone through our own per aspera ad astra learning process, in addition to Cloudera’s Hadoop trainings and certifications. In this post we share what we’ve learned and show how one can use HBaseTestingUtility for this.

Suppose there is a system that processes data stored in HBase and displays the stored data via a reporting application. Data processing is done using Hadoop MapReduce jobs. During development, it would be desirable to be able to:

  • debug MapReduce jobs in an IDE
  • run the reporting application locally (on a developer’s machine, without setting up a cluster), with the possibility of debugging in an IDE
  • easily access data stored in HBase for debugging purposes (easily meaning “naturally”, as if all rows were in a text file)

Disclaimer

The described use-case and solution are just one option, an option that makes use of HBaseTestingUtility and the underlying “mini” clusters. Depending on the context, this solution might not be optimal, but it is a good fit for presenting the ideas. It should encourage developers to look at HBase’s unit-test sources when constructing their own tests and/or when looking for ways to make debugging & development easier.

Problem Details

In our example there are two tables in HBase: one with raw data and another with processed data.  Let’s call them RawDataTable and ProcessedDataTable. We import data into RawDataTable via a simple importing MapReduce job which initially takes data from a log file. Subsequently, another MapReduce job processes the data in that table and stores the outcome in ProcessedDataTable. We use HBase Scan and Get operations to access the processed data from the client.

Solution

As stated in its javadoc, HBaseTestingUtility is a “facility for testing HBase”. Its description comes with a bit more explanation: “Create an instance and keep it around doing HBase testing. This class is meant to be your one-stop shop for anything you might need testing. Manages one cluster at a time only.” In this post we describe one possible way to use it to achieve the goals described above.

Processing Data

Step 1: Init cluster.

The following code starts a “local” cluster and creates two tables:

private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
private HTable rawDataTable;
private HTable processedDataTable;
…
void initCluster() throws Exception {
  testUtil.getConfiguration().addResource("hbase-site-local.xml");
  testUtil.getConfiguration().reloadConfiguration();
  // start mini hbase cluster
  testUtil.startMiniCluster(1);
  // create tables
  rawDataTable = testUtil.createTable(RAW_TABLE_NAME, RAW_TABLE_COLUMN_FAMILIES);
  processedDataTable = testUtil.createTable(PROCESSED_TABLE_NAME, PROCESSED_TABLE_COLUMN_FAMILIES);
  // start MR cluster
  testUtil.startMiniMapReduceCluster();
}

testUtil.startMiniCluster(1) starts a cluster with 1 datanode and 1 region server. You can start the cluster with a greater number of servers for testing purposes.
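When the development or test session is over, the mini clusters can be torn down; a small sketch using HBaseTestingUtility’s shutdown methods:

void shutdownCluster() throws Exception {
  // stop the MR cluster first, then the HBase/DFS/ZooKeeper mini clusters
  testUtil.shutdownMiniMapReduceCluster();
  testUtil.shutdownMiniCluster();
}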

Step 2: Import Data

We use a simple map-only job to import the data. Please refer to the org.apache.hadoop.hbase.mapreduce.ImportTsv class for an example of such a job. The following code runs the job, which uses locally stored files (e.g. a part of the log file of reasonable size), on the just-created cluster:

String[] importJobArgs = new String[] {RAW_TABLE_NAME, "file://" + inputFile};
if (!MyImportJob.createSubmittableJob(testUtil.getConfiguration(), importJobArgs).waitForCompletion(true)) {
  System.exit(1);
}

Step 3: Process Data

To process the data in RawDataTable we run an appropriate MapReduce job in the same way as during the import:

if (!ProcessLogsJob.createSubmittableJob(testUtil.getConfiguration(), processLogsJobArgs).waitForCompletion(true)) {
  System.exit(1);
}

Step 4: Persist Processed Data

Since we need the processed data during reporting application development and debugging, we persist it in a local file. In order to have “easy” access to this data during debugging, it makes sense to store the table data in a text file in readable form (so that we can use “grep” and other handy commands). So we actually write to two files at once. The Result class implements the Writable interface, so there is a natural way to serialize its data.

BufferedWriter bw = ...;
DataOutputStream dos = ...;
ResultScanner rs = processedDataTable.getScanner(new Scan());
Result next = rs.next();
while (next != null) {
  next.write(dos);
  bw.write(getHumanReadableString(next));
  bw.newLine();
  next = rs.next();
}

After this step, the processed data is stored on the local disk and can be used for running the reporting application. Importing and processing of data is performed locally and is thus easier to debug.
In order to add extra processed data incrementally to the already stored data, instead of rewriting it from scratch, we need to load it from the file after cluster initialization as described in the following section.

Fetching Data

In order to make the reporting application run on the “local” cluster instead of the “true” one, we create an alternative HTable factory. The reporting application code uses a single HTable object, instantiated by the factory, during its whole lifecycle – this is the best practice for minimizing the creation of HTable objects.
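A hypothetical sketch of such a factory (the names are ours, made up for the example): in “local” mode it hands out the HTable created against the mini cluster, otherwise it creates one against the real cluster configuration.

public class ReportingHTableFactory {
  private final Configuration conf;
  private final HTable localTable;   // non-null when running on the mini cluster

  public ReportingHTableFactory(Configuration conf, HTable localTable) {
    this.conf = conf;
    this.localTable = localTable;
  }

  public HTable getProcessedDataTable(String tableName) throws IOException {
    return localTable != null ? localTable : new HTable(conf, tableName);
  }
}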

Step 1: Init cluster.

This step is exactly the same as described previously.

Step 2: Load processed data.

We use the file created during the data processing stage to load the data back into the just-initialized cluster:

DataInputStream dis = ...;
Result next = new Result();
next.readFields(dis);
while (next.getRow() != null) {
  Put put = new Put(next.getRow());
  for (KeyValue kv : next.raw()) {
    put.add(kv);
  }
  processedDataTable.put(put);
  next = new Result();
  try {
    next.readFields(dis);
  } catch (EOFException e) {
    // reached the end of the file
    break;
  }
}

After all the data is loaded, the constructed processedDataTable can be used by the reporting application code. The app can now also be started and debugged easily from an IDE.

Next Steps

Internally, HBaseTestingUtility makes use of a whole bunch of “mini” clusters: MiniZooKeeperCluster, MiniDFSCluster, MiniHBaseCluster and MiniMRCluster. Refer to the unit-test implementations in the source code of the respective projects for more examples of how to use them.

Thank you for reading, we hope you found this useful.  Follow @sematext on Twitter to be notified of new posts on Hadoop, HBase, Lucene, Solr, Mahout, and other related topics.

Introducing Cloud MapReduce

The following post is the introduction to Cloud MapReduce (CMR) written by Huan Liu, CMR’s main author and the Research Manager at Accenture Technology Labs.

MapReduce is a programming model (borrowed from functional programming languages) and its associated implementation, first proposed by Google in 2003 to cope with the challenge of processing an exponentially growing amount of data. In the same year the technology was invented, Google’s production index system was converted to MapReduce. Since then, it has quickly proven to be applicable to a wide range of problems. For example, roughly 10,000 MapReduce programs had been written at Google by June 2007, and there were 2,217,000 MapReduce job runs in the month of September 2007. MapReduce has also found wide application outside of the Google environment.

Cloud MapReduce is another implementation of the MapReduce programming model. Back in late 2008, we saw the emergence of the cloud Operating System (OS) — a set of software managing a large cloud infrastructure rather than an individual PC. We asked ourselves the following questions: what if we built systems on top of a cloud OS instead of directly on bare metal? Could we dramatically simplify system design? We decided to try implementing MapReduce as a proof of concept; thus, the Cloud MapReduce project was born. At the time, Amazon was the only vendor with a “complete” cloud OS, so we built on top of it. In the course of the project, we encountered a lot of problems working with the Amazon cloud OS, most of which could be attributed to the weaker consistency model it presents. Fortunately, we were able to work through all the issues and successfully built MapReduce on top of the Amazon cloud OS. The end result surprised us somewhat: not only did we end up with a simpler design, we were also able to make it more scalable, more fault-tolerant, and faster than Hadoop.

Why Cloud MapReduce

There are already several MapReduce implementations, including Hadoop, which is already widely used. So the natural question to ask is: why another implementation? The answer is that all previous implementations essentially copy what Google described in the original MapReduce paper, and we need to explore alternative implementation approaches for the following reasons:

1) Patent risk. MapReduce is a core technology at Google. By using MapReduce, Google engineers are able to focus on their core algorithms rather than being bogged down by parallelization details. As a result, MapReduce greatly increases their productivity. It is no surprise that Google would patent such a technology to maintain its competitive advantage. The MapReduce patent covers the Google implementation as described in its paper. Since CMR only implements the programming model, but has a totally different architecture and implementation, it poses minimal risk w.r.t. the MapReduce patent. This is particularly important for enterprise customers who are concerned about potential risks.

2) Architectural exploration. Google only described one implementation of the MapReduce programming model in its paper. Is it the best one? What are the tradeoffs of using a different one? CMR is the first to explore a completely different architecture. In the following, I will describe what is unique about CMR’s architecture.

Architectural principle and advantages

CMR advocates component decoupling: separating out common components as independent cloud services. If we can separate out a common component as a stand-alone cloud service, the component not only can be leveraged by other systems, but it can also evolve independently. As we have seen in other contexts (e.g., SOA, virtualization), decoupling enables faster innovation.

CMR currently uses existing components offered by Amazon, including Amazon S3, SimpleDB, and SQS. By leveraging the concept of component decoupling, CMR achieves a couple of key advantages.

A fully distributed architecture. Since each component is a smaller project, it is easier to build it as a fully distributed system. Amazon has done this for all of its services (S3, SimpleDB, SQS). Building on what Amazon has done, we were able to build a fully distributed MapReduce implementation with only 3,000 lines of code. A fully distributed architecture has several advantages over a master/slave architecture. First, it is more fault-tolerant. Many enterprise customers are not willing to adopt something with a single point of failure, especially for their mission-critical data. Second, it is more scalable. In one comparison study, we were able to stress the master node in Hadoop so much that CMR showed a 60x performance advantage.

More efficient data shuffling between Map and Reduce. CMR uses queues as the intermediate point between Map and Reduce, which enables Map to “push” data to Reduce (rather than Reduce “pulling” data from Map). This design is similar to the one used in parallel databases, so it inherits the benefits of efficient data transfer as a result of pipelining. However, unlike in parallel databases, by using tagging, filtering, and a commit mechanism, CMR still maintains the fine-grained fault tolerance offered by other MapReduce implementations. The majority of CMR’s performance gain (aside from the 60x gain, which comes from stressing the master node) comes from this optimization.

CMR is particularly attractive in a cloud environment due to its native integration with the cloud. Hadoop, on the other hand, is designed for a static environment inside an enterprise. If run in a cloud, Hadoop introduces additional overhead. For example, after launching a cluster of virtual machines, Amazon’s Elastic MapReduce (pay-per-use Hadoop) has to configure and set up Hadoop on the cluster, then copy data from S3 to the Hadoop file system before it can start data processing. At the end, it also has to copy the results back to S3. All these steps are overheads that CMR does not incur, because CMR starts processing right away on S3 data directly; no cluster configuration or data copying is necessary.

CMR’s vision

Although CMR currently runs only on Amazon components, we envision that it will support a wide range of components in the future, including other clouds such as Microsoft Windows Azure. There are a number of very interesting open source projects already, such as jclouds, libcloud, deltacloud and Dasein, that are building a layer of abstraction on top of various cloud services to hide their differences. This middleware would make it much easier for CMR to support a large number of cloud components.

At the same time, we are also looking at how to build these components and deploy them locally inside an enterprise. Although several products, such as vCloud and Eucalyptus, provide cloud services inside an enterprise, their current versions are limited to compute capabilities. There are other cloud services, such as storage and queues, that an enterprise has to deploy to provide full cloud capability to its internal customers. At Accenture Technology Labs, we are helping to address some pieces of the puzzle. For example, we have started a research project to design a fully distributed and scalable queue service, which is similar to SQS in functionality but explores a different tradeoff point.

Synergy with Hadoop

Although, on the surface, CMR may seem to compete with Hadoop, there is actually quite a bit of synergy between the two projects. First, they are both moving towards the vision of component decoupling. In the recent 0.20.1 release of Hadoop, the HDFS file system is separated out as an independent component. This makes a lot of sense because HDFS is useful as a stand-alone component for storing large data sets, even if the users are not interested in MapReduce at all. Second, there are lessons to be learned from each project. For example, CMR points the way on how to “push” data from Map to Reduce to streamline data transfer without sacrificing fine-grained fault tolerance. Similarly, Hadoop supports rich data types beyond simple strings, which is something that CMR will surely inherit in the near future.

Hopefully, by now, I have convinced you that CMR is something that is at least worth a look. In the next post (coming in two weeks), I will follow up with a practical walkthrough showing step by step how to write a CMR application and how to run it. The current thinking is that I will demonstrate how to perform certain data analytics with data from search-hadoop.com, but I am open to suggestions. If you have any, I would appreciate it if you could post them in the comments below.