by Marek Gagolewski, Maciej Bartoszuk, Anna Cena, and Jan Lasek (Rexamine).
Introduction
In a recent blog post we explained how we managed to set up a working Hadoop environment on a few CentOS 7 machines. To test the installation, let’s play with a simple example.
The Hadoop Streaming API allows us to run Map/Reduce jobs with arbitrary programs acting as the mapper and/or the reducer.
Files are processed line by line. Each mapper receives an appropriate chunk of the input file. Every line is assumed to store a key-value pair. By default, the following form is used:
key1 \t val1 \n key2 \t val2 \n
If there is no TAB character, then the value is assumed to be NULL.
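For instance, a word-count mapper (a made-up sample, just to illustrate the format) might emit lines such as:
hello \t 3
world \t 1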
For example, let's run a job that uses /bin/cat as both the mapper and the reducer, and then print the result:
$ hadoop jar /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -input /input/test.txt -output /output -mapper /bin/cat -reducer /bin/cat
$ hdfs dfs -cat /output/part-00000
In fact, this is a Hadoop version of a program that rearranges lines in the input file so that duplicated lines appear one after another – the output is always sorted by key. This is because such a streaming job is roughly equivalent to:
cat input | mapper | sort | reducer > output
More specifically, in our case that was:
cat input | cat | sort | cat > output
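The same effect can be reproduced locally on a toy file (the file name and contents are made up for illustration):
$ printf 'banana\napple\nbanana\n' > toy.txt
$ cat toy.txt | cat | sort | cat
apple
banana
banana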
A sample Map/Reduce job
Let’s run a simple Map/Reduce job written in R and C++ (just for fun – we assume that all the nodes run the same operating system and use the same CPU architecture).
- As we are in the CentOS 7 environment, we need a recent version of R (here: 3.1.2, built from source) on all the nodes:
$ su
# yum install readline-devel
# cd
# wget http://cran.rstudio.com/src/base/R-3.1.2.tar.gz
# tar -zxf R-3.1.2.tar.gz
# cd R-3.1.2
# ./configure --with-x=no --with-recommended-packages=no
# make
# make install
# R
R> install.packages('stringi')
R> q()
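To verify that the build and the stringi installation succeeded on a node, one may run, e.g.:
$ R --version
$ Rscript -e 'packageVersion("stringi")'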
- Edit yarn-site.xml (on all nodes) and add the following property:
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
Without that setting, Hadoop may complain about excessive virtual memory consumption by R.
- Create the script wc_mapper.R:
#!/usr/bin/env Rscript
library('stringi')
# read standard input in chunks of 1024 lines
stdin <- file('stdin', open='r')
while (length(x <- readLines(con=stdin, n=1024L)) > 0) {
   # split the lines into words and count the occurrences within this chunk
   x <- unlist(stri_extract_all_words(x))
   xt <- table(x)
   words <- names(xt)
   counts <- as.integer(xt)
   # emit TAB-separated key-value pairs, one per line
   cat(stri_paste(words, counts, sep='\t'), sep='\n')
   cat('\n') # terminate the last line of this chunk
}
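To sanity-check the mapper outside Hadoop, we may feed it some text on standard input (the sample sentence is made up for illustration):
$ echo 'this is a test this is' | Rscript wc_mapper.R
a	1
is	2
test	1
this	2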
- Create a source file wc_reducer.cpp:
#include <iostream>
#include <string>
#include <cstdlib>

using namespace std;

int main()
{
    string line;
    string last_word = "";
    int last_count = 0;
    // the input is sorted by key, so identical keys arrive in consecutive lines
    while (getline(cin, line))
    {
        size_t found = line.find_first_of("\t");
        if (found != string::npos)
        {
            string key = line.substr(0, found);
            string value = line.substr(found + 1);
            int valuei = atoi(value.c_str());
            // cerr << "key=" << key << " value=" << value << endl;
            if (key != last_word)
            {
                // a new key starts -- output the total for the previous one
                if (last_word != "") cout << last_word << "\t" << last_count << endl;
                last_word = key;
                last_count = valuei;
            }
            else
                last_count += valuei;
        }
    }
    // output the total for the very last key
    if (last_word != "") cout << last_word << "\t" << last_count << endl;
    return 0;
}
Now it’s time to compile the above C++ source file:
$ g++ -O3 wc_reducer.cpp -o wc_reducer
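Before submitting anything to the cluster, the whole pipeline can be simulated locally, following the cat input | mapper | sort | reducer > output scheme described above (the toy input file is made up for illustration):
$ printf 'hello world\nhello hadoop\n' > toy.txt
$ cat toy.txt | Rscript wc_mapper.R | sort | ./wc_reducer
hadoop	1
hello	2
world	1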
- Let’s submit the Map/Reduce job via the Hadoop Streaming API:
$ chmod 755 wc_mapper.R
$ hadoop jar /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -input /input/test.txt -output /output -mapper wc_mapper.R -reducer wc_reducer -file wc_mapper.R -file wc_reducer
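Once the job completes, the result can be fetched from HDFS:
$ hdfs dfs -cat /output/part-00000
Each output line should contain a word and its total number of occurrences in /input/test.txt, separated by a TAB.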
By the way, the Fedora 20 RPM Hadoop distribution provides the Hadoop Streaming API jar file at /usr/share/hadoop/mapreduce/hadoop-streaming.jar.
Summary
In this tutorial we showed how to submit a simple Map/Reduce job via the Hadoop Streaming API. Interestingly, we used an R script as the mapper and a C++ program as the reducer. In an upcoming blog post we’ll explain how to run a job using the rmr2 package.
