Prologue: The Blind Men and the Elephant
As members of the Hadoop community, we here at Flurry have noticed that at many Hadoop-related gatherings, it is not atypical for about half of the attendees to be completely new to Hadoop. Perhaps someone asked them “Should we use Hadoop for this?”, and they didn’t know how to answer. Maybe they had heard of Hadoop and planned to research it, but never found the time to follow up. Out in the Hadoop community at large, some of the answers I have heard to the question “What is Hadoop?” include the comparison of Hadoop to a tape drive, and that the most important thing about Hadoop is that it is an accelerator for Moore’s Law.
Hadoop is a software library for distributed computing (using the map/reduce paradigm). Hence, if you have a distributed computation problem, then Hadoop may be your best solution. There are a wide variety of distributed computing problems; how do you know if Hadoop meets your needs? While there are many possible answers to this question based on a variety of criteria, this post explores the following qualified answer to this question: “If the problem you are trying to solve is amenable to treatment with the Map/Reduce programming model, then you should consider using Hadoop. Our first stop then, is the question, “What is Map Reduce?”. Ok, What is Map/Reduce?
Map/Reduce (or MapReduce, etc.) is a programming strategy/model for distributed computing that was first detailed in a research paper published by Google in 2004. The basic strategy is to assemble a variable number of computing nodes which run jobs in two steps. The structure of the data inputs and outputs to each step, along with the communication restrictions imposed on the steps, enable a simple programming model that is easily distributed. The first step, Map, can be run completely independently on many nodes. A set of input data is divided up into parts (one for each node) which are mapped from the input to some results space. The second step, Reduce, involves aggregating these results from the independent processing jobs into the final result. Classic examples of Map/Reduce style jobs include evaluating large aggregate functions and counting word frequencies in a large text corpus. In the former, data is partitioned and partial sums are calculated (Map), and then globally aggregated (Reduce). In the latter, data is partitioned and words are counted (Map), then summed (Reduce). Map operations take as input a set of key/value pairs and produce as output a list of key/value pairs. The input and output keys and values may (and generally do) differ in type. An important fact to notice is that, at the end of the Map step, there may be values associated with a single output key living on multiple Map compute nodes. In the Reduce step, all values with the same output key are passed to the same reducer (independent of which Map node they came from). Note that this may require moving intermediate results between nodes prior to the Reduce step; however, this is the only inter-node communication step in Map/Reduce. The Reduce step then takes as input a map-output key and a list of values associated with that key, and produces as output a list of resultant values. To clarify, all Map tasks are independent of each other. All Reduce tasks are independent of each other. It is possible to have no-op tasks for either the Map or Reduce phase. What does this have to do with Hadoop?
Hadoop brings together a framework that is specially designed for Map/Reduce computing, from the data storage up. The two foundational components of this framework are the Hadoop Distributed File System and the Hadoop Map/Reduce framework. These are probably the two parts of Hadoop that most people have heard the most about, and most of the rest of the framework is built on top of them. The Hadoop Distributed File System (HDFS)
HDFS is the foundation of much of the performance benefit and simplicity of Hadoop. It is arguably also the component (technically an Apache sub-project) of Hadoop that is most responsible for Hadoop’s overall association with “big data”. HDFS allows for reliable storage of very large files across machines in a cluster comprised of commodity computing nodes. HDFS is a special-purpose file system and as such is not POSIX-compliant. For example, files, once written, cannot be modified. Files in HDFS are stored as a sequence of replicated blocks, all of which are the same size except the last one. The default block size is 64MB, but the block size and replication factor can be set at the file level. HDFS has a master/slave architecture. The master node is called the Namenode, and it manages all of the filesystem metadata (names, permissions, and block locations). The Namenode hosts an interface for file system operations and determines the mapping of blocks to the slaves, called Datanodes. There can be one Datanode per node in the Hadoop cluster, and they manage local data block storage. The Datanodes serve data reads and writes directly, and they create, delete, and replicate blocks in response to commands from the Namenode. When files have been written to HDFS, their blocks are spread over the Datanodes of the Hadoop cluster. Hadoop achieves high-performance parallel computing by taking the computation to the data on the Datanodes. Hadoop Map/Reduce Hadoop achieves high performance for parallel computation by dividing up Map tasks so that individual Map tasks can operate on data that is local to the node on which they are running. In simple terms, this means that the Map task runs on the same server that is running a DataNode that stores the data to be mapped. The overall number of Map tasks is determined by the number of blocks taken up by the input data. A Map/Reduce job generally proceeds as follows:
- Split the input files and prepare input key/values from them.
- Run one Map task per input split unit. There is no communication between Map tasks.
- “Shuffle”: Group the output according to output keys and distribute to nodes that will run Reduce task(s). This is the only communication step.
- Run Reduce tasks.
- Write Output
Like HDFS, Hadoop Map/Reduce has a master/slave architecture. The master node is called the Jobtracker, and the slave nodes are called Tasktrackers. The Jobtracker distributes Map and Reduce tasks to Tasktrackers. Tasktrackers execute the Map and Reduce tasks and handle the communication tasks required to move data around between the Map and Reduce phases.Specifically, for Hadoop Map/Reduce, jobs proceed as follows:
- Files are loaded from HDFS
- An implementation of the Hadoop interface InputFormat determines which files will be read for input. It breaks files into pieces which are implementations of the Hadoop interface InputSplit (typically FileSplit instances). A very large file may be split into several such pieces. A single Map task is created per split.
- Each Map task generates input key-value pairs by reading its FileSplit using the RecordReader instance provided by the InputFormat.
- Each Map task performs computation on the input key-value pairs. There is no communication between Map tasks.
- Each Map task calls OutputCollector.collect to forward its results to the Reduce task(s).
- (Optional) Run a Combiner on the Map node – this is a lot like a Reduce step, but it only runs on the data on one node. It is an optimization.
- Shuffle: outputs from Map tasks are moved to Reducers. There can be user-controlled via a custom Partitioner class.
- Sort: the map results are sorted before passing to the Reduce step.
- The Reduce step is invoked. The reducer recieves a key and an iterator over all map-output values associated with the key.
- Reduce invokes OutputCollector.collect to send final values to the output file.
- A RecordWriter instance outputs the results to files in HDFS.
Note here that the storage architecture of HDFS drives the parallelization of the Map/Reduce job. The alignment of the data distribution and computing distribution enables the work in the Map phase to run locally to the data, which in turn makes Hadoop perform very well for this type of work. Note further that Hadoop Map/Reduce does not require that its input files reside in HDFS, nor is HDFS only usable by Map/Reduce programs. The two combine to produce a powerful and simple mode for parallel computing.Example
Now let’s put it all together with a simple example of a Hadoop MapReduce job. Both the Map and Reduce tasks are shown below:
Those of you familiar with the Hadoop word count example will recognize that this code is similar. This example expects input files that are lists of IP addresses, one per line. It processes all the files in the (specified) input directory in HDFS using FileInputFormat. The Map task parses each line (passed in as input values) into the four numbers of an IPv4 address, and checks to see if the address is in the major IP address block for Vatican City. If the address is in the Vatican City IP block, the Mapper outputs the IP address as the output key and 1 as the output value. The Reducer receives all of the Map output values for a given IP address in the Vatican City IP block and sums up the counts for each.
HDFS and Hadoop Map/Reduce combine in a supportive architecture to form a foundation for high-performance parallel computing on clusters of commodity machines. If you are trying to decide whether or not to use Hadoop for a project, asking yourself “Can we use Map/Reduce to attack this problem” is a fruitful line of inquiry that can provide a compelling and defensible answer.
Five elephants come across a blind man. The first elephant touches the man and declares, “Men are very flat”. All the other elephants agree.