
Using Hadoop Streaming API to perform a word count job in R and C++


by Marek Gagolewski, Maciej Bartoszuk, Anna Cena, and Jan Lasek (Rexamine).

Introduction

In a recent blog post we explained how we managed to set up a working Hadoop environment on a few CentOS 7 machines. To test the installation, let’s play with a simple example.

The Hadoop Streaming API allows us to run Map/Reduce jobs with arbitrary programs acting as the mapper and/or the reducer.

Files are processed line by line. Mappers get appropriate chunks of the input file. Each line is assumed to store a key-value pair. By default, the following form is used:

key1 \t val1 \n
key2 \t val2 \n

If there is no TAB character, then the value is assumed to be NULL.
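
For instance, a word-count mapper may emit TAB-separated lines like these (the words and counts below are purely illustrative):

hadoop	3
streaming	1
word	2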

As a first test, let’s use /bin/cat both as the mapper and as the reducer. This gives a Hadoop version of a program that rearranges lines in the input file so that duplicated lines appear one after another (the output is always sorted by key):

hadoop jar /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
   -input /input/test.txt \
   -output /output \
   -mapper /bin/cat \
   -reducer /bin/cat
hdfs dfs -cat /output/part-00000

This is roughly equivalent to:

cat input | mapper | sort | reducer > output

More specifically, in our case that was:

cat input | cat | sort | cat > output
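
This behavior is easy to verify locally, without Hadoop at all; here is a quick illustration on a toy three-line input:

$ printf 'spam\neggs\nspam\n' | cat | sort | cat
eggs
spam
spam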

A sample Map/Reduce job

Let’s run a simple Map/Reduce job written in R and C++ (just for fun; as we will be shipping a compiled binary, we assume that all the nodes run the same operating system and use the same CPU architecture).

  1. As we are in the CentOS 7 environment, we will need a newer version of R on all the nodes.
$ su
# yum install readline-devel
# cd
# wget http://cran.rstudio.com/src/base/R-3.1.2.tar.gz
# tar -zxf R-3.1.2.tar.gz
# cd R-3.1.2
# ./configure --with-x=no --with-recommended-packages=no
# make
# make install
# R
R> install.packages('stringi')
R> q()
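
To make sure that R and stringi are ready on a given node, a quick sanity check may be run from the shell (the sample string is arbitrary):

$ Rscript -e 'library(stringi); stri_extract_all_words("hello hadoop world")'
# should print a one-element list containing "hello", "hadoop", "world"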
  2. Edit yarn-site.xml (on all nodes):
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>

Without this setting, Hadoop may complain about excessive virtual memory consumption by R and kill our tasks.
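
For the new setting to take effect, the NodeManager has to be restarted on each node; assuming the stock Hadoop 2.6 layout under /opt/hadoop, something along these lines should work:

$ /opt/hadoop/sbin/yarn-daemon.sh stop nodemanager
$ /opt/hadoop/sbin/yarn-daemon.sh start nodemanager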

  3. Create script wc_mapper.R:
#!/usr/bin/env Rscript

library('stringi')
stdin <- file('stdin', open='r')

# read the input in chunks of 1024 lines until EOF
while(length(x <- readLines(con=stdin, n=1024L))>0) {
   x <- unlist(stri_extract_all_words(x))  # split each line into words
   xt <- table(x)                          # count the words in this chunk
   words <- names(xt)
   counts <- as.integer(xt)
   cat(stri_paste(words, counts, sep='\t'), sep='\n')  # emit TAB-separated pairs
}
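
The mapper may be tested locally before we go anywhere near the cluster; for an arbitrary input string we should see something like:

$ echo 'to be or not to be' | Rscript wc_mapper.R
be	2
not	1
or	1
to	2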
  4. Create a source file wc_reducer.cpp:
#include <iostream>
#include <string>
#include <cstdlib>

using namespace std;

int main()
{
  string line;
  string last_word = "";
  int last_count = 0;

  // keys arrive sorted, so all counts for a given word
  // come in consecutive lines of the input
  while(getline(cin, line))
  {
    size_t found = line.find_first_of("\t");
    if(found != string::npos)
    {
      string key = line.substr(0, found);
      string value = line.substr(found+1); // skip the TAB itself
      int valuei = atoi(value.c_str());
      //cerr << "key=" << key << " value=" << value << endl;
      if(key != last_word)
      {
        // a new key: emit the total for the previous one
        if(last_word != "") cout << last_word << "\t" << last_count << endl;

        last_word = key;
        last_count = valuei;
      }
      else
        last_count += valuei;
    }
  }
  // emit the total for the very last key
  if(last_word != "") cout << last_word << "\t" << last_count << endl;

  return 0;
}

Now it’s time to compile the above C++ source file:

$ g++ -O3 wc_reducer.cpp -o wc_reducer
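
Before submitting anything to the cluster, the whole pipeline may be simulated locally, in the spirit of the cat-sort equivalence shown above (test.txt stands for any plain-text input file):

$ cat test.txt | Rscript wc_mapper.R | sort | ./wc_reducer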
  5. Let’s submit a Map/Reduce job via the Hadoop Streaming API:
$ chmod 755 wc_mapper.R
$ hadoop jar /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
   -input /input/test.txt \
   -output /output \
   -mapper wc_mapper.R \
   -reducer wc_reducer \
   -file wc_mapper.R \
   -file wc_reducer
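
Once the job has finished, the resulting word counts may be fetched from HDFS just as before:

$ hdfs dfs -cat /output/part-00000 | head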

By the way, the Fedora 20 RPM Hadoop distribution provides the Hadoop Streaming API jar file at /usr/share/hadoop/mapreduce/hadoop-streaming.jar.

Summary

In this tutorial we showed how to submit a simple Map/Reduce job via the Hadoop Streaming API. Interestingly, we used an R script as the mapper and a C++ program as the reducer. In an upcoming blog post we’ll explain how to run a job using the rmr2 package.
