Coarse Grain Parallelism with foreach and rxExec
by Joseph Rickert
I have written several posts about the Parallel External Memory Algorithms (PEMAs) in Revolution Analytics’ RevoScaleR package, most recently about rxBTrees(), but I haven’t said much about rxExec(). rxExec() is not itself a PEMA, but it can be used to write parallel algorithms. Pre-built PEMAs such as rxBTrees(), rxLinMod(), etc. are inherently parallel algorithms designed for distributed computing on various kinds of clusters: HPC Server, Platform LSF, and Hadoop, for example. rxExec()’s job, however, is to help ordinary, non-parallel functions run in parallel or distributed computing environments.
To get a handle on this, I think the best place to start is with R’s foreach() function, which enables an R programmer to write “coarse grain” parallel code. To be concrete, suppose we want to fit a logistic regression model to two different data sets, and, to speed things up, we would like to do this in parallel. Since my laptop has two multi-threaded cores, this is a straightforward use case to prototype. The following code points to two of the multiple csv files that comprise the mortgageDefault data set available at Revolution Analytics’ data set download site.
#----------------------------------------------------------
# Load needed libraries
#----------------------------------------------------------
library(foreach)
library(doParallel)  # provides registerDoParallel() and loads parallel,
                     # which provides makePSOCKcluster()

#----------------------------------------------------------
# Point to the Data
#----------------------------------------------------------
dataDir   <- "C:/DATA/Mortgage Data/mortDefault"  # forward slashes avoid
                                                  # invalid escape characters
fileName1 <- "mortDefault2000.csv"
path1     <- file.path(dataDir, fileName1)
fileName2 <- "mortDefault2001.csv"
path2     <- file.path(dataDir, fileName2)

#----------------------------------------------------------
# Look at the first data file
#----------------------------------------------------------
system.time(data1 <- read.csv(path1))
#user  system elapsed
#2.52    0.02    2.55

dim(data1)
head(data1, 3)
#  creditScore houseAge yearsEmploy ccDebt year default
#1         615       10           5   2818 2000       0
#2         780       34           5   3575 2000       0
#3         735       12           1   3184 2000       0
Note that it takes almost 3 seconds to read one of these files into a data frame.
The following function constructs the name and path of a data set from the parameters supplied to it, reads the data into a data frame, and then uses R’s glm() function to fit a logistic regression model.
#-----------------------------------------------------------
# Function to read data and fit a logistic regression
#-----------------------------------------------------------
glmEx <- function(directory, fileStem, fileNum, formula){
  fileName <- paste(fileStem, fileNum, ".csv", sep = "")
  path     <- file.path(directory, fileName)
  data     <- read.csv(path)
  model    <- glm(formula = formula, data = data,
                  family = binomial(link = "logit"))
  return(summary(model))
}

form <- formula(default ~ creditScore + houseAge + yearsEmploy + ccDebt)
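Before parallelizing, it is worth timing a sequential baseline for comparison. The sketch below simply calls glmEx() twice, once per file, using the objects defined above:

# Sequential baseline: fit both models one after the other,
# timing the total so the parallel speedup is easy to judge
system.time({
  sum2000 <- glmEx(directory = dataDir, fileStem = "mortDefault",
                   fileNum = 2000, formula = form)
  sum2001 <- glmEx(directory = dataDir, fileStem = "mortDefault",
                   fileNum = 2001, formula = form)
})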
Something like this might be reasonable if you had a whole bunch of data sets in a directory. To process the two data sets in parallel, we set up an internal cluster with 2 workers, register the parallel backend, and run foreach() with the %dopar% operator.
#----------------------------------------------------------
# Coarse grain parallelism with foreach
#----------------------------------------------------------
cl <- makePSOCKcluster(2)  # Create copies of R running in parallel and
                           # communicating over sockets.
                           # My laptop has 2 multi-threaded cores.
registerDoParallel(cl)     # register parallel backend

system.time(
  res <- foreach(num = c(2000, 2001)) %dopar%
           glmEx(directory = dataDir, fileStem = "mortDefault",
                 fileNum = num, formula = form)
)
#user  system elapsed
#5.34    1.99   43.54

stopCluster(cl)
The basic idea is that my two-core PC processes the two data sets in parallel. The whole thing runs pretty quickly: two logit models are fit on a million rows each in about 44 seconds.
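A note on the results: foreach() returns a list with one element per iteration, so each model summary can be examined directly. A minimal sketch:

# foreach() returns a list, one element per iteration;
# each element here is the summary of one fitted glm model
length(res)            # 2
res[[1]]$coefficients  # coefficient table for the year-2000 model
res[[2]]$coefficients  # coefficient table for the year-2001 model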
Now, the same process can be accomplished with rxExec() as follows:
#-----------------------------------------------------------
# Coarse grain parallelism with rxExec
#-----------------------------------------------------------
rxOptions(numCoresToUse = 2)
rxSetComputeContext("localpar")  # use the local parallel compute context
rxGetComputeContext()

argList2 <- list(list(fileNum = 2000), list(fileNum = 2001))

system.time(
  res <- rxExec(glmEx, directory = dataDir, fileStem = "mortDefault",
                formula = form, elemArgs = argList2)
)
#user  system elapsed
#4.85    2.01   45.54
First notice that rxExec() took about the same amount of time to run. This is not surprising since, under the hood, rxExec() looks a lot like foreach() (while providing additional functionality). Indeed, the same Revolution Analytics team worked on both functions.
You can also see that rxExec() looks a bit like an apply() family function in that it takes a function, in this case my sample function glmEx(), as one of its arguments. The elemArgs parameter takes a list of the arguments that differ from call to call, here the file numbers used to construct the two file names, while the other comma-separated arguments in the call are the same for every execution. With this tidy syntax we could direct the function to fit models to data sets in very different locations and also set different parameters for each glm() call.
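To illustrate, the hypothetical call below varies the directory, file number, and formula for each execution while holding fileStem fixed; the directories and the shorter formula are made up for this example:

# Hypothetical: each inner list of elemArgs can override several
# arguments at once, giving each execution its own data location and model
argList3 <- list(
  list(directory = "C:/DATA/east", fileNum = 2000,
       formula = default ~ creditScore + ccDebt),
  list(directory = "C:/DATA/west", fileNum = 2001,
       formula = default ~ creditScore + houseAge + yearsEmploy + ccDebt))
res <- rxExec(glmEx, fileStem = "mortDefault", elemArgs = argList3)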
The really big difference between foreach() and rxExec(), however, is the line
rxSetComputeContext("localpar")
which sets the compute context. This is the mechanism that links rxExec() and pre-built PEMAs to RevoScaleR’s underlying distributed computing architecture. Changing the compute context allows you to run the R function in the rxExec() call on a cluster. For example, in the simplest case, where you can log into an edge node on a Hadoop cluster, the following code would enable rxExec() to run the glmEx() function on each node of the cluster.
myHadoopContext <- RxHadoopMR()
rxSetComputeContext(myHadoopContext)
In a more complicated scenario, for example where you are remotely connecting to the cluster, it will be necessary to include your credentials and some other parameters in the statement that specifies the compute context.
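A rough sketch of what such a remote context might look like follows; the host name, user name, and share directory are placeholders, and the exact argument set should be checked against the RxHadoopMR() documentation:

# Hypothetical remote compute context; the host, user, and directory
# below are placeholders, not settings from this post
myRemoteHadoopContext <- RxHadoopMR(
  sshHostname   = "cluster-edge-node",       # edge node to log into
  sshUsername   = "analyst",                 # account on the cluster
  shareDir      = "/var/RevoShare/analyst",  # shared dir for intermediate files
  consoleOutput = TRUE)
rxSetComputeContext(myRemoteHadoopContext)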
Finally, we can ratchet things up to a higher level of performance by using a PEMA in the rxExec() call. This would make sense in a scenario where you want to fit a different model on each node of a cluster while making sure that you are getting the maximum amount of parallel computation from all of the cores on each node. The following new version of the custom glm function uses the RevoScaleR PEMA rxLogit() to fit the logistic regressions:
#-----------------------------------------------------------
# Finer parallelism with rxLogit
#-----------------------------------------------------------
glmExRx <- function(directory, fileStem, fileNum, formula){
  fileName <- paste(fileStem, fileNum, ".csv", sep = "")
  path     <- file.path(directory, fileName)
  data     <- read.csv(path)
  model    <- rxLogit(formula = formula, data = data)
  return(summary(model))
}

argList2 <- list(list(fileNum = 2000), list(fileNum = 2001))

system.time(
  res <- rxExec(glmExRx, directory = dataDir, fileStem = "mortDefault",
                formula = form, elemArgs = argList2)
)
#user  system elapsed
#0.01    0.00    8.33
Here, still running just locally on my laptop, we see quite an improvement in performance: the computation runs in about 8.3 seconds. (Remember that over two seconds of this elapsed time is devoted to reading the data.) Some of this performance improvement comes from the additional, “finer grain” parallelism of the rxLogit() function. Most of the speedup, however, is likely due to careful handling of the underlying matrix computations.
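Since read.csv() accounts for a couple of seconds of each call, one further refinement would be to import each CSV into RevoScaleR’s XDF format once and let rxLogit() read the binary file directly. A sketch, assuming we are free to choose the output file name:

# Sketch: convert a CSV to XDF once, then fit against the XDF file;
# the output file name is our own choice, not from the post
xdfPath <- file.path(dataDir, "mortDefault2000.xdf")
rxImport(inData = path1, outFile = xdfPath, overwrite = TRUE)
summary(rxLogit(form, data = xdfPath))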
In summary, rxExec() can be thought of as an extension of foreach() that is capable of leveraging all kinds of R functions in distributed computing environments.