
From functional programming to MapReduce in R

[This article was first published on Cartesian Faith » R, and kindly contributed to R-bloggers.]

The MapReduce paradigm has long been a staple of big data computational strategies. However, properly leveraging MapReduce can be a challenge, even for experienced R users. To get the most out of MapReduce, it is helpful to understand its relationship to functional programming. In this post I discuss how MapReduce relates to the underlying higher order functions map and reduce. By the end of the post, you will be able to translate R code into corresponding MapReduce jobs. You’ll also be able to understand how parallelism and apparent randomness fit into the paradigm so it is easier to reason about the result of a computation.

Preliminaries

In the R language, processing data via MapReduce is accomplished by the rmr2 and rhdfs packages. These form part of the RHadoop ecosystem, which is thankfully much smaller and more straightforward than the Hadoop ecosystem. If you are on a Debian system, you can install these tools along with Hadoop and friends by running my RHadoop installation script.

For this post, we will keep things simple. Hence, the running example will be calculating the word count of a document, which is the “Hello, World” of big data. One use of a document word count is to look for common words or phrases in SEC filings. This example uses a text export of the latest Amazon 10-Q form.

Higher Order Functions

A higher order function is simply a function that either accepts functions as arguments or returns a function. For example, in mathematics, the derivative is a higher order function. Three higher order functions appear in virtually every functional programming language and can be considered a de facto canon. These are map, fold (or reduce), and filter. These functions abstract iteration over lists, which is ubiquitous in a vectorized language like R. A map echoes the mathematical sense of the word, as a procedure that transforms every element of a set by the same function. The output of map is typically one-to-one with the input. Fold, on the other hand, is a mechanism to aggregate a set of data into a single value. The summation and product operators are good examples of this, where a vector is collapsed into a single value. The final function, filter, returns the subset of a list whose elements satisfy some predicate. Some set operations (e.g. set difference) are examples of a filter operation.

In R, these operations are ubiquitous. Yet, most people don’t realize that R has such a rich functional core. One of the reasons is that in base R, these operations are hidden in plain sight. Vectorized operations can typically be traced to one of these three operations. For example, map operations are native for binary vector operations (e.g. x + y). Where native vectorization cannot be used, the apply family of functions is synonymous with map. Fold operations are typically implemented as built-in functions (e.g. sum, prod, var). To create your own, the base package contains implementations called Map, Reduce, and Filter. My package lambda.tools also contains implementations of these functions. The difference is that they are more compact and have extended semantics consistent with R idioms.
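As a minimal sketch of how the three operations look in base R, the following uses a small made-up vector (the variable names are illustrative only). It shows the same map written with sapply and with native vectorization, a fold written with Reduce, and a filter written with Filter.

```r
x <- c(1, 2, 3, 4, 5)

# map: apply the same function to every element (one-to-one with the input)
squares <- sapply(x, function(a) a^2)
# the same map, expressed through native vectorization
squares_vec <- x^2

# fold: collapse the vector into a single value with a binary function
total <- Reduce(`+`, x)          # same value as sum(x)

# filter: keep only the elements that satisfy a predicate
evens <- Filter(function(a) a %% 2 == 0, x)

# Map walks several vectors in parallel and always returns a list
pair_sums <- Map(function(a, b) a + b, x, rev(x))
```

Note that Map returns a list, whereas sapply simplifies its result to a vector when it can.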

Now that we have this foundation, let’s look at the word count example. The goal is to simply count the number of occurrences for each word appearing in a document. Our document is a text file containing an arbitrary number of characters per line. We’ll read it in using lines <- readLines("amazon-10q.txt"). Prior to counting words, we need to clean up the document to make it easier to count. The clean function simply forces everything to lower case and removes a bunch of extraneous characters.

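clean <- function(x) {
  # Lower-case the text, then strip punctuation and quote characters.
  # The pattern is double-quoted so the single quote needs no escaping.
  gsub("[,.;:'\"()]", "", tolower(x))
}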

Our first approach will make use of built-in functions to illustrate how simple this task is. The first line splits each element of (the cleaned) lines on white space. The output of strsplit is a list, where each element contains a character vector. We can collapse that into a single vector via do.call. Finally, we use table to perform the counts for us.

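word_count_simple <- function(lines) {
  # Split each cleaned line on whitespace ('\\s' is the regex class \s)
  chunks <- strsplit(clean(lines),'\\s')
  # Flatten the list of character vectors into one vector
  words <- do.call(c, chunks)
  table(words)
}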

The output looks something like this:

> x <- word_count_simple(lines)
> tail(x,10)
words
   wwwamazonin              x           xbrl           year year-over-year 
             1              5              1             34              6 
         years         years’            yen            yes           yuan 
            18              1              3              3              3
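Since the 10-Q text file may not be at hand, here is a small self-contained sketch of the same steps on a made-up two-line "document". The demo_lines vector is purely hypothetical, and the cleaning and splitting logic is inlined so the block runs on its own.

```r
# Hypothetical stand-in for readLines("amazon-10q.txt")
demo_lines <- c("Net sales increased, and net income increased.",
                "Sales in yuan; sales in yen.")

# Same steps as clean and word_count_simple, inlined
cleaned <- gsub("[,.;:'\"()]", "", tolower(demo_lines))
chunks  <- strsplit(cleaned, "\\s")   # list: one character vector per line
words   <- do.call(c, chunks)         # flatten into a single character vector
counts  <- table(words)

counts["sales"]       # "sales" occurs 3 times across both lines
counts["increased"]   # "increased" occurs 2 times
```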

MapReduce

The benefit of MapReduce is the ability to process massive datasets that cannot fit in main memory. In these cases it’s necessary to partition the data and distribute the work over many machines. There are countless ways of doing this, and MapReduce is one of them. Due to the overhead of running a cluster, MapReduce is really only practical when data is very large. Otherwise, the time spent negotiating communication protocols and passing data around the network outweighs the time spent processing data. Hence, for our toy project, the runtime will be significantly slower than the native version above.

Caveat aside, what is the process for running MapReduce jobs in R? First, you need to get data into the distributed file system. Next, you execute the MapReduce job, and then massage the output according to your needs. Here is the same word count operation using RHadoop.

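library(rmr2)  # provides to.dfs, mapreduce, keyval and from.dfs

word_count_hadoop <- function(lines) {
  # Write the cleaned words to the distributed file system
  words <- to.dfs(do.call(c, strsplit(clean(lines),'\\s')))
  # Map emits (word, 1); reduce sums the ones collected for each word
  fs.ptr <- mapreduce(input=words,
    map=function(k,v) keyval(v,1),
    reduce=function(k,v) keyval(k, sum(v)))
  # Pull the result back into memory as a named vector sorted by word
  raw <- from.dfs(fs.ptr)
  out <- raw$val
  names(out) <- raw$key
  out[order(names(out))]
}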

The output is the same:

> x3 <- word_count_hadoop(lines)
> tail(x3,10)
   wwwamazonin              x           xbrl           year year-over-year 
             1              5              1             34              6 
         years         years’            yen            yes           yuan 
            18              1              3              3              3

While the output matches, the code looks nothing alike! How do we go from a simple R implementation to the MapReduce implementation? While the MapReduce paradigm does use the map and reduce higher order functions, code cannot be directly translated from one to the other. The reason is that MapReduce partitions data during the computation. It uses specific data structures for each stage of operation to ensure that data can be properly reassembled. Both map and reduce stages are partitioned depending on the number of nodes available and the size of the data. HDFS stores data in fixed-size blocks, 64 MB by default in Hadoop 1.x (128 MB in later versions). In the Java world, the result set of key-value pairs is independent of the input set. With rmr2, I’ve found that the cardinalities must match with a one-to-one correspondence, otherwise there is an error.

MapReduce jobs use keys to sort and group results. The output of a stage must preserve the cardinality of the set. Hence, each incoming key needs a corresponding output key. The initial map stage typically has no keys, so your result needs to provide a key. The easiest way is by wrapping the result in a keyval call. After the map stage is done, there is an intermediate step that collects and groups all the values by key. Hence, each value is actually a list of values. This set of key-value pairs is then partitioned according to the number of reduce nodes available. Finally, all the data are collected and assembled into a final data structure.
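The grouping and partitioning steps can be imitated in memory. This is only a sketch under assumptions: the keys, values and n_reducers below are made up, and the round-robin assignment of keys to nodes is an illustration, not how Hadoop actually assigns partitions.

```r
# Pairs as they might leave the map stage: one (word, 1) pair per occurrence
keys   <- c("yen", "yuan", "yen", "yuan", "yuan", "yes")
values <- c(1, 1, 1, 1, 1, 1)

# Sort/group: collect every value that shares a key into one vector
grouped <- split(values, keys)
# grouped$yuan is c(1, 1, 1), grouped$yen is c(1, 1), grouped$yes is 1

# Partition the grouped keys across a hypothetical number of reduce nodes
n_reducers <- 2
node_of    <- (seq_along(grouped) - 1) %% n_reducers + 1
partitions <- split(grouped, node_of)
```

Each element of partitions is the list of key-value groups that one reduce node would receive. All values for a given key stay together, no matter how many nodes there are.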

In our code above, the map stage is straightforward. Every input value is a word, and we are returning an encoded version, where the word acts as a key and the value is merely a count of 1. In the sorting stage, all the values for a given key are grouped together and sent to the same reduce node. Hence, for the word “yuan”, the reduce function will be called with the arguments ("yuan", c(1,1,1)). Indeed, each call to the reduce function will have a vector of ones as the value (since that is the only value we emitted in the map stage). The responsibility of the reduce function is simply to sum up all these ones to get a final count. It’s important to remember that the reduce operation is applied to each key-value object separately. Values across keys cannot be accessed.
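To make the per-key nature of the reduce call concrete, here is a sketch that runs without Hadoop. The reduce_fn function is a hypothetical stand-in for the reduce argument above, and it returns a plain list instead of an rmr2 keyval object. The grouped list is made-up input.

```r
# Hypothetical stand-in for the reduce function; it returns a plain list
# rather than rmr2's keyval so the block runs without a cluster
reduce_fn <- function(k, v) list(key = k, val = sum(v))

# Values as they would arrive after the sort stage, already grouped by key
grouped <- list(yen = c(1, 1, 1), yes = c(1, 1, 1), yuan = c(1, 1, 1))

# One call per key: reduce_fn("yen", c(1,1,1)), reduce_fn("yes", ...), ...
results <- Map(reduce_fn, names(grouped), grouped)

results$yuan$val   # the count for "yuan", which is 3
```

Each call sees only its own key and that key's values, which is why a reduce function cannot compare counts across words.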

Parallelism and Randomness

Constructing computations in this manner is complex. However, the value of MapReduce is its structured and consistent approach to partitioning and processing data. For a massive dataset, the details of the distribution are completely transparent to the end user. The code and result are the same whether there are 5 nodes or 5000 nodes. To guarantee transparent scaling, certain constraints must be placed on the structure of the code. One such constraint is that key-value pairs must be self-contained and independent of each other. This allows the system to partition the data however it needs to without affecting the calculation. To the end user, these partitions can appear random.

So how are values across keys related? The key *ahem* is in the sorting and grouping stage. A vector is created that comprises all values associated with a single key. The implication is that the set of keys emitted from the map phase dictates what the reduce function sees.

To get a better understanding of how MapReduce works, below is an in-memory, native implementation. In both the map and reduce “stages”, the number of chunks is calculated based on the supplied parameters. The map operation applies the code in the initial word_count_simple to each chunk. The result gets collapsed into a single vector of named counts, which is convenient for the next step.

During the sort step, all the values are aggregated by key. Here I just use a list for simplicity’s sake, despite its performance on character indices. The map operation here appends each value to a vector. I take advantage of the fact that NULL is the identity for concatenation to avoid a conditional statement.
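The NULL identity can be seen in isolation with a tiny sketch. The keys and counts here are made up for illustration, and a plain for loop is used to keep the example short.

```r
keys   <- c("yen", "yuan", "yen")
counts <- c(2, 1, 4)

sorted <- list()
for (i in seq_along(keys)) {
  key <- keys[i]
  # sorted[[key]] is NULL the first time a key appears, and c(NULL, x) is x,
  # so new keys and existing keys are handled by the same expression
  sorted[[key]] <- c(sorted[[key]], counts[i])
}

sorted$yen    # c(2, 4)
sorted$yuan   # 1
```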

In the reduce step, the sum (which we know is a fold operation) is applied to each input vector. These come from bigger chunks, one for each “reduce” node. Finally, the result from each chunk is collapsed into a single vector and sorted.

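word_count_mr <- function(lines, mjobs=10, rjobs=2) {
  # Map stage: split the lines into mjobs chunks and count words per chunk.
  # Indices past the end of lines yield NA, which table() drops.
  mlength <- ceiling(length(lines) / mjobs)
  mchunks <- lapply(1:mjobs, 
    function(i) lines[(mlength * (i-1) + 1) : (mlength * i)])
  counts <- do.call(c, lapply(mchunks,
    function(x) table(do.call(c, strsplit(clean(x),'\\s')))))

  # Sort: group the partial counts by word. sorted[[key]] is NULL the first
  # time a key is seen, and c(NULL, x) is x, so no conditional is needed.
  sorted <- list()
  invisible(lapply(seq_along(counts), function(i) {
    key <- names(counts)[i]
    if (nchar(key) == 0) return(NULL)   # skip empty strings from the split
    sorted[[key]] <<- c(sorted[[key]], counts[i])
  }))

  # Reduce stage: split the keys into rjobs chunks and sum each key's values
  rlength <- ceiling(length(sorted) / rjobs)
  rchunks <- lapply(1:rjobs, function(i) {
    o <- sorted[(rlength * (i-1) + 1) : (rlength * i)]
    o[!is.na(names(o))]                 # drop padding past the end
  })
  # unlist(lapply()) rather than sapply, so an empty chunk contributes nothing
  rcounts <- lapply(rchunks, function(x) unlist(lapply(x, sum)))
  out <- do.call(c, rcounts)
  out[order(names(out))]
}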

The result matches the other implementations.

> x2 <- word_count_mr(lines)
> tail(x2,10)
   wwwamazonin              x           xbrl           year year-over-year 
             1              5              1             34              6 
         years         years’            yen            yes           yuan 
            18              1              3              3              3

Not only does it match the other implementations, but it continues to yield the same result, even if we change the number of “nodes”. This transparent scaling is one of the true benefits of the MapReduce system.

> x3 <- word_count_mr(lines,40,15)
> tail(x3,10)
   wwwamazonin              x           xbrl           year year-over-year 
             1              5              1             34              6 
         years         years’            yen            yes           yuan 
            18              1              3              3              3
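The same invariance can be checked on a toy input with a much smaller sketch. The count_in_chunks helper and the words vector are hypothetical and written only for this illustration. The helper deals words out to n chunks round-robin, counts within each chunk, then groups the partial counts by word and sums them.

```r
# Hypothetical helper: count words after splitting the input into n chunks
count_in_chunks <- function(words, n) {
  chunk_id <- (seq_along(words) - 1) %% n
  # "map": count words within each chunk independently
  partial <- lapply(split(words, chunk_id), table)
  # flatten the partial counts into one named vector (unname avoids
  # chunk ids being prefixed onto the word names)
  flat <- do.call(c, unname(partial))
  # "sort" and "reduce": group the partial counts by word and sum them
  sapply(split(flat, names(flat)), sum)
}

words <- c("yen", "yuan", "yes", "yuan", "yen", "yuan", "year")

one_node    <- count_in_chunks(words, 1)
three_nodes <- count_in_chunks(words, 3)
identical(one_node, three_nodes)   # TRUE: the partitioning does not matter
```

The result does not depend on n because summation is associative and each word's partial counts are only ever combined with each other.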

Conclusion

In this post, I’ve demonstrated the value of the MapReduce paradigm and how to translate native R code into the structure dictated by MapReduce. Those interested in learning more about functional programming and how it relates to R are welcome to download a draft of my book, “Modeling Data With Functional Programming In R”, which among other things discusses in detail the significance of higher order functions in R and how to leverage them effectively.

