parallelly 1.29.0: New Skills and Less Communication Latency on Linux

[This article was first published on JottR on R, and kindly contributed to R-bloggers].

parallelly 1.29.0 is on CRAN. The parallelly package enhances the parallel package, our built-in R package for parallel processing, by improving on existing features and by adding new ones. Somewhat simplified, parallelly provides the things that you would otherwise expect to find in the parallel package. The future package relies on the parallelly package internally for local and remote parallelization.

Since my previous post on parallelly five months ago, the parallelly package has had some bugs fixed and has gained a few new features.

Below is a detailed description of these new features.

New function isForkedChild()

If you run R on Unix or macOS, you can parallelize code using so-called forked parallel processing. It is a very convenient way of parallelizing code, especially since forking is implemented at the core of the operating system and there is very little extra you have to do at the R level to get it to work. Compared with other parallelization solutions, forked processing often has less overhead, resulting in shorter turnaround times. To date, the most famous method for parallelizing using forks is mclapply() of the parallel package. For example,

library(parallel)
y <- mclapply(X, some_slow_fcn, mc.cores = 4)

works just like lapply(X, some_slow_fcn) but will perform the same tasks in parallel using four (4) CPU cores. MS Windows does not support forked processing; any attempt to use mclapply() there will cause it to silently fall back to a sequential lapply() call.
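To make the comparison concrete, here is a minimal, self-contained sketch. The function slow_sqrt() is a hypothetical stand-in for some_slow_fcn(), and the choice of two cores is just an assumption to keep the example small. Because forking is not available on MS Windows, the sketch uses a single core there.

```r
library(parallel)

# Hypothetical stand-in for some_slow_fcn(): sleeps briefly, then returns sqrt(x)
slow_sqrt <- function(x) {
  Sys.sleep(0.1)
  sqrt(x)
}

X <- 1:8

# Forked processing is unavailable on MS Windows, so use one core there
cores <- if (.Platform$OS.type == "windows") 1L else 2L

# Time the sequential and the forked versions of the same computation
t_seq <- system.time(y_seq <- lapply(X, slow_sqrt))[["elapsed"]]
t_par <- system.time(y_par <- mclapply(X, slow_sqrt, mc.cores = cores))[["elapsed"]]

# Same values, in the same order, regardless of how they were computed
stopifnot(identical(y_seq, y_par))
print(c(sequential = t_seq, forked = t_par))
```

The timings will vary from machine to machine, but the results themselves should be identical to what lapply() gives.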

In the future ecosystem, you get forked parallelization with the multicore backend, e.g.

library(future.apply)
plan(multicore, workers = 4)
y <- future_lapply(X, some_slow_fcn)

Unfortunately, we cannot parallelize all types of code using forks. If we try, we might get an error, but in the worst case we crash the R process with a segmentation fault. For example, some graphical user interfaces (GUIs), such as the RStudio Console, do not play well with forked processing. Multi-threaded parallelization has also been reported to cause problems when run within forked parallelization. We sometimes talk about non-fork-safe code, in contrast to fork-safe code, to refer to code that risks crashing the software if run in forked processes.

Here is what Simon Urbanek, R-core developer and author of mclapply(), wrote in the R-devel thread ‘mclapply returns NULLs on MacOS when running GAM’ on 2020-04-28:

Do NOT use mcparallel() in packages except as a non-default option that user can set for the reasons … explained [above]. Multicore is intended for HPC applications that need to use many cores for computing-heavy jobs, but it does not play well with RStudio and more importantly you don’t know the resource available so only the user can tell you when it is safe to use. Multi-core machines are often shared so using all detected cores is a very bad idea. The user should be able to explicitly enable it, but it should not be enabled by default.

It is not always obvious whether a certain function call in R is fork safe, especially if we haven’t written all the code ourselves. Because of this, it is more a matter of trial and error to see if it works. However, when we know that a certain function call is not fork safe, it is useful to protect against using it in forked parallelization. In parallelly (>= 1.28.0), we can use the function isForkedChild() to test whether or not R runs in a forked child process. For example, the author of some_slow_fcn() above could protect against mistakes by:

some_slow_fcn <- function(x) {
  if (parallelly::isForkedChild()) {
    stop("This function must not be used in *forked* parallel processing")
  }
  
  y <- non_fork_safe_code(x)
  ...
}

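or, if they have an alternative, less preferred but fork-safe implementation, they could use that one whenever R is running in a forked child process, and the preferred non-fork-safe one otherwise:

some_slow_fcn <- function(x) {
  if (parallelly::isForkedChild()) {
    # Forked child: use the fork-safe alternative
    y <- fork_safe_code(x)
  } else {
    # Main session or non-forked worker: use the preferred implementation
    y <- non_fork_safe_code(x)
  }
  ...
}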
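To see what isForkedChild() reports in practice, here is a small sketch that calls it once in the main R session and once inside each of two mclapply() workers. It assumes parallelly (>= 1.28.0) is installed. The forked part is skipped on MS Windows, where forking is not supported.

```r
library(parallel)

# In the main R session, we are not running in a forked child process
in_main <- parallelly::isForkedChild()

# Inside mclapply() workers, we are (forking requires Unix or macOS)
if (.Platform$OS.type == "unix") {
  in_children <- unlist(mclapply(1:2, function(i) {
    parallelly::isForkedChild()
  }, mc.cores = 2L))
} else {
  in_children <- NA
}

print(in_main)
print(in_children)
```

On Unix and macOS, I would expect FALSE for the main session and TRUE for both workers.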

New function isNodeAlive()

The new function isNodeAlive() checks whether one or more nodes are alive. For instance,

library(parallelly)
cl <- makeClusterPSOCK(3)
isNodeAlive(cl)
#> [1] TRUE TRUE TRUE

Imagine the second parallel worker crashes, which we can emulate with

clusterEvalQ(cl[2], tools::pskill(Sys.getpid()))
#> Error in unserialize(node$con) : error reading from connection

then we get:

isNodeAlive(cl)
#> [1]  TRUE FALSE  TRUE

The isNodeAlive() function works by querying the operating system to see if those processes are still running, based on their process IDs (PIDs) recorded by makeClusterPSOCK() when launched. If the workers’ PIDs are unknown, then NA is returned instead. For instance, contrary to parallelly::makeClusterPSOCK(), parallel::makeCluster() does not record the PIDs and we get missing values as the result:

library(parallelly)
cl <- parallel::makeCluster(3)
isNodeAlive(cl)
#> [1] NA NA NA

Similarly, if one of the parallel workers runs on a remote machine, we cannot easily query the remote machine to find out whether the PID still exists. In such cases, NA is returned. Maybe we will be able to query also remote machines in a future version of parallelly, but for now, it is not possible.
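Because isNodeAlive() can return TRUE, FALSE, or NA, code that acts on the result needs to handle all three cases. The sketch below uses classify_nodes(), a hypothetical helper that is not part of parallelly, and a hand-written logical vector that mimics what isNodeAlive() might return, so that it runs without launching any workers.

```r
# Hypothetical helper (not part of parallelly): split the three-valued
# result of isNodeAlive() into alive, dead, and unknown worker indices
classify_nodes <- function(alive) {
  list(
    alive   = which(!is.na(alive) & alive),
    dead    = which(!is.na(alive) & !alive),
    unknown = which(is.na(alive))
  )
}

# Example input mimicking isNodeAlive(): worker 2 has crashed and
# the PID of worker 4 is unknown (e.g. a remote worker)
status <- classify_nodes(c(TRUE, FALSE, TRUE, NA))
str(status)
```

With a real cluster, one would pass isNodeAlive(cl) to the helper instead of the hard-coded vector.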

availableCores() respects Bioconductor settings

Function availableCores() queries the hardware and the system environment to find out how many CPU cores it may run on. It does this by checking system settings, environment variables, and R options that may be set by the end-user, the system administrator, the parent R process, the operating system, a job scheduler, and so on. When you use availableCores(), you don’t have to worry about using more CPU resources than you were assigned, which helps guarantee that your code runs nicely together with everything else on the same machine.

In parallelly (>= 1.29.0), availableCores() now also recognizes Bioconductor-specific settings. For example, BiocParallel 1.27.2 introduced environment variable BIOCPARALLEL_WORKER_NUMBER, which sets the default number of parallel workers when using BiocParallel for parallelization. Similarly, the Bioconductor check servers set environment variable BBS_HOME, which BiocParallel uses to limit the number of cores to four (4). Now availableCores() also reflects those settings, which, in turn, means that future settings like plan(multisession) will automatically respect the Bioconductor settings too.

Function availableWorkers(), which relies on availableCores() as a fallback, therefore also respects these Bioconductor environment variables.
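As a quick illustration, the sketch below temporarily sets BIOCPARALLEL_WORKER_NUMBER in the current R session and then asks availableCores() how many cores we may use. It assumes parallelly (>= 1.29.0). The value of two is only an example, and other settings on your system may lower the result further.

```r
library(parallelly)

# Remember the current value so that we can restore it afterwards
old <- Sys.getenv("BIOCPARALLEL_WORKER_NUMBER", unset = NA)

# Emulate a BiocParallel setting that asks for at most two workers
Sys.setenv(BIOCPARALLEL_WORKER_NUMBER = "2")
n <- availableCores()
print(n)

# Restore the original state of the environment variable
if (is.na(old)) {
  Sys.unsetenv("BIOCPARALLEL_WORKER_NUMBER")
} else {
  Sys.setenv(BIOCPARALLEL_WORKER_NUMBER = old)
}
```

With this setting in place, the reported number of cores should not exceed two, regardless of how many cores the machine has.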

makeClusterPSOCK(…, rscript = "*")

Argument rscript of makeClusterPSOCK() can be used to control exactly which Rscript executable is used to launch the parallel workers, and also how that executable is launched. The default setting is often sufficient, but if you want to launch a worker, say, within a Linux container you can do so by adjusting rscript. The help page for makeClusterPSOCK() has several examples of this. It may also be used for other setups. For example, to launch two parallel workers on a remote Linux machine, such that their CPU priority is lower than that of other processes running on that machine, we can use (*):

workers <- rep("remote.example.org", times = 2)
cl <- makeClusterPSOCK(workers, rscript = c("nice", "Rscript"))

This causes the two R workers to be launched using nice Rscript .... The Unix command nice is what makes Rscript run with a lower CPU priority. By running at a lower priority, we reduce the risk that our parallel tasks have a negative impact on other software running on that machine, e.g. someone might use that machine for interactive work without us knowing. We can do the same thing on our local machine via:

cl <- makeClusterPSOCK(2L,
        rscript = c("nice", file.path(R.home("bin"), "Rscript")))

Here we specified the absolute path to Rscript to make sure we run the same version of R as the main R session, and not another Rscript that may be on the system PATH.

Starting with parallelly 1.29.0, we can replace the Rscript specification in the above two examples with "*", as in:

workers <- rep("remote-machine.example.org", times = 2L)
cl <- makeClusterPSOCK(workers, rscript = c("nice", "*"))

and

cl <- makeClusterPSOCK(2L, rscript = c("nice", "*"))

When used, makeClusterPSOCK() will expand "*" to the proper Rscript specification depending on whether the worker runs remotely or locally. To further emphasize the convenience of this, consider:

workers <- c("localhost", "remote-machine.example.org")
cl <- makeClusterPSOCK(workers, rscript = c("nice", "*"))

which launches two parallel workers: one running on the local machine and one running on the remote machine.

Note that, when using future, we can pass rscript to plan(multisession) and plan(cluster) to achieve the same thing, as in

plan(cluster, workers = workers, rscript = c("nice", "*"))

and

plan(multisession, workers = 2L, rscript = c("nice", "*"))

(*) Here we use nice as an example, because it is a simple way to illustrate how rscript can be used. As a matter of fact, there is already an argument renice, which we can use to achieve the same without using the rscript argument.
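To check that "*" picks up the same R as the main session on the local machine, here is a small sketch. It assumes parallelly (>= 1.29.0) and that the nice command is available, which is why it is skipped on MS Windows. It compares R.version.string on the workers with that of the main session.

```r
library(parallelly)

if (.Platform$OS.type == "unix" && nzchar(Sys.which("nice"))) {
  # Launch two local workers via 'nice <Rscript of the main session>'
  cl <- makeClusterPSOCK(2L, rscript = c("nice", "*"))

  # Ask each worker which version of R it is running
  worker_versions <- unlist(parallel::clusterEvalQ(cl, R.version.string))
  parallel::stopCluster(cl)

  same_r <- all(worker_versions == R.version.string)
} else {
  # 'nice' is not available here, so there is nothing to compare
  same_r <- NA
}

print(same_r)
```

Note that this only compares version strings, so it is a sanity check rather than proof that the exact same executable was used.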

makeClusterPSOCK(…, rscript_envs = c(UNSET_ME = NA_character_))

Argument rscript_envs of makeClusterPSOCK() can be used to set environment variables on cluster nodes, or copy existing ones from the main R session to the cluster nodes. For example,

cl <- makeClusterPSOCK(2, rscript_envs = c(PI = "3.14", "MY_EMAIL"))

will, during startup, set environment variable PI on each of the two cluster nodes to have value 3.14. It will also set MY_EMAIL on them to the value of Sys.getenv("MY_EMAIL") in the current R session.

Starting with parallelly 1.29.0, we can now also unset environment variables, in case they are set on the cluster nodes. Any named element with a missing value causes the corresponding environment variable to be unset, e.g.

cl <- makeClusterPSOCK(2, rscript_envs = c("_R_CHECK_LENGTH_1_CONDITION_" = NA_character_))

This results in passing -e 'Sys.unsetenv("_R_CHECK_LENGTH_1_CONDITION_")' to Rscript when launching each worker.
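The sketch below sets one environment variable and unsets another on a single local worker, and then reads both back from the worker. The variable names DEMO_PI and DEMO_UNSET_ME are made up for this illustration, and parallelly (>= 1.29.0) is assumed.

```r
library(parallelly)

# Set a hypothetical variable in the main session; a local worker
# would normally inherit it from here
Sys.setenv(DEMO_UNSET_ME = "1")

# Set DEMO_PI on the worker, and unset DEMO_UNSET_ME there
cl <- makeClusterPSOCK(1L, rscript_envs = c(
  DEMO_PI       = "3.14",
  DEMO_UNSET_ME = NA_character_
))

# Read both variables back from the worker; NA means "not set"
vals <- parallel::clusterEvalQ(cl, {
  Sys.getenv(c("DEMO_PI", "DEMO_UNSET_ME"), unset = NA)
})[[1]]
parallel::stopCluster(cl)

# Clean up in the main session
Sys.unsetenv("DEMO_UNSET_ME")

print(vals)
```

If everything works as described above, DEMO_PI should be "3.14" on the worker and DEMO_UNSET_ME should come back as a missing value.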

makeClusterPSOCK() sets up clusters with less communication latency on Unix

It turns out that, in R on Unix, there is a significant latency in the communication between the parallel workers and the main R session (**). Starting in R (>= 4.1.0), it is possible to decrease this latency by setting a dedicated R option on each of the workers, e.g.

rscript_args <- c("-e", shQuote("options(socketOptions = 'no-delay')"))
cl <- parallel::makeCluster(workers, rscript_args = rscript_args)

This is quite verbose, so I’ve made this the new default in parallelly (>= 1.29.0), i.e. you can keep using:

cl <- parallelly::makeClusterPSOCK(workers)

to benefit from the above. See help for makeClusterPSOCK() for options on how to change this new default.

Here is an example that illustrates the difference in latency with and without the new settings:

cl_parallel   <- parallel::makeCluster(1)
cl_parallelly <- parallelly::makeClusterPSOCK(1)

res <- bench::mark(iterations = 1000L,
    parallel = parallel::clusterEvalQ(cl_parallel, iris),
  parallelly = parallel::clusterEvalQ(cl_parallelly, iris)
)

res[, c(1:4,9)]
#> # A tibble: 2 × 5
#>   expression      min   median `itr/sec` total_time
#>   <bch:expr> <bch:tm> <bch:tm>     <dbl>   <bch:tm>
#> 1 parallel      277µs     44ms      22.5      44.4s
#> 2 parallelly    380µs    582µs    1670.     598.3ms

From this, we see that the total latency overhead for 1,000 parallel tasks went from 44 seconds down to 0.60 seconds, which is ~75 times less on average. Does this mean your parallel code will run faster? No, it is just the communication latency that has decreased. But why waste time waiting for your results when you don’t have to? This is why I changed the defaults in parallelly. It will also bring the experience on Unix on par with MS Windows and macOS.
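For those who want to verify the arithmetic, the ratio can be recomputed from the total_time column of the benchmark output above. The numbers below are copied from that table; nothing new is measured here.

```r
# Totals reported by bench::mark() above for 1,000 calls
total_parallel   <- 44.4           # seconds
total_parallelly <- 598.3 / 1000   # 598.3 ms expressed in seconds
n_calls <- 1000

# How many times smaller is the total overhead with parallelly?
ratio <- total_parallel / total_parallelly

# Average overhead per call, in milliseconds
per_call_ms <- c(parallel = total_parallel,
                 parallelly = total_parallelly) / n_calls * 1000

print(round(ratio))
print(per_call_ms)
```

The ratio works out to roughly 74, in line with the "~75 times" quoted above, and the per-call averages are about 44 ms versus about 0.6 ms.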

Note that the relatively high latency affects only Unix. MS Windows and macOS do not suffer from this extra latency. For example, on MS Windows 10 that runs in a virtual machine on the same Linux computer as above, I get:

#> # A tibble: 2 × 5
#>   expression      min   median `itr/sec` total_time
#>   <bch:expr> <bch:tm> <bch:tm>     <dbl>   <bch:tm>
#> 1 parallel      191us    314us     2993.      333ms
#> 2 parallelly    164us    311us     3227.      310ms

If you’re using future with plan(multisession) or plan(cluster), you’re already benefitting from the performance gain, because those rely on parallelly::makeClusterPSOCK() internally.

(**) Technical details: Option socketOptions sets the default value of argument options of base::socketConnection(). The default is NULL, but if we set it to "no-delay", the created TCP socket connections are configured to use the TCP_NODELAY flag. When using TCP_NODELAY, a TCP connection will no longer use the so-called Nagle’s algorithm, which otherwise is used to reduce the number of TCP packets that need to be sent over the network by making sure TCP fills up each packet before sending it off. When using the new "no-delay", this buffering is disabled and packets are sent as soon as data come in. Credits for this improvement should go to Jeff Keller, who identified and reported the problem to R-devel, to Iñaki Úcar who pitched in, and to Simon Urbanek, who implemented support for socketConnection(..., options = "no-delay") for R 4.1.0.

Bug fixes

Finally, the most important bug fixes since parallelly 1.26.0 are:

For all other bug fixes and updates, please see NEWS.

Over and out!
