Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.
In this post, I will talk about parallelism in R. This post will likely be biased towards the solutions I use. For example, I never use mcapply
nor clusterApply
. I prefer to always use foreach
. In this post, we will focus on how to parallelize R code on your computer.
I will use mainly silly examples, just to show one point at a time.
Basics of foreach
You can install the foreach package with install.packages("foreach")
.
library(foreach) foreach(i = 1:3) %do% { sqrt(i) } ## [[1]] ## [1] 1 ## ## [[2]] ## [1] 1.414214 ## ## [[3]] ## [1] 1.732051
So, in the example above, you iterate on i
and apply the expression sqrt(i)
. foreach
returns a list by default. A common mistake is to think that foreach
is like a for-loop. Actually, foreach
is more like lapply
.
lapply(1:3, function(i) { sqrt(i) }) ## [[1]] ## [1] 1 ## ## [[2]] ## [1] 1.414214 ## ## [[3]] ## [1] 1.732051
Parameter .combine
is very useful.
foreach(i = 1:3, .combine = 'c') %do% { sqrt(i) } ## [1] 1.000000 1.414214 1.732051
With lapply
, we would do
res <- lapply(1:3, function(i) { sqrt(i) }) do.call('c', res) ## [1] 1.000000 1.414214 1.732051
Parallelize with foreach
You need to do at least two things:
- replace
%do%
by%dopar%
. Basically, always use%dopar%
because you can useregisterDoSEQ()
is you really want to run theforeach
sequentially. - register a parallel backend using one of the packages that begin with do (such as
doParallel
,doMC
,doMPI
and more). I will list only the two main parallel backends because there are too many of them.
Using clusters
# Example registering clusters cl <- parallel::makeCluster(2) doParallel::registerDoParallel(cl) foreach(i = 1:3, .combine = 'c') %dopar% { sqrt(i) } ## [1] 1.000000 1.414214 1.732051 parallel::stopCluster(cl)
In this situation, all the data and packages used must be exported (copied) to the clusters, which can add some overhead. Yet, at least, you know what you do.
Using forking
cl <- parallel::makeForkCluster(2) doParallel::registerDoParallel(cl) foreach(i = 1:3, .combine = 'c') %dopar% { sqrt(i) } ## [1] 1.000000 1.414214 1.732051 parallel::stopCluster(cl)
Forking just copy the R session at its current state. This is very fast because it copies objects only it they are modified. Moreover, you don’t need to export variables nor packages because they are already in the session. However, this can’t be used on Windows. This is why I use the clusters option in my packages. Also, I don’t trust forking to always work.
Common problems/mistakes/questions
Exporting variables and packages
object xxx not found or function xxx not found.
# Some data and function library(dplyr) dfs <- rep(list(iris), 3) count(dfs[[1]], Species) ## # A tibble: 3 x 2 ## Species n ## <fctr> <int> ## 1 setosa 50 ## 2 versicolor 50 ## 3 virginica 50 # Sequential processing to apply to # all the data frames of the list 'dfs' registerDoSEQ() myFun <- function() { foreach(i = seq_along(dfs)) %dopar% { df <- dfs[[i]] count(df, Species) } } str(myFun()) ## List of 3 ## $ :Classes 'tbl_df', 'tbl' and 'data.frame': 3 obs. of 2 variables: ## ..$ Species: Factor w/ 3 levels "setosa","versicolor",..: 1 2 3 ## ..$ n : int [1:3] 50 50 50 ## $ :Classes 'tbl_df', 'tbl' and 'data.frame': 3 obs. of 2 variables: ## ..$ Species: Factor w/ 3 levels "setosa","versicolor",..: 1 2 3 ## ..$ n : int [1:3] 50 50 50 ## $ :Classes 'tbl_df', 'tbl' and 'data.frame': 3 obs. of 2 variables: ## ..$ Species: Factor w/ 3 levels "setosa","versicolor",..: 1 2 3 ## ..$ n : int [1:3] 50 50 50 # Try in parallel cl <- parallel::makeCluster(2) doParallel::registerDoParallel(cl) tryCatch(myFun(), error = function(e) print(e)) ## <simpleError in { df <- dfs[[i]] count(df, Species)}: task 1 failed - "objet 'dfs' introuvable"> parallel::stopCluster(cl)
Hey, why doesn’t this work anymore? foreach
will export all the needed variables that are present in its environment (here, the environment of myFun
) and dfs
is not in this environment. Some will tell you to use option .export
of foreach
but I don’t think it’s good practice. You just have to pass dfs
to myFun
.
myFun2 <- function(dfs) { foreach(i = seq_along(dfs)) %dopar% { df <- dfs[[i]] count(df, Species) } } # Try in parallel cl <- parallel::makeCluster(2) doParallel::registerDoParallel(cl) tryCatch(myFun2(dfs), error = function(e) print(e)) ## <simpleError in { df <- dfs[[i]] count(df, Species)}: task 1 failed - "impossible de trouver la fonction "count""> parallel::stopCluster(cl)
Hey, this still doesn’t work. You also need to load packages. You could use option .packages
of foreach
but you could simply add dplyr::
before count
. It will be clearer (like one does in packages).
myFun3 <- function(dfs) { foreach(i = seq_along(dfs)) %dopar% { df <- dfs[[i]] dplyr::count(df, Species) } } # Try in parallel cl <- parallel::makeCluster(2) doParallel::registerDoParallel(cl) tryCatch(myFun3(dfs), error = function(e) print(e)) ## [[1]] ## # A tibble: 3 x 2 ## Species n ## <fctr> <int> ## 1 setosa 50 ## 2 versicolor 50 ## 3 virginica 50 ## ## [[2]] ## # A tibble: 3 x 2 ## Species n ## <fctr> <int> ## 1 setosa 50 ## 2 versicolor 50 ## 3 virginica 50 ## ## [[3]] ## # A tibble: 3 x 2 ## Species n ## <fctr> <int> ## 1 setosa 50 ## 2 versicolor 50 ## 3 virginica 50 parallel::stopCluster(cl)
Iterate over lots of elements.
cl <- parallel::makeCluster(2) doParallel::registerDoParallel(cl) system.time( foreach(i = seq_len(2e4), .combine = 'c') %dopar% { sqrt(i) } ) ## user system elapsed ## 6.144 0.329 6.646 parallel::stopCluster(cl)
Iterating over multiple elements in R is bad for performance. Moreover, foreach
is only combining results 100 by 100, which also slows computations.
If there are too many elements to loop over, the best is to split the computation in ncores blocks and to perform some optimized sequential work on each block. In package bigstatsr, I use the following function to split indices in nb
groups because I often need to iterate over hundreds of thousands of elements (columns).
bigstatsr:::CutBySize ## function (m, block.size, nb = ceiling(m/block.size)) ## { ## if (nb > m) ## nb <- m ## int <- m/nb ## upper <- round(1:nb * int) ## lower <- c(1, upper[-nb] + 1) ## size <- c(upper[1], diff(upper)) ## cbind(lower, upper, size) ## } ## <bytecode: 0xc66c820> ## <environment: namespace:bigstatsr> bigstatsr:::CutBySize(20, nb = 3) ## lower upper size ## [1,] 1 7 7 ## [2,] 8 13 6 ## [3,] 14 20 7
Filling something in parallel
Using foreach loop in r returning NA
mat <- matrix(0, 5, 8) registerDoSEQ() # Nested foreach loop tmp <- foreach(j = 1:8) %:% foreach(i = 1:5) %dopar% { mat[i, j] <- i + j } mat ## [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8] ## [1,] 2 3 4 5 6 7 8 9 ## [2,] 3 4 5 6 7 8 9 10 ## [3,] 4 5 6 7 8 9 10 11 ## [4,] 5 6 7 8 9 10 11 12 ## [5,] 6 7 8 9 10 11 12 13 # Try in parallel mat2 <- matrix(0, 5, 8) cl <- parallel::makeCluster(2) doParallel::registerDoParallel(cl) tmp2 <- foreach(j = 1:8) %:% foreach(i = 1:5) %dopar% { mat2[i, j] <- i + j } parallel::stopCluster(cl) mat2 ## [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8] ## [1,] 0 0 0 0 0 0 0 0 ## [2,] 0 0 0 0 0 0 0 0 ## [3,] 0 0 0 0 0 0 0 0 ## [4,] 0 0 0 0 0 0 0 0 ## [5,] 0 0 0 0 0 0 0 0
There are two problems here:
mat
is filled in the sequential version but won’t be in the parallel version. Why? This is because when using parallelism,mat
is copied so that each core modifies a copy of the matrix, not the original one.foreach
returns something (here a two-level list).
To overcome this problem, you could use shared-memory. For example, with my package bigstatsr.
library(bigstatsr) mat3 <- FBM(5, 8) cl <- parallel::makeCluster(2) doParallel::registerDoParallel(cl) tmp3 <- foreach(j = 1:8, .combine = 'c') %:% foreach(i = 1:5, .combine = 'c') %dopar% { mat3[i, j] <- i + j NULL } parallel::stopCluster(cl) mat3[] ## [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8] ## [1,] 2 3 4 5 6 7 8 9 ## [2,] 3 4 5 6 7 8 9 10 ## [3,] 4 5 6 7 8 9 10 11 ## [4,] 5 6 7 8 9 10 11 12 ## [5,] 6 7 8 9 10 11 12 13 tmp3 ## NULL
The original matrix is now modified. Note that I return NULL
to save memory.
Parallelize over a large matrix
mat <- matrix(0, 1e4, 1e4); mat[] <- rnorm(length(mat)) cl <- parallel::makeCluster(2) doParallel::registerDoParallel(cl) system.time( tmp <- foreach(k = 1:2, .combine = 'c') %dopar% { Sys.sleep(1) mat[1, 1] } ) ## user system elapsed ## 1.834 0.262 3.806 parallel::stopCluster(cl)
Copying mat
to both clusters takes time (and memory!).
mat2 <- FBM(1e4, 1e4); mat2[] <- rnorm(length(mat2)) cl <- parallel::makeCluster(2) doParallel::registerDoParallel(cl) system.time( tmp <- foreach(k = 1:2, .combine = 'c') %dopar% { Sys.sleep(1) mat2[1, 1] } ) ## user system elapsed ## 0.025 0.005 2.410 parallel::stopCluster(cl)
This is faster because it’s using a matrix that is stored on disk (so shared between processes) so that it doesn’t need to be copied.
Advanced parallelism: synchronization
For example, you may need to write to the same data (maybe increment it). In this case, it is important to use some locks so that only one session writes to the data at the same time. For that, you could use package flock, which is really easy to use.
mat <- FBM(1, 1, init = 0) mat[] ## [1] 0 cl <- parallel::makeCluster(2) doParallel::registerDoParallel(cl) foreach(k = 1:10, .combine = 'c') %dopar% { mat[1, 1] <- mat[1, 1] + k NULL } ## NULL parallel::stopCluster(cl) mat[] ## [1] 33 sum(1:10) ## [1] 55 lock <- tempfile() mat2 <- FBM(1, 1, init = 0) mat2[] ## [1] 0 cl <- parallel::makeCluster(2) doParallel::registerDoParallel(cl) foreach(k = 1:10, .combine = 'c') %dopar% { locked <- flock::lock(lock) mat2[1, 1] <- mat2[1, 1] + k flock::unlock(locked) NULL } ## NULL parallel::stopCluster(cl) mat2[] ## [1] 55
So each process uses some lock to perform its incrementation so that the data can’t be changed by some other process in the meantime.
You may also need to use some message passing or some barriers. For that, you could learn to use MPI. For some basic use, I “reimplemented” this using only shared-memory matrices (FBMs). You can see this function is you’re interested.
Miscellenaous
Recall that you won’t gain much from parallelism. You’re likely to gain much more performance by simply optimizing your sequential code. Don’t reproduce the silly examples here as real code, they are quite bad.
How to print during parallel execution? Use option
outfile
inmakeCluster
(for example, usingoutfile = ""
will redirect to the console).Don’t try to parallelize huge matrix operations with loops. There are already (parallel) optimized linear algebra libraries that exist and which will be much faster. For example, you could use Microsoft R Open.
Some will tell you to use
parallel::detectCores() - 1
cores.
Conclusion
Hope this can help some.
Don’t hesitate to comment if you want to add/modify something to this post.
R-bloggers.com offers daily e-mail updates about R news and tutorials about learning R and many other topics. Click here if you're looking to post or find an R/data-science job.
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.