Implementing MapReduce with Vanilla NodeJS (I)
Recently I started to look at Handoop MapReduce framework (originally published by Google) . This along with GFS and Bigtable have empowered Google from its early days until today.
Unfortunately, at my workplace, we are not actively using this. However, I tried to do a small project on it using vanilla NodeJS. While admittedly it is nowhere near perfect from the real framework. But it definitely helped me understanding the framework a bit better.
This blog is split into two parts, the first is about MapReduce in theory, the second is about the NodeJS project I did and some reflection on that.
What is MapReduce in brief?
MapReduce was introduced first to deal with analysing big data sets in a parallel, distributed manner.
Let’s say there’re hundreds of thousands of data sets containing the webpages crawled from the Internet stored on a distributed file system, and now we need to process them to generate inverted index for our search engine.
How do we do it? Instead of take the data out and process it at the client side, it is more efficient to bring the processing function closer to the data. In Hadoop, after it splits files into large blocks and distributes them across nodes inside a cluster, it then transfers packaged code (our processing function) into nodes to process the files in parallel.
This is the “Mapping” part.
Since MapReduce implements in a “master-slave” mode, a coordinator will oversee the whole process and deal with errors. The results of the map process will stay in the local disk.
The next step is the “Reducing” part. This is when the processed data gets aggregated. Again, the coordinator assigns the reducers with the output from the mappers’ local disk and lets the reducers apply aggregation function on it (or a reduce function).
And finally, the aggregated result will be sent to client.
Now let’s dive deeper.
A Mapper maps input key-value pairs to one, multiple or zero intermediate key-value pairs. It can transform the type of the input. The intermediate outputs are stored in a (key-length, key, value-length, value) format.
How many mappers are enough? It depends, but normally it’s driven by the total number of the inputs. In some applications, I seen that once a mapper triggered by the coordinator, it will trigger 2 other mappers, and so on and so forth until all inputs are distributed (a tree-like object).
Partition, Shuffle and Sort
What I found with a lot of intro of MapReduce online is that they illustrate Map and Reduce really well, but there’s always some grey area regarding what is happening between the Map and the Reduce. This is kind of understandable, as this part is normally handled by the system itself. However it is important to know at least the basics of it.
So the first is Partitioning, this part decides which reducer gets what (key-value) pairs from the output of the mapper. The default partitioning algo uses a hash function to calculate this based on the keys of the output, e.g.
reducer_id = hash(key)%num_of_reducers
- Caveat: a reducer can get one or multiple key-values, but it will get at least one. However big that output is.
Then it’s Shuffling. This is the process of transferring data from the mappers to the reducers.
- Caveat: Shuffling does not need to wait until all mappers complete, it can start whenever a mapper finishes its part.
Then it is Sorting. I found this part quite confusing before I read this this paragraph from Google’s paper:
We guarantee that within a given partition, the intermediate key/value pairs are processed in increasing key order. This ordering guarantee makes it easy to generate a sorted output file per partition, which is useful when the output file format needs to support efficient random access lookups by key, or users of the output find it convenient to have the data sorted.
So basically sorting is to make the reducer work more efficient by sorting the list of key-values it gets before hand the data to the reducer. Let’s say we have below data as the output of the mapper and 1 reducer.
'iste' => 3,
'sit' => 3,
'error' => 3,
'iste' => 1,
'sit' => 1,
After sort, it will be
'error' => ,
'iste' => [3,1]
'sit' => [3,1],
The reducer will start a new task whenever it sees that the next key in the sorted input is different than the previous one.
Note that shuffling and sorting happens simultaneously.
The reducer will aggregates the key-values by calling the client provided reduce() function for each key once.
When I read it the first time, my first question is: what if the output from the Mapper is too big for the Reducer? How can it read such a big chunk into its memory and process it?
And it turns out that the reducer will get the Key and an Iterable of the Values, and will only read the values from disk into memory while processing it.
- It’s totally fine to have no reducer at all. In this case, there will also be no shuffling/partitioning process. The output of the mapper will be returned to the client.
- Even it is probably not in production, but in theory, one node can be both mapper and reducer.
- The output of reduce is not sorted, and will be written back to HDFS.
And a few extra bits:
A combiner is an optional step after mapper and before partitioner. It is used on the each mapper node if set, and the main usage is to combine the results of mapper function for each key into a single value.
Let’s say we have below output after the mapping
'iste' => 3,
'sit' => 3,
'error' => 3,
'iste' => 1,
'sit' => 1,
When combiner is applied, the input into the partitioner is :
'error' => 3,
'iste' => 4,
'sit' => 4,
Note that the values for each key is aggregated. Does this look like a reducer?
It exactly is! Actually, if your reducer function is both commutative and associative, you can use it as a combiner.
Commutative: the factors in an operation can be reordered without affecting the outcome of the operation: a + b = b + a or a x b = b x a.
Associative : grouping of factors in an operation can be changed without affecting the outcome of the operation. a + (b + c) = (a + b) + c.
In Hadoop, a JobTracker node acts as the Master in MapReduce and is responsible for coordinating Tasks on slave nodes.
A TaskTracker node acts as the Slave and is responsible for executing a Task assigned to it by the JobTracker.
In order to process data as efficiently as possible, MapReduce tries to place the data and processing function as close as possible.
MapReduce works in a master-slave mode. As stated above, the JobTracker acts as the master and TaskTrackers act as the slaves.
MapReduce processes the data in the Key-Value pairs. As a result, it requires the client request can be fit into the frame. (For more complex data model, there are other frameworks like Spark).
MapReduce, due to the complexity of its coordinator process, shows little or worse performance when only computing a small sets of data. But once the master node gets up to speed, it is an unstoppable beast.
MapReduce JobTracker will restart the task on Map/Reduce phase if it detects the worker node is down or fails to communicate.
If it’s the mapping phase, then it will assign the task to other mappers to re-execute.
If it’s the reducing phase, then it will assign the task to other reducers to re-execute.
Note that this requires the whole execution to be side-effect free. It is much easier for the JobTracker to reassign the task if it knows there’s no node-to-node communications between the worker nodes, otherwise, it has to take the other nodes’ states into consideration.
What if some nodes are slower than others, does the JobTracker needs to wait for these before progress to the next phase? Fortunately not, the coordinator will schedule these tasks from the slow nodes again, to assign to other nodes once most of the tasks are done.This is called speculative execution. Whichever node completes its task first will notify the coordinator, who will stop other nodes that are doing the same task.
MapReduce can be chained like to allow complex job breaking down into separate stages of tasks.
Map1 -> Reduce1 -> Map2 -> Reduce2 -> Map3 --> Reduce3...
Finally, some resources I found most useful:
Ad Hoc Big Data Processing Made Simple with Serverless MapReduce | Amazon Web Services
September 8, 2021: Amazon Elasticsearch Service has been renamed to Amazon OpenSearch Service. See details. Sunil…
This section provides a reasonable amount of detail on every user-facing aspect of the MapReduce framework. This should…
MapReduce: Simplified Data Processing on Large Clusters - Google Research
MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users…