%dofuture% – a Better foreach() Parallelization Operator than %dopar%

[This article was first published on JottR on R, and kindly contributed to R-bloggers.]

doFuture 1.0.0 has been on CRAN since March 2023. It introduces a new foreach operator, %dofuture%, which makes it even easier to use foreach() to parallelize via the future ecosystem. This new operator is designed to be an alternative to the existing %dopar% operator for foreach() – an alternative that works in similar ways but better. If you already use foreach() together with futures, or plan on doing so, I recommend using %dofuture% instead of %dopar%. I’ll explain why I think so below.

Introduction

The traditional way to parallelize with foreach() is to use the %dopar% infix operator together with a registered foreach adaptor. The popular doParallel package provides %dopar% backends for parallelizing on the local machine. Here is an example that uses four local workers:

library(foreach)
workers <- parallel::makeCluster(4)
doParallel::registerDoParallel(cl = workers)

xs <- rnorm(1000)
y <- foreach(x = xs, .export = "slow_fcn") %dopar% {
  slow_fcn(x)
}

I highly recommend the Futureverse for parallelization because of its advantages, such as relaying standard output, messages, warnings, and errors generated on the parallel workers to the main R process, support for near-live progress updates, and more descriptive backend error messages. Almost from the very beginning of the Futureverse, you have been able to use futures with foreach() and %dopar% via the doFuture package. For instance, we can rewrite the above example to use futures as:

library(foreach)
doFuture::registerDoFuture()
future::plan(multisession, workers = 4)

xs <- rnorm(1000)
y <- foreach(x = xs, .export = "slow_fcn") %dopar% {
  slow_fcn(x)
}

In this blog post, I am proposing to move to

library(foreach)
future::plan(multisession, workers = 4)

xs <- rnorm(1000)
y <- foreach(x = xs, .export = "slow_fcn") %dofuture% {
  slow_fcn(x)
}

instead. So, why is that better? It is because:

  1. %dofuture% removes the need to register a foreach backend, i.e. no more registerDoMC(), registerDoParallel(), registerDoFuture(), etc.

  2. %dofuture% is unaffected by any foreach backends that the end-user has registered.

  3. %dofuture% uses a consistent foreach() “options” argument, regardless of parallel backend used, and not different ones for different backends, e.g. .options.multicore, .options.snow, and .options.mpi.

  4. %dofuture% is guaranteed to always parallelize via the Futureverse, using whatever plan() the end-user has specified. It also means that you, as a developer, have full control of the parallelization code.

  5. %dofuture% supports proper parallel random number generation (RNG). There is no longer a need to use %dorng% of the doRNG package.

  6. %dofuture% automatically identifies global variables and packages that are needed by the parallel workers.

  7. %dofuture% relays errors generated in parallel as-is such that they can be handled using standard R methods, e.g. tryCatch().

  8. %dofuture% relays standard output, messages, warnings, and other types of conditions generated in parallel as-is such that they can be handled using standard R methods, e.g. capture.output() and withCallingHandlers().

  9. %dofuture% supports near-live progress updates via the progressr package.

  10. %dofuture% gives more informative error messages, which helps troubleshooting, if a parallel worker crashes.
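
As a small illustration of items 7 and 8, here is a minimal sketch of handling an error and a warning that were signaled on a parallel worker. The messages "boom" and "careful", the variable names, and the choice of two multisession workers are assumptions made only for this example.

```r
library(doFuture)   # also attaches foreach and future
plan(multisession, workers = 2)

# An error signaled on a worker can be caught with tryCatch() in the main session
msg <- tryCatch({
  foreach(x = 1:3) %dofuture% {
    if (x == 2) stop("boom")
    x
  }
  NA_character_
}, error = function(e) conditionMessage(e))

# Warnings signaled on workers can be collected with withCallingHandlers()
warns <- character(0)
y <- withCallingHandlers({
  foreach(x = 1:3) %dofuture% {
    if (x == 3) warning("careful")
    x^2
  }
}, warning = function(w) {
  warns <<- c(warns, conditionMessage(w))
  tryInvokeRestart("muffleWarning")   # requires R >= 4.0.0
})

plan(sequential)    # shut down the parallel workers
```

After this runs, msg should hold the error message from the failing iteration, and warns the collected warning messages, while y still contains the results of the second loop.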

Below are the details.

Problems of %dopar% that %dofuture% addresses

Let me discuss a few of the unfortunate drawbacks that come with %dopar%. Most of these stem from a slightly too lax design. Although convenient, the flexible design prevents us from having full control and writing code that can parallelize on any parallel backend.

Problem 1. %dopar% requires registering a foreach adaptor

If we write code that others will use, say, an R package, then we can never know what compute resources the user has, or will have in the future. Traditionally, this means that one user might want to use doParallel for parallelization, another doMC, and yet another, maybe, doRedis. Because of this, we must not have any calls to one of the many registerDoNnn() functions in our code. If we do, we lock users into a specific parallel backend. We could of course support a few different backends, but we are still locking users into a small set of parallel backends. If someone develops a new backend in the future, our code has to be updated before users can take advantage of the new backend.

One can argue that doFuture::registerDoFuture() somewhat addresses this problem. On one hand, when used, it does lock the user into the future framework. On the other hand, the user has many parallel backends to choose from in the Futureverse, including backends that will be developed in the future. In this sense, the lock-in is less severe, especially since we do not have to update our code for new backends to be supported. Also, to avoid destructive side effects, registerDoFuture() allows you to change the foreach backend used inside your functions temporarily, e.g.

## Temporarily use futures
oldDoPar <- registerDoFuture()
on.exit(with(oldDoPar, foreach::setDoPar(fun=fun, data=data, info=info)), add = TRUE)

This avoids changing the foreach backend that the user might already have set elsewhere.

That said, I never wanted to say that people should use registerDoFuture() whenever using %dopar%, because I think that would be against the philosophy behind the foreach framework. The foreach ecosystem is designed to separate the foreach() + %dopar% code, describing what to parallelize, from the registerDoNnn() call, describing how and where to parallelize.

Using %dofuture% instead of %dopar% with a user-controlled foreach backend avoids this dilemma. With %dofuture%, the developer is in full control of the parallelization code.

Problem 2. Chunking and load-balancing differ among foreach backends

When using parallel map-reduce functions such as mclapply(), parLapply() of the parallel package, or foreach() with %dopar%, the tasks are partitioned into subsets and distributed to the parallel workers for processing. This partitioning is often referred to as “chunking”, because we chunk up the elements into smaller chunks, and then each chunk is processed by one parallel worker. There are different strategies to chunk up the elements. One approach is to use uniformly sized chunks and have each worker process one chunk. Another approach is to use chunks with a single element, and have each worker process one or more chunks.

The chunks may be pre-assigned (“prescheduled”) to the parallel workers up-front, which is referred to as static load balancing. An alternative is to assign chunks to workers on-the-fly as the workers become available, which is referred to as dynamic load balancing.

If the processing times differ a lot between elements, it is beneficial to use dynamic load balancing together with small chunk sizes.

However, if we dig into the documentation and source code of the different foreach backends, we will find that they use different chunking and load-balancing strategies. For example, assume we are running on a Linux machine, which supports forked processing. Then, if we use

library(foreach)
doParallel::registerDoParallel(cores = 8)

y <- foreach(x = X, .export = "slow_fcn") %dopar% {
  slow_fcn(x)
}

the data will be processed by eight fork-based parallel workers using dynamic load balancing with single-element chunks. However, if we use PSOCK clusters:

library(foreach)
cl <- parallel::makeCluster(8)
doParallel::registerDoParallel(cl = cl)

y <- foreach(x = X, .export = "slow_fcn") %dopar% {
  slow_fcn(x)
}

the data will be processed by eight PSOCK-based parallel workers using static load balancing with uniformly sized chunks.

Which of these two chunking and load-balancing strategies is most efficient depends on how much the processing time of slow_fcn(x) varies with different values of x. For example, and without going into details, if the processing times differ a lot, dynamic load balancing often makes better use of the parallel workers and results in a shorter overall processing time.

Regardless of which is faster, the problem with different foreach backends using different strategies is that, as a developer with little control over the registered foreach backend, you have equally poor control over the chunking and load-balancing strategies.

Using %dofuture% avoids this problem. If you use %dofuture%, then dynamic load balancing will always be used for processing the data, regardless of which parallel future backend is in place, with the option to control the chunk size. As a side note, %dopar% with registerDoFuture() will also do this.
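
To make the chunk-size option concrete, here is a minimal sketch that assumes the chunk.size element of .options.future described in the doFuture documentation. The slow_fcn() below is a hypothetical stand-in, and X and the two workers are chosen only for illustration.

```r
library(doFuture)   # also attaches foreach and future
plan(multisession, workers = 2)

# Hypothetical stand-in for a slow function
slow_fcn <- function(x) {
  Sys.sleep(0.01)
  sqrt(x)
}
X <- 1:8

# One element per chunk: each future processes a single element
y1 <- foreach(x = X, .options.future = list(chunk.size = 1)) %dofuture% {
  slow_fcn(x)
}

# Four elements per chunk: fewer, larger chunks
y2 <- foreach(x = X, .options.future = list(chunk.size = 4)) %dofuture% {
  slow_fcn(x)
}

plan(sequential)    # shut down the parallel workers
```

The chunk size only affects how the work is distributed, so y1 and y2 should hold the same values.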

Problem 3. Different foreach backends use different foreach() options

In the previous section, I did not mention that for some foreach backends it is indeed possible to control whether static or dynamic load balancing should be used, and what the chunk sizes should be. This can be controlled by special .options.* arguments for foreach(). However, each foreach backend has its own .options.* argument, e.g. you might find that some use .options.multicore, others .options.snow, or something else. Because they are different, we cannot write code that works with any type of foreach backend.

To give two examples, when using doParallel and registerDoParallel(cores = 8), we can replace the default dynamic load balancing with static load balancing as:

library(foreach)
doParallel::registerDoParallel(cores = 8)

y <- foreach(x = X, .export = "slow_fcn",
             .options.multicore = list(preschedule = TRUE)) %dopar% {
  slow_fcn(x)
}

This change will also switch from chunks with a single element to (eight) chunks with similar size.

If we instead use registerDoParallel(cl), where the situation is reversed, we can replace the static load balancing with dynamic load balancing by using:

library(foreach)
cl <- parallel::makeCluster(8)
doParallel::registerDoParallel(cl = cl)

y <- foreach(x = X, .export = "slow_fcn",
             .options.snow = list(preschedule = FALSE)) %dopar% {
  slow_fcn(x)
}

This will also switch from uniformly sized chunks to single-element chunks.

As we can see, the fact that we have to use different foreach() “options” arguments (here .options.multicore and .options.snow) for different foreach backends prevents us from writing code that works with any foreach backend.

Of course, we could specify “options” arguments for known foreach backends and hope we haven’t missed any and that no new ones are showing up later, e.g.

library(foreach)
doParallel::registerDoParallel(cores = 8)

y <- foreach(x = X,
             .export = "slow_fcn",
             .options.multicore = list(preschedule = TRUE),
             .options.snow      = list(preschedule = TRUE),
             .options.future    = list(preschedule = TRUE),
             .options.mpi       = list(chunkSize = 1)      ) %dopar% {
  slow_fcn(x)
}

Regardless, this still limits the end-user to a set of commonly used foreach backends, and our code can never adapt to foreach backends that are developed at a later time.

Using %dofuture% avoids these problems. It supports argument .options.future in a consistent way across all future backends, which means that your code will be the same regardless of parallel backend. By the core design of the Futureverse, any new future backends developed later on will automatically work with your foreach code if you use %dofuture%.
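
As a minimal sketch of this backend independence, the hypothetical helper run() below is written once and then executed under two different plans. The scheduling element of .options.future is assumed to behave as described in the doFuture documentation, and slow_fcn() is again a stand-in.

```r
library(doFuture)   # also attaches foreach and future

# Hypothetical stand-in for a slow function
slow_fcn <- function(x) x^2

# The foreach code, including its options, is written once
run <- function(X) {
  foreach(x = X, .options.future = list(scheduling = 2.0)) %dofuture% {
    slow_fcn(x)
  }
}

# The end-user picks the backend via plan(); run() itself does not change
plan(sequential)
y_seq <- run(1:6)

plan(multisession, workers = 2)
y_par <- run(1:6)

plan(sequential)    # shut down the parallel workers
```

Only the plan() calls differ between the two runs, and both should return the same values.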

Problem 4. Global variables are not always identified by foreach()

When parallelizing code, the parallel workers must have access to all functions and variables required to evaluate the parallel code. As we have seen in the above examples, you can use the .export argument to help foreach() export the necessary objects to each of the parallel workers.

However, a developer who uses doMC::registerDoMC(), or equivalently doParallel::registerDoParallel(cores), might forget to specify the .export argument. This can happen because the mechanisms of forked processing make all objects available to the parallel workers. If they test their code using only these foreach backends, they will not notice that .export is not declared. The same may happen if the developer assumes doFuture::registerDoFuture() is used. However, without specifying .export, the code will not work on other types of foreach backends, e.g. doParallel::registerDoParallel(cl) and doMPI::registerDoMPI(). If an R package forgets to specify the .export argument, and is not comprehensively tested, then it will be the end-user, for instance on MS Windows, that runs into the bug.

When using %dofuture%, global variables and required packages are by default automatically identified and exported to the parallel workers by the future framework. This is done the same way regardless of parallel backend.
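
A minimal sketch of this, using a non-forked multisession backend and no .export argument at all. The names offset and slow_fcn() are hypothetical and used only for illustration.

```r
library(doFuture)   # also attaches foreach and future
plan(multisession, workers = 2)   # separate R sessions, no forking

# A global variable and a function that depends on it
offset <- 10
slow_fcn <- function(x) x + offset

# No .export here: the globals are expected to be found and exported automatically
y <- foreach(x = 1:3) %dofuture% {
  slow_fcn(x)
}

plan(sequential)    # shut down the parallel workers
```

Because the workers are separate R sessions, this loop can only succeed if slow_fcn() and offset actually reach them.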

Problem 5. Easy to forget parallel random number generation

The foreach package and %dopar% do not have built-in support for parallel random number generation (RNG). Statistically sound parallel RNG is critical for many statistical analyses. Without it, the results can be biased and incorrect conclusions might be drawn. The doRNG package comes to the rescue when using %dopar%. It provides the operator %dorng%, which will use %dopar% internally while automatically setting up parallel RNG. Whenever you use %dopar% and find yourself needing parallel RNG, I recommend simply replacing %dopar% with %dorng%. The doRNG package also provides registerDoRNG(), which I do not recommend, because as a developer you do not have full control over whether it is registered or not.

Because foreach does not have built-in support for parallel RNG, it is easy to forget that it should be used. A developer who is aware of the importance of using proper parallel RNG will find out about doRNG and how to best use it, but a developer who is not aware of the problem can easily miss it and publish an R package that produces potentially incorrect results.

However, the future framework will detect if we forget to use parallel RNG. When this happens, a warning will alert us to the problem and suggest how to fix it. This is the case if you use doFuture::registerDoFuture(), and it’s also the case when using %dofuture%. For example,

library(doFuture)
plan(multisession, workers = 3)

y <- foreach(ii = 1:4) %dofuture% {
  runif(ii)
}

produces

Warning messages:
1: UNRELIABLE VALUE: Iteration 1 of the foreach() %dofuture% { ... },
part of chunk #1 ('doFuture2-1'), unexpectedly generated random numbers
without declaring so. There is a risk that those random numbers are not
statistically sound and the overall results might be invalid. To fix
this, specify foreach() argument '.options.future = list(seed = TRUE)'.
This ensures that proper, parallel-safe random numbers are produced via
the L'Ecuyer-CMRG method. To disable this check, set option
'doFuture.rng.onMisuse' to "ignore". 

To fix this, we can specify foreach() argument .options.future = list(seed = TRUE) to declare that we need to draw random numbers in parallel, i.e.

library(doFuture)
plan(multisession, workers = 3)

y <- foreach(ii = 1:4, .options.future = list(seed = TRUE)) %dofuture% {
  runif(ii)
}

This makes sure that statistically sound random numbers are generated.
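
A related point is reproducibility. The sketch below assumes that, with seed = TRUE, the parallel RNG streams are derived from the current RNG state of the main session, so that calling set.seed() beforehand makes the draws repeatable. The helper draw() and the seed value 42 are hypothetical and used only for illustration.

```r
library(doFuture)   # also attaches foreach and future
plan(multisession, workers = 2)

# Hypothetical helper that draws random numbers in parallel
draw <- function() {
  foreach(ii = 1:4, .options.future = list(seed = TRUE)) %dofuture% {
    runif(ii)
  }
}

# Same initial seed in the main session, two separate runs
set.seed(42)
y1 <- draw()

set.seed(42)
y2 <- draw()

plan(sequential)    # shut down the parallel workers
```

Under that assumption, y1 and y2 should be identical.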

Migrating from %dopar% to %dofuture% is straightforward

If you already have code that uses %dopar% and want to start using %dofuture% instead, then it only takes a few changes, which are all straightforward and quick:

  1. Replace %dopar% with %dofuture%.

  2. Replace %dorng% with %dofuture% and set .options.future = list(seed = TRUE).

  3. Replace .export = <character vector of global variables> with .options.future = list(globals = <character vector of global variables>).

  4. Drop any other registerDoNnn() calls inside your function, if you use them.

  5. Update your documentation to mention that the parallel backend should be set using future::plan() and no longer via different registerDoNnn() calls.
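
The steps above can be sketched as a before-and-after comparison. The "before" code is shown as comments only, and slow_fcn() and xs are hypothetical stand-ins. In this sketch the .export argument is simply dropped and automatic identification of globals is relied upon, rather than listing the globals explicitly as in step 3.

```r
library(doFuture)   # also attaches foreach and future

# Hypothetical stand-in that draws a random number
slow_fcn <- function(x) x + rnorm(1)
xs <- 1:5

# Before (comments only, not run):
#   library(doRNG)
#   doParallel::registerDoParallel(cl)
#   y <- foreach(x = xs, .export = "slow_fcn") %dorng% {
#     slow_fcn(x)
#   }

# After: no registerDoNnn() call; the backend is chosen with plan()
plan(multisession, workers = 2)

y <- foreach(x = xs, .options.future = list(seed = TRUE)) %dofuture% {
  slow_fcn(x)
}

plan(sequential)    # shut down the parallel workers
```

In package code, the plan() call would be left to the end-user, in line with step 5.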

In brief, if you use %dofuture% instead of %dopar%, your life as a developer will be easier, and so will end-users’ lives.

If you have questions or comments on doFuture and %dofuture%, or the Futureverse in general, please use the Futureverse Discussion forum.

Happy futuring!

Henrik
