by Marek Gagolewski, Maciej Bartoszuk, Anna Cena, and Jan Lasek (Rexamine).
Introduction
In a recent blog post we explained how we managed to set up a working Hadoop environment on a few CentOS 7 machines. To test the installation, let’s play with a simple example.
The Hadoop Streaming API allows one to run Map/Reduce jobs with arbitrary programs acting as the mapper and/or the reducer.
Files are processed line by line, and each mapper receives an appropriate chunk of the input file. Each line is assumed to store a key-value pair. By default, the following form is used:
key1 \t val1 \n key2 \t val2 \n
If there is no TAB character, then the value is assumed to be NULL.
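As a small standalone illustration (plain shell, not part of the Hadoop setup; the words and numbers are arbitrary sample data), the stream produced below encodes two complete key-value pairs and one key without a TAB, i.e. with a NULL value:

$ printf 'apple\t3\nbanana\t1\ncherry\n'
apple   3
banana  1
cherry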
In fact, the following call is a Hadoop version of a program that rearranges the lines of the input file so that duplicated lines appear one after another – the output is always sorted by key (the second command merely prints the results stored in HDFS):

hadoop jar /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -input /input/test.txt -output /output -mapper /bin/cat -reducer /bin/cat
hdfs dfs -cat /output/part-00000

This is because such a streaming job is roughly equivalent to:
cat input | mapper | sort | reducer > output
More specifically, in our case that was:
cat input | cat | sort | cat > output
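To see this "duplicates become adjacent" behaviour without a cluster, here is a minimal local sketch (plain shell, no Hadoop involved; test_local.txt is just a hypothetical scratch file):

$ printf 'banana\napple\nbanana\ncherry\n' > test_local.txt
$ cat test_local.txt | cat | sort | cat
apple
banana
banana
cherry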
A sample Map/Reduce job
Let’s run a simple Map/Reduce job written in R and C++ (just for fun – we assume that all the nodes run the same operating system and use the same CPU architecture).
- As we are in the CentOS 7 environment, we will need a newer version of R on all the nodes (here: R 3.1.2, built from sources):

$ su
# yum install readline-devel
# cd
# wget http://cran.rstudio.com/src/base/R-3.1.2.tar.gz
# tar -zxf R-3.1.2.tar.gz
# cd R-3.1.2
# ./configure --with-x=no --with-recommended-packages=no
# make
# make install
# R
R> install.packages('stringi')
R> q()
- Edit yarn-site.xml (on all nodes):

<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>
Without that, Hadoop may complain about excessive virtual memory consumption by R processes.
- Create the script wc_mapper.R:
#!/usr/bin/env Rscript
library('stringi')
stdin <- file('stdin', open='r')
# read the input stream in chunks of (at most) 1024 lines
while (length(x <- readLines(con=stdin, n=1024L)) > 0) {
    x <- unlist(stri_extract_all_words(x))
    xt <- table(x)
    words <- names(xt)
    counts <- as.integer(xt)
    # emit TAB-separated word/count pairs, one per line
    cat(stri_paste(words, counts, sep='\t'), sep='\n')
}
- Create a source file wc_reducer.cpp:
#include <iostream>
#include <string>
#include <cstdlib>
using namespace std;

// keys arrive sorted, so counts for consecutive equal keys can be aggregated on the fly
int main()
{
    string line;
    string last_word = "";
    int last_count = 0;
    while (getline(cin, line)) {
        size_t found = line.find_first_of("\t");
        if (found != string::npos) {
            string key = line.substr(0, found);
            string value = line.substr(found);
            int valuei = atoi(value.c_str());
            // cerr << "key=" << key << " value=" << value << endl;
            if (key != last_word) {
                if (last_word != "")
                    cout << last_word << "\t" << last_count << endl;
                last_word = key;
                last_count = valuei;
            }
            else
                last_count += valuei;
        }
    }
    if (last_word != "")
        cout << last_word << "\t" << last_count << endl;
    return 0;
}
Now it’s time to compile the above C++ source file:
$ g++ -O3 wc_reducer.cpp -o wc_reducer
- Let’s submit the Map/Reduce job via the Hadoop Streaming API (a rough local test of the whole pipeline is sketched further below):
$ chmod 755 wc_mapper.R
$ hadoop jar /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -input /input/test.txt -output /output -mapper wc_mapper.R -reducer wc_reducer -file wc_mapper.R -file wc_reducer
By the way, the Fedora 20 RPM Hadoop distribution provides the Hadoop Streaming API jar file under /usr/share/hadoop/mapreduce/hadoop-streaming.jar.
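Before (or instead of) waiting for the cluster, the whole pipeline can also be approximated locally, as sketched below. This is only a rough equivalent of what Hadoop does (e.g. the sort order may depend on the locale) and it assumes that a copy of test.txt is available in the current directory. The second command, taken from the introductory example, prints the word counts computed by the actual Hadoop job once it has finished:

$ cat test.txt | ./wc_mapper.R | sort | ./wc_reducer
$ hdfs dfs -cat /output/part-00000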
Summary
In this tutorial we showed how to submit a simple Map/Reduce job via the Hadoop Streaming API. Interestingly, we used an R script as the mapper and a C++ program as the reducer. In an upcoming blog post we’ll explain how to run a job using the rmr2 package.