Implementing MapReduce with Vanilla NodeJS (II)

E.Y.
4 min read · Nov 27, 2021

Recently I started looking at the Hadoop MapReduce framework (an open-source implementation of the MapReduce model originally published by Google). MapReduce, along with GFS and Bigtable, has powered Google from its early days onward.

This post is the second in a series; the first one covered MapReduce in theory.

In this blog, I’m going to walk you through a vanilla NodeJS MapReduce framework using child processes.

First, let’s define our interface as:

task_mgr(input, map, reduce, (res) => {
  console.log(res);
});

So we need a client dataset, a mapper function, and a reducer function. The callback receives the final result, which here we simply log to the console.

The data (using Lorem Ipsum):

// ./data.js
module.exports = ` Sed ut perspiciatis unde omnis iste natus error sit voluptatem Sed ut perspiciatis unde omnis iste natus error sit voluptatem Sed ut perspiciatis unde omnis iste natus error sit voluptatem dolore magnam aliquam quaerat voluptatem. Ut enim ad minima veniam,`;

Mapper and Reducer: we are aiming for a word count example.

const map = function (input) {
  const acc = new Map();
  input.forEach(function (value) {
    if (acc.has(value)) {
      acc.set(value, acc.get(value) + 1);
    } else {
      acc.set(value, 1);
    }
  });
  return [...acc];
};

const reduce = function (key, values) {
  let sum = 0;
  values.forEach(function (e) {
    sum += e;
  });
  return sum;
};
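
To see what these two functions produce, here is a minimal single-process sketch that runs them on a tiny word list. The sample words and the hand-built value list are assumptions for illustration only. Note that `map` already aggregates counts within its own chunk, so `reduce` ends up summing partial counts rather than a list of ones.

```javascript
// The mapper and reducer from above, exercised without any worker processes.
const map = function (input) {
  const acc = new Map();
  input.forEach(function (value) {
    acc.set(value, (acc.get(value) || 0) + 1);
  });
  return [...acc]; // array of [word, count] pairs
};

const reduce = function (key, values) {
  let sum = 0;
  values.forEach(function (e) {
    sum += e;
  });
  return sum;
};

// Hypothetical sample chunk (not taken from the post's data set).
const mapped = map(["sed", "ut", "sed"]);
console.log(JSON.stringify(mapped)); // [["sed",2],["ut",1]]

// Suppose another chunk reported "sed" once: the reducer sums the partial counts.
const total = reduce("sed", [2, 1]);
console.log(total); // 3
```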

The task manager (a.k.a. the JobTracker) lives in a separate JS file, where we will initialize the worker nodes.

We spawn one worker per CPU core on the machine (for me, it's an 8-core Mac, so I have 8 workers).

const cluster = require("cluster");
const cores = require("os").cpus().length; // worker count, used as `cores` below

We initiate the child processes and control them using the Master:

const task_mgr = function (input, map, reduce, callback) {
  if (cluster.isMaster) {
    for (let i = 0; i < cores; i++) {
      var worker = cluster.fork();
      // DO COORDINATION WORK
    }
  } else if (cluster.isWorker) {
    // DO MAP & REDUCE
  }
};

We will reuse each worker for both the map and the reduce function.

Next, we will split the job into tasks before we feed them into the mappers.

const split_chunk = function (input) {
  input = input.split(" ");
  const chunk_size = Math.ceil(input.length / cores);
  const input_list = [];
  for (let i = 0; i < input.length; i += chunk_size) {
    const upper_bound = Math.min(i + chunk_size, input.length);
    const batch = input.slice(i, upper_bound);
    input_list.push(batch);
  }
  return input_list;
};
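
The chunking arithmetic is easier to follow with concrete numbers. The sketch below is a variant that takes the worker count as a parameter (`splitChunk` and its `cores` argument are names chosen for this illustration; the post reads `cores` from module scope), run on made-up input.

```javascript
// Variant of split_chunk that takes the worker count as a parameter.
const splitChunk = function (input, cores) {
  const words = input.split(" ");
  const chunkSize = Math.ceil(words.length / cores);
  const chunks = [];
  for (let i = 0; i < words.length; i += chunkSize) {
    chunks.push(words.slice(i, i + chunkSize)); // slice clamps at the array end
  }
  return chunks;
};

// 10 words over 4 workers: chunk size 3, so 4 chunks (the last one is short).
const ten = splitChunk("a b c d e f g h i j", 4);
console.log(ten.length); // 4

// 9 words over 4 workers: chunk size 3, so only 3 chunks.
const nine = splitChunk("a b c d e f g h i", 4);
console.log(nine.length); // 3
```

The second case suggests a limitation: some inputs yield fewer chunks than workers. Since the master in this post waits for `cores` map results, it would likely keep waiting in that situation; counting against the number of chunks actually produced would be one way around it.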

We then let the master send a task to each worker:

const tasks = split_chunk(input);

if (cluster.isMaster) {
  for (let i = 0; i < cores; i++) {
    var worker = cluster.fork();
    const task = tasks[worker.id - 1];
    worker.send({ map_task: task });

The worker listens for the message event. If the message is a map task, it runs the client-defined mapper function on it and returns the intermediate data to the parent process.

else if (cluster.isWorker) {
  process.on("message", (msg) => {
    if (msg["map_task"]) {
      const mapped = map(msg["map_task"]);
      process.send({
        from: cluster.worker.id,
        signal: "MapCompleted",
        mapped,
      });
    }

The parent counts the map results as they come back and collects them. Once the count matches the number of tasks dispatched, it proceeds to the next phase:

 
worker.on("message", (msg) => {
  if (msg.signal === "MapCompleted") {
    post_mapper = post_mapper.concat(msg.mapped);
    map_count++;

    if (map_count === cores) {
      // e.g. { 'a': [ 1 ], 'b': [ 1 ], "c": [ 1, 1, 1 ] }
      // NEXT PHASE
    }
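
The counting logic in that handler can be tried out without forking anything. The following is a sketch only: `makeCollector` is a hypothetical helper, not part of the post's code, and the messages are hand-written stand-ins for what the workers would send.

```javascript
// Hypothetical helper: gathers map results until `expected` messages have
// arrived, then calls `onDone` once with everything collected.
const makeCollector = function (expected, onDone) {
  let collected = [];
  let count = 0;
  return function (msg) {
    if (msg.signal !== "MapCompleted") return;
    collected = collected.concat(msg.mapped);
    count++;
    if (count === expected) onDone(collected);
  };
};

// Simulate two workers reporting back (no real processes involved).
let result = null;
const onMessage = makeCollector(2, (all) => {
  result = all;
});

onMessage({ from: 1, signal: "MapCompleted", mapped: [["a", 1]] });
console.log(result); // null, still waiting for the second worker

onMessage({ from: 2, signal: "MapCompleted", mapped: [["a", 2], ["b", 1]] });
console.log(JSON.stringify(result)); // [["a",1],["a",2],["b",1]]
```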

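Note that in real MapReduce, reducers do not have to wait for every mapper: they can start fetching map output as soon as individual mappers finish, even though the reduce function itself only runs once all of its input has arrived. For ease of coordination, we proceed only after collecting all results from the mapping stage.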

The next step is partitioning, shuffling, and sorting:

function* convert_hashmap_to_array(obj) {
  for (let key in obj) yield [key, obj[key]];
}

const shuffle = function (post_mapper) {
  post_mapper.sort(); // Sorting while shuffling
  const key_values = post_mapper.reduce(function (acc, cur) {
    let key = acc[cur[0]];
    if (!key || typeof key != "object") key = [];

    key.push(cur[1]);
    acc[cur[0]] = key;
    return acc;
  }, {});
  const key_values_array = Array.from(convert_hashmap_to_array(key_values));
  return key_values_array;
};

const partition = function (key_values_array) {
  const key_len = key_values_array.length;
  const reducer_size = Math.ceil(key_len / cores);
  const reducer_tasks = [];
  for (let i = 0; i < key_len; i += reducer_size) {
    const upper_bound = Math.min(i + reducer_size, key_len);
    const batch = key_values_array.slice(i, upper_bound);
    reducer_tasks.push(batch);
  }
  return reducer_tasks;
};
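
To make the data shapes concrete, here is a compact sketch of the same two steps on made-up mapper output. It is a variant, not the code above: it groups with a `Map` instead of a plain object (which sidesteps clashes with inherited property names), and it takes the number of reducers as a hypothetical `reducers` parameter.

```javascript
// Group [key, value] pairs by key, after sorting a copy of the input.
const shuffle = function (pairs) {
  const groups = new Map();
  for (const [key, value] of [...pairs].sort()) {
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(value);
  }
  return [...groups]; // [[key, [values...]], ...]
};

// Cut the grouped keys into at most `reducers` contiguous batches.
const partition = function (keyValues, reducers) {
  const size = Math.ceil(keyValues.length / reducers);
  const batches = [];
  for (let i = 0; i < keyValues.length; i += size) {
    batches.push(keyValues.slice(i, i + size));
  }
  return batches;
};

// Hypothetical mapper output collected from two chunks.
const postMapper = [["b", 1], ["a", 2], ["a", 1], ["c", 1]];

const grouped = shuffle(postMapper);
console.log(JSON.stringify(grouped)); // [["a",[1,2]],["b",[1]],["c",[1]]]

const batches = partition(grouped, 2);
console.log(JSON.stringify(batches[1])); // [["c",[1]]]
```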

We will again assign these reduce tasks to corresponding reducers.

   
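if (map_count === cores) {
  const key_values = shuffle(post_mapper);
  const reducer_tasks = partition(key_values);
  reducer_task_size = reducer_tasks.length;
  // Hand each reduce task to a different worker. Inside this handler `worker`
  // refers to a single process, so look the workers up on the cluster instead.
  const workers = Object.values(cluster.workers);
  reducer_tasks.forEach((task, i) => {
    workers[i].send({ reduce_task: task });
  });
}
}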

And our reducers, previously mappers, will again diligently do what the client requests:

if (msg["reduce_task"]) {
  const reduced = {};
  msg["reduce_task"].forEach((el) => {
    // the client reduce function, passing in key and a list of values
    reduced[el[0]] = reduce(el[0], el[1]);
  });

  process.send({
    from: cluster.worker.id,
    signal: "ReduceCompleted",
    reduced,
  });
  cluster.worker.kill();
}
});
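
Stripped of the messaging, the reduce branch boils down to the following sketch. `runReduceTask` is a hypothetical name used only here, the task is made up, and the reducer is a one-line equivalent of the client `reduce` function shown earlier.

```javascript
// One-line equivalent of the client reducer: sum the partial counts.
const reduce = (key, values) => values.reduce((sum, v) => sum + v, 0);

// Hypothetical helper mirroring the worker's "reduce_task" branch.
const runReduceTask = function (task, reduceFn) {
  const reduced = {};
  for (const [key, values] of task) {
    reduced[key] = reduceFn(key, values);
  }
  return reduced;
};

// A made-up reduce task: a batch of [key, values] entries.
const task = [["a", [1, 2]], ["b", [1]]];
const reduced = runReduceTask(task, reduce);
console.log(JSON.stringify(reduced)); // {"a":3,"b":1}
```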

One caveat: in real MapReduce, the reducer receives a key and an iterable over the values, rather than the whole list of values at once.
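
As a rough illustration of that difference, a reducer written with `for...of` works on any iterable, so it can consume values one at a time. The generator below is a made-up stand-in for a streamed source, and `reduceIterable` is a name chosen for this sketch.

```javascript
// Reducer written against any iterable, not only arrays.
const reduceIterable = function (key, values) {
  let sum = 0;
  for (const v of values) sum += v;
  return sum;
};

// Hypothetical lazy source: yields counts one at a time, as a stream might.
function* counts() {
  yield 2;
  yield 1;
  yield 4;
}

const lazyTotal = reduceIterable("sed", counts());
console.log(lazyTotal); // 7

// The same reducer still accepts a plain array.
const arrayTotal = reduceIterable("sed", [2, 1, 4]);
console.log(arrayTotal); // 7
```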

Again, the master listens for messages from the reducers and counts the tasks that have completed. Once they are all done, it returns the result to the client by calling the callback function, which in our case just logs the result to the console.

if (msg.signal === "ReduceCompleted") {
  post_reducer = post_reducer.concat(Object.entries(msg.reduced));
  reduce_count++;

  if (reduce_count === reducer_task_size) {
    callback(new Map(post_reducer));
  }
}

The final processed result, using the data at the beginning of the post, looks like this (abridged):

Map(74) {
  'eos' => 1,
  'error' => 3,
  'est' => 3,
  'et' => 4,
  'omnis' => 3,
  'perspiciatis' => 3,
  'praesentium' => 1,
  'provident,\nsimilique' => 1,
  'sit' => 3,
}
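
One way to sanity-check output like this is to run the same map, shuffle, and reduce flow sequentially in a single process and compare. The sketch below does that on a made-up sentence; `wordCount` and its `chunks` parameter are hypothetical names, and this is a simplified reference, not the clustered implementation from the post.

```javascript
// Sequential word count: the same map -> shuffle -> reduce flow, in one process.
const wordCount = function (text, chunks) {
  const words = text.split(" ");
  const size = Math.ceil(words.length / chunks);

  // Map phase: count words within each chunk.
  let pairs = [];
  for (let i = 0; i < words.length; i += size) {
    const local = new Map();
    for (const w of words.slice(i, i + size)) {
      local.set(w, (local.get(w) || 0) + 1);
    }
    pairs = pairs.concat([...local]);
  }

  // Shuffle phase: sort, then group the partial counts by word.
  const groups = new Map();
  for (const [word, n] of pairs.sort()) {
    if (!groups.has(word)) groups.set(word, []);
    groups.get(word).push(n);
  }

  // Reduce phase: sum the partial counts per word.
  const result = new Map();
  for (const [word, ns] of groups) {
    result.set(word, ns.reduce((a, b) => a + b, 0));
  }
  return result;
};

const counted = wordCount("sed ut sed ut sed error", 2);
console.log(JSON.stringify([...counted])); // [["error",1],["sed",3],["ut",2]]
```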

That’s it!

Obviously, a lot of details have been left out. But I found this to be an interesting experiment with MapReduce, and I look forward to using more of it in the distributed world!

Happy Reading!
