
Implementing mclapply() on Windows: a primer on embarrassingly parallel computation on multicore systems with R


An easy way to run R code in parallel on a multicore system is with the mclapply() function. Unfortunately, mclapply() does not work on Windows machines because the mclapply() implementation relies on forking and Windows does not support forking.
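
For example, on a Windows machine, explicitly asking mclapply() for more than one core does not silently fall back to serial execution; it stops with an error. A quick illustration (the exact error text may vary across R versions):

require(parallel)
## Asking for more than one core is rejected outright on Windows:
mclapply(1:4, function(xx){ Sys.sleep(10) }, mc.cores = 2)
## Error: 'mc.cores' > 1 is not supported on Windows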

For me, this is somewhat of a headache because I am used to using mclapply(), and yet I need to support Windows users for one of my projects.

My hackish solution is to implement a fake mclapply() for Windows users with one of the Windows-compatible parallel R strategies. For the impatient, it works like this:

require(parallel) 

## On Windows, the following line will take about 40 seconds to run
## because by default, mclapply is implemented as a serial function
## on Windows systems.
system.time( mclapply(1:4, function(xx){ Sys.sleep(10) }) )
##    user  system elapsed 
##    0.00    0.00   40.06 

## Using the ideas developed in this post, we can implement
## a parallel (as it should be!) mclapply() on Windows. 
source("http://www.stat.cmu.edu/~nmv/setup/mclapply.hack.R")
## 
##     *** Microsoft Windows detected ***
##     
##     For technical reasons, the MS Windows version of mclapply()
##     is implemented as a serial function instead of a parallel
##     function.    
## 
##     As a quick hack, we replace this serial version of mclapply()
##     with a wrapper to parLapply() for this R session. Please see
## 
##       http://www.stat.cmu.edu/~nmv/2014/07/14/implementing-mclapply-on-windows 
## 
##     for details.

## And now the code from above will take about 10 seconds (plus overhead). 
system.time( mclapply(1:4, function(xx){ Sys.sleep(10) }) )
##    user  system elapsed 
##    0.01    0.06   11.25 

As we will see, however, there are a few reasons why no one has done this in the past.

Our goal: easy, Linux/Mac-like parallelization

On Linux or Mac, it is very simple to parallelize R code across multiple cores. Consider the following function:

wait.then.square <- function(xx){
    # Wait for one second
    Sys.sleep(1);
    # Square the argument 
    xx^2 } 

If we want to run it on the integers from 1 to 4 in serial, it will take about 4 seconds:

## Run in serial 
system.time( serial.output <- lapply( 1:4, wait.then.square ) )
##  user  system elapsed 
## 0.000   0.000   4.004 

If we run it in parallel, it will take about 1 second:

## Run in parallel
require(parallel) 
## Note two changes: 
##   (1)  lapply to mclapply 
##   (2)  mc.cores (the number of processors to use in parallel) 
system.time( par.output <- mclapply( 1:4, wait.then.square,
                                     mc.cores=4             ) )
##  user  system elapsed 
## 0.008   0.000   1.008 

And we can verify that the output is, in fact, the same:

## Check if the output is the same
all.equal( serial.output, par.output )
## [1] TRUE

This toy example is a little unrealistic. It is often the case, at least for the work that I do, that the parallelized function either (i) uses an R library that isn't loaded at startup by default, e.g. the Matrix library for sparse matrices, or (ii) needs to access an object in the global environment, e.g. a variable defined earlier in the script.

The magic of mclapply() is that it uses fork to replicate the R process into several child processes, tells the children to do the work, and then aggregates the children's results for you. Since it uses forking, the entire R session -- all of its variables, functions, and packages -- is replicated among the children. Therefore, you can do things like this:

## Setup a global variable that uses a non-base package
require(Matrix)
( a.global.variable <- Diagonal(3) )
## 3 x 3 diagonal matrix of class "ddiMatrix"
##      [,1] [,2] [,3]
## [1,]    1    .    .
## [2,]    .    1    .
## [3,]    .    .    1

## Write a proof-of-concept lapply
serial.output <- lapply( 1:4,
    function(xx) {
       return( wait.then.square(xx) + a.global.variable )
    })

## Parallelize it
par.output <- mclapply( 1:4,
    function(xx) {
       return( wait.then.square(xx) + a.global.variable )
    }, mc.cores=4)

## Check that they are equal 
all.equal(serial.output, par.output)
## [1] TRUE

It is, at least to me, a little magical! I don't have to think much.

The problem: Windows parallelization requires more setup

Windows doesn't fork. It is a limitation of the operating system that there is no easy way to replicate the parent R session to create new child R sessions that can do the work.

R gets around this by pretending that each core on the machine is an entirely separate machine. This makes the setup a little more involved because the user must:

  1. create a "cluster" of child processes,
  2. load the necessary R packages on the cluster,
  3. copy the necessary R objects to the cluster,
  4. distribute work to the cluster, and finally
  5. stop the cluster.

Recall that the setup of the example is as follows:

## Load packages 
   require(parallel)
   require(Matrix)
##
## Define example function and the global variable 
   wait.then.square <- function(xx){
       # Wait for one second
       Sys.sleep(1);
       # Square the argument 
      xx^2 } 
   a.global.variable <- Diagonal(3) 

and the serial version of the code is:

serial.output <- lapply( 1:4,
    function(xx) {
       return( wait.then.square(xx) + a.global.variable )
    })

Parallelizing this code requires more setup with the "cluster" approach.

## Step 1: Create a cluster of child processes 
cl <- makeCluster( 4 )

## Step 2: Load the necessary R package(s)
## N.B. length(cl) is the number of child processes
##      in the cluster 
par.setup <- parLapply( cl, 1:length(cl),
    function(xx) {
        require(Matrix) 
    })

## Step 3: Distribute the necessary R objects 
clusterExport( cl, c('wait.then.square', 'a.global.variable') )

## Step 4: Do the computation
par.output <- parLapply(cl, 1:4,
    function(xx) {
       return( wait.then.square(xx) + a.global.variable )
    })

## Step 5: Remember to stop the cluster!
stopCluster(cl)

## Check that the parallel and serial output are the same
all.equal(serial.output, par.output)
## [1] TRUE

This approach works on Windows, Linux, and Mac, but it requires a bit more bookkeeping.
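
To see why steps 2 and 3 matter, here is a minimal sketch of what happens if we skip them: the child processes know nothing about wait.then.square() or a.global.variable, so the computation fails (the exact error text may differ across R versions).

## What goes wrong without steps 2 and 3
cl <- makeCluster( 4 )
try( parLapply( cl, 1:4,
    function(xx) {
       return( wait.then.square(xx) + a.global.variable )
    }) )
## Error in checkForRemoteErrors(val) :
##   4 nodes produced errors; first error: could not find function "wait.then.square"
stopCluster(cl)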

The hack: implementing mclapply() with parLapply()

Even though Windows doesn't fork, I'd like to pretend that it does so that I can use the simpler syntax of mclapply(). My approach is to wrap the bookkeeping code for parLapply() into a single function: mclapply.hack().

This is likely a bad idea for general use. Creating and destroying clusters for every mclapply.hack() call defeats the advantage of having a persistent cluster to farm work out to. Copying every R object from the parent session to all of the cluster sessions takes up much more memory (and time!) than simply forking processes. Use this approach with caution!
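
For comparison, when you control the calling code yourself, the usual pattern is to pay the setup cost once: build a single cluster, load packages and export objects once, reuse it for several parLapply() calls, and only then stop it. A minimal sketch, reusing the example objects from above:

## The persistent-cluster pattern that mclapply.hack() gives up
cl <- makeCluster( detectCores() )
invisible( clusterEvalQ( cl, require(Matrix) ) )   ## load packages once
clusterExport( cl, c('wait.then.square', 'a.global.variable') )

## ... as many parLapply() calls as you like ...
run.one <- parLapply( cl, 1:4, function(xx) wait.then.square(xx) + a.global.variable )
run.two <- parLapply( cl, 5:8, function(xx) wait.then.square(xx) + a.global.variable )

stopCluster(cl)   ## pay the teardown cost only once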

The final code is as follows.

mclapply.hack <- function(...) {
    ## Create a cluster
    ## ... How many workers do you need?
    ## ... N.B. list(...)[[1]] returns the first 
    ##          argument passed to the function. In
    ##          this case it is the list to iterate over
    size.of.list <- length(list(...)[[1]])
    cl <- makeCluster( min(size.of.list, detectCores()) )

    ## Find out the names of the loaded packages 
    loaded.package.names <- c(
        ## Base packages
        sessionInfo()$basePkgs,
        ## Additional packages
        names( sessionInfo()$otherPkgs ))

    ## N.B. tryCatch() allows us to properly shut down the 
    ##      cluster if an error in our code halts execution
    ##      of the function. For details see: help(tryCatch)
    tryCatch( {

       ## Copy over all of the objects within scope to
       ## all clusters. 
       ## 
       ## The approach is as follows: Beginning with the 
       ## current environment, copy over all objects within
       ## the environment to all clusters, and then repeat
       ## the process with the parent environment. 
       ##
       this.env <- environment()
       while( identical( this.env, globalenv() ) == FALSE ) {
           clusterExport(cl,
                         ls(all.names=TRUE, env=this.env),
                         envir=this.env)
           this.env <- parent.env(this.env)
       }
       ## repeat for the global environment
       clusterExport(cl,
                     ls(all.names=TRUE, env=globalenv()),
                     envir=globalenv())
       
       ## Load the libraries on all the cluster nodes
       ## N.B. length(cl) is the number of nodes in the cluster
       parLapply( cl, 1:length(cl), function(xx){
           lapply(loaded.package.names, function(yy) {
               ## N.B. the character.only option of 
               ##      require() allows you to give the 
               ##      name of a package as a string. 
               require(yy , character.only=TRUE)})
       })
       
       ## Run the lapply in parallel 
       return( parLapply( cl, ...) )
    }, finally = {        
       ## Stop the cluster
       stopCluster(cl)
    })
}

We can test it as follows:

system.time( serial.output <- lapply( 1:4,
    function(xx) {
       return( wait.then.square(xx) + a.global.variable )
    })) 
##    user  system elapsed 
##   0.020   0.000   4.022 

system.time( par.output <- mclapply.hack( 1:4,
    function(xx) {
       return( wait.then.square(xx) + a.global.variable )
    })) 
##    user  system elapsed 
##   0.024   0.012   3.683 

all.equal( serial.output, par.output )
## [1] TRUE

In this case, it works, but we don't save much time because of the bookkeeping required to set up the cluster for parLapply(). If we run a more expensive function, say one that takes 10 seconds per iteration, then we begin to see gains:

wait.longer.then.square <- function(xx){
    ## Wait for ten seconds
    Sys.sleep(10);
    ## Square the argument
    xx^2 } 

system.time( serial.output <- lapply( 1:4,
    function(xx) {
       return( wait.longer.then.square(xx) + a.global.variable )
    })) 
##    user  system elapsed 
##   0.020   0.000  40.059

system.time( par.output <- mclapply.hack( 1:4,
    function(xx) {
       return( wait.longer.then.square(xx) + a.global.variable )
    })) 
##    user  system elapsed 
##  0.024   0.008  12.794 

all.equal( serial.output, par.output )
## [1] TRUE

The final answer: A multi-platform wrapper for mclapply()

My motivation for implementing mclapply() on Windows is so that code I wrote on Linux will "just work" on Windows.

I wrote a quick script to implement mclapply.hack() as mclapply() on Windows machines and leave mclapply() alone on Linux and Mac machines. The code is as follows:

##
## mclapply.hack.R
##
## Nathan VanHoudnos
## nathanvan AT northwestern FULL STOP edu
## July 14, 2014
##
## A script to implement a hackish version of 
## parallel::mclapply() on Windows machines.
## On Linux or Mac, the script has no effect
## beyond loading the parallel library. 

require(parallel)    

## Define the hack
mclapply.hack <- function(...) {
    ## Create a cluster
    size.of.list <- length(list(...)[[1]])
    cl <- makeCluster( min(size.of.list, detectCores()) )

    ## Find out the names of the loaded packages 
    loaded.package.names <- c(
        ## Base packages
        sessionInfo()$basePkgs,
        ## Additional packages
        names( sessionInfo()$otherPkgs ))

    tryCatch( {

       ## Copy over all of the objects within scope to
       ## all clusters. 
       this.env <- environment()
       while( identical( this.env, globalenv() ) == FALSE ) {
           clusterExport(cl,
                         ls(all.names=TRUE, env=this.env),
                         envir=this.env)
           this.env <- parent.env(this.env)
       }
       clusterExport(cl,
                     ls(all.names=TRUE, env=globalenv()),
                     envir=globalenv())
       
       ## Load the libraries on all the cluster nodes
       ## N.B. length(cl) is the number of nodes in the cluster
       parLapply( cl, 1:length(cl), function(xx){
           lapply(loaded.package.names, function(yy) {
               require(yy , character.only=TRUE)})
       })
       
       ## Run the lapply in parallel 
       return( parLapply( cl, ...) )
    }, finally = {        
       ## Stop the cluster
       stopCluster(cl)
    })
}

## Warn the user if they are using Windows
if( Sys.info()[['sysname']] == 'Windows' ){
    message(paste(
      "\n", 
      "   *** Microsoft Windows detected ***\n",
      "   \n",
      "   For technical reasons, the MS Windows version of mclapply()\n",
      "   is implemented as a serial function instead of a parallel\n",
      "   function.",
      "   \n\n",
      "   As a quick hack, we replace this serial version of mclapply()\n",
      "   with a wrapper to parLapply() for this R session. Please see\n\n",
      "     http://www.stat.cmu.edu/~nmv/2014/07/14/implementing-mclapply-on-windows \n\n",
      "   for details.\n\n"))
}

## If the OS is Windows, set mclapply to
## the hackish version. Otherwise, leave the
## definition alone. 
mclapply <- switch( Sys.info()[['sysname']],
   Windows = {mclapply.hack}, 
   Linux   = {mclapply},
   Darwin  = {mclapply})

## end mclapply.hack.R

I posted the script at http://www.stat.cmu.edu/~nmv/setup/mclapply.hack.R. You can use it with

source('http://www.stat.cmu.edu/~nmv/setup/mclapply.hack.R')

as shown at the beginning of the post.
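
If you would rather not fetch the script over the network in every session, one alternative (not part of the script itself) is to keep a local copy:

## Download once, then source the local copy in your scripts.
download.file("http://www.stat.cmu.edu/~nmv/setup/mclapply.hack.R",
              destfile = "mclapply.hack.R")
source("mclapply.hack.R")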

I would be grateful for any comments or suggestions for improving it. If there is sufficient interest, I can wrap it into a simple R package.
