4  Parallel workflow

Modified

August 21, 2025

Although R was not built for parallel computing, multiple ways of parallelizing your R code exist. One of these is the parallel package. This R package, shipped with base R, provides various functions to parallelize R code using embarrassingly parallel computing, i.e., a divide-and-conquer-type strategy. The basic idea is to start multiple R sessions (usually called child processes), connect the main session with those, and send them instructions. This section goes over a common workflow to work with R’s parallel.

(Usually) We do the following:

  1. Create a PSOCK/FORK (or other) cluster using makePSOCKCluster/makeForkCluster (or makeCluster). How many child processes will depend on how many threads your computer has. A rule of thumb is to use parallel::detectCores() - 1 cores (so you leave one free for the rest of your computer).

  2. Copy/prepare each R session (if you are using a PSOCK cluster):

    1. Copy objects with clusterExport. This would be all the objects that you need in the child sessions.

    2. Pass expressions with clusterEvalQ. This would include loading R packages and other code into the other sessions.

    3. Set a seed (if you are doing something that involves randomness)

  3. Do your call: parApply, parLapply, etc.

  4. Stop the cluster with clusterStop

As we mention later, step 2 will depend on the type of cluster you are using. If you are using a Socket connection (PSOCK cluster), then the spawned R sessions will be completely, fresh (no data or R packages pre-loaded); whereas using a Fork connection (FORK cluster) will copy the current R session, including all objects and loaded packages.

4.1 Types of clusters: PSOCK

  • Can be created with makePSOCKCluster

  • Creates brand new R Sessions (so nothing is inherited from the master), e.g.

    # This creates a cluster with 4 R sessions
    cl <- makePSOCKCluster(4)
  • Child sessions are connected to the master session via Socket connections

  • Can be created outside the current computer, i.e., across multiple computers!

4.2 Types of clusters: Fork

  • Fork Cluster makeForkCluster:

  • Uses OS Forking,

  • Copies the current R session locally (so everything is inherited from the master up to that point).

  • Data is only duplicated if altered (need to double check when this happens!)

  • Not available on Windows.

Other types are available via the function makeCluster from the snow R package (Simple Network of Workstations). These include MPI (Message Passing Interface) clusters and Slurm (Socket) clusters.

4.3 A template program

The following code chunk shows a template for using the parallel package in R. You can copy this and comment the bits that you don’t need:

library(parallel)

# 1. CREATING A CLUSTER ----------------
nnodes <- 4L # Could be less or more!
cl <- makePSOCKcluster(nnodes)

# 2. PREPARING THE CLUSTER -------------

# Mostly if using PSOCK
clusterEvalQ(cl, {
  library(...) # Loading the necesary packages
  source(...) # Source additional scripts
})

# Always if you are doing random numbers
clusterSetRNGStream(cl, 123)

# 3. DO YOUR CALL ----------------------
ans <- parLapply(
  cl,
  ... long list to iterate ...,
  function(x) {
    ...
  },
  ... further arguments ...
  )

# 4. STOP THE CLUSTER
stopCluster(cl)

Generally, the ... long list to iterate ... will be a vector or another list that contains either data (e.g., individual datasets), a sequence of numbers (e.g., from 1 to 1000), a list of file paths (if you were processing files individually), or directly a short sequence with numbers from 1 to the number of nodes (least common application).

When calling parLapply or parSapply (the parallel versions of lapply and sapply respectively), the function call will automatically split the iterations across nodes using the splitIndices function. Here is an example of what happens under the hood:

# Distributing 9 iterations across two cores
(n_iterations <- parallel::splitIndices(nx = 9, ncl = 2))
[[1]]
[1] 1 2 3 4

[[2]]
[1] 5 6 7 8 9

Which means that the first R session will get 4 jobs, wereas the second R session will get 5 jobs. This way, each spawned R session (child session) gets to do a similiar number of iterations.

4.4 Example: Running a linear regression across multiple columns

In genomics, it is common to analyze genomic data at the gene level comparing expression levels against some phenotype/disease. A simple analysis consists of running a linear regression across multiple columns (genes) of a data frame. The following code-block generates some artificial data we can use for this example:

set.seed(331)
n_genes <- 10000
n_obs <- 1000

# A random matrix of omics
X_genes <- rnorm(n_obs * n_genes) |>
  matrix(nrow = n_obs)

# A random phenotype (completely unrelated for this example)
Y <- rnorm(n_obs) |> cbind()

We will wrap the analysis into a function so we can do benchmarking. We will use the lapply function to iterate over the columns of X_genes

ols_serial <- function(X, Y) {
  lapply(
    X = seq_len(n_genes),
    FUN = \(i) {lm.fit(X[, i, drop = FALSE], Y) |> coef()}
  ) |> do.call(what = rbind)
}

# Calling the function and looking at the first few rows
ols_serial(X_genes, Y) |> head()
               x1
[1,]  0.029403088
[2,]  0.008907854
[3,] -0.027246099
[4,] -0.031280262
[5,] -0.001309752
[6,]  0.066971469
Tip

Like we did in the efficient programming section, instead of using lm() or glm(), we can use lm.fit() for better performance. The lm.fit() function does less than the lm() function by skipping computing residuals and other overhead, making it faster for large datasets.

Using parallel computing (and following the template we presented earlier), this could be done in the following way with the parallel package:

library(parallel)

ols_parallel <- function(X, Y, ncores) {
  # 1. CREATING A CLUSTER ----------------
  cl <- makePSOCKcluster(ncores)
  
  # This will be called when exiting the function
  on.exit(stopCluster(cl)) 

  # 2. PREPARING THE CLUSTER -------------
  # We copy the data over
  clusterExport(cl, c("X", "Y"), envir = environment())

  # 3. DO YOUR CALL ----------------------
  parLapply(
    cl,
    seq_len(n_genes),
    function(i) {
      lm.fit(X[, i, drop = FALSE], Y) |> coef()
    }
  ) |> do.call(what = rbind)
}

# Checking it works
ols_parallel(X_genes, Y, ncores = 4L) |> head()
               x1
[1,]  0.029403088
[2,]  0.008907854
[3,] -0.027246099
[4,] -0.031280262
[5,] -0.001309752
[6,]  0.066971469
Tip

Just like the return(), on.exit() can only be used within a function call. We could have used stopCluster(cl) at the end as we do in our template example, but the benefit of using on.exit() is that it will be called automatically when the function exits, even if an error occurs. This helps to ensure that the cluster is always stopped properly.

Now that we have the function implemented, we can go ahead and (1) compare results and (2) measure performance.

library(microbenchmark)

microbenchmark(
  serial = ols_serial(X_genes, Y),
  parallel = ols_parallel(X_genes, Y, ncores = 4L),
  times = 10L,
  check = "identical"
)
Unit: milliseconds
     expr       min       lq      mean    median        uq       max neval
   serial  600.3555  612.506  699.1705  700.4057  774.8037  804.8882    10
 parallel 1870.8310 1875.149 1884.7327 1887.2280 1890.6226 1896.3960    10

From the comparison, we can see that the parallel version is significantly slower than the serial version. Two things to note here are (a) the task we are running is already fast (about 0.3 seconds on average for the serial run) and (b) there is an overhead cost associated with creating, preparing, and stopping the cluster. As we mentioned earlier, parallel optimizations only make sense if your code is already talking a significant amount of time, making the overhead cost associated with the setup relatively small. The following implementation of the function should make it significantly faster:

ols_parallel2 <- function(cl) {
  # 1. CREATING A CLUSTER ----------------
  # 2. PREPARING THE CLUSTER -------------
  # No need anymore as we are handling the core outside

  # 3. DO YOUR CALL ----------------------
  parLapply(
    cl,
    seq_len(n_genes),
    function(i) {
      lm.fit(X_genes[, i, drop = FALSE], Y) |> coef()
    }
  ) |> do.call(what = rbind)
}

# Checking it works
cl <- makePSOCKcluster(4)
clusterExport(cl, c("X_genes", "Y"))
ols_parallel2(cl) |> head()
               x1
[1,]  0.029403088
[2,]  0.008907854
[3,] -0.027246099
[4,] -0.031280262
[5,] -0.001309752
[6,]  0.066971469
# 4. STOP THE CLUSTER
stopCluster(cl)

The main differences from the previous version of the function are:

  1. We are creating the cluster outside of the function and passing it as an argument.

  2. We are exporting the X_genes and Y variables to the cluster only once, which should also reduce overhead significantly.

  3. Because of the previous step, we are now calling X_genes directly in the main function.

  4. The cluster is stopped outside of the function call (since the function no longer manages the cluster object).

Let’s measure the performance to see how much faster the parallel version is.

library(microbenchmark)

# We need to prepare the cluster before hand
cl <- makePSOCKcluster(4)
clusterExport(cl, c("X_genes", "Y"))

microbenchmark(
  serial = ols_serial(X_genes, Y),
  parallel = ols_parallel2(cl),
  times = 10L,
  check = "identical"
)
Unit: milliseconds
     expr      min       lq     mean   median       uq      max neval
   serial 605.7212 648.7491 686.1701 698.5711 708.9739 783.0273    10
 parallel 290.5768 322.0961 328.8551 330.5771 338.9526 357.1149    10
# We need to stop the cluster
stopCluster(cl)

Now, the parallel version is significantly faster than the serial version. Just using the parallel package (or any other package that can be used for parallel computing) does not guarantee improved performance.

4.5 More examples

The following three examples are a simple application of the package in which we are explicitly running as many replications as threads the cluster has. Generally, the number of replicates will be a function of the data.

4.5.1 Ex 1: Parallel RNG with makePSOCKCluster

Caution

Using more threads than cores available on your computer is never a good idea. As a rule of thumb, clusters should be created using parallel::detectCores() - 1 cores (so you leave one free for the rest of your computer.)

# 1. CREATING A CLUSTER
library(parallel)
nnodes <- 4L
cl     <- makePSOCKcluster(nnodes)    
# 2. PREPARING THE CLUSTER
clusterSetRNGStream(cl, 123) # Equivalent to `set.seed(123)`
# 3. DO YOUR CALL
ans <- parSapply(cl, 1:nnodes, function(x) runif(1e3))
(ans0 <- var(ans))
              [,1]          [,2]          [,3]          [,4]
[1,]  0.0861888293 -0.0001633431  5.939143e-04 -3.672845e-04
[2,] -0.0001633431  0.0853841838  2.390790e-03 -1.462154e-04
[3,]  0.0005939143  0.0023907904  8.114219e-02 -4.714618e-06
[4,] -0.0003672845 -0.0001462154 -4.714618e-06  8.467722e-02

Making sure it is reproducible

# I want to get the same!
clusterSetRNGStream(cl, 123)
ans1 <- var(parSapply(cl, 1:nnodes, function(x) runif(1e3)))
# 4. STOP THE CLUSTER
stopCluster(cl)
all.equal(ans0, ans1) # All equal!
[1] TRUE

4.5.2 Ex 2: Parallel RNG with makeForkCluster

In the case of makeForkCluster

# 1. CREATING A CLUSTER
library(parallel)
# The fork cluster will copy the -nsims- object
nsims  <- 1e3
nnodes <- 4L
cl     <- makeForkCluster(nnodes)    
# 2. PREPARING THE CLUSTER
clusterSetRNGStream(cl, 123)
# 3. DO YOUR CALL
ans <- do.call(cbind, parLapply(cl, 1:nnodes, function(x) {
  runif(nsims) # Look! we use the nsims object!
               # This would have fail in makePSOCKCluster
               # if we didn't copy -nsims- first.
  }))
(ans0 <- var(ans))
              [,1]          [,2]          [,3]          [,4]
[1,]  0.0861888293 -0.0001633431  5.939143e-04 -3.672845e-04
[2,] -0.0001633431  0.0853841838  2.390790e-03 -1.462154e-04
[3,]  0.0005939143  0.0023907904  8.114219e-02 -4.714618e-06
[4,] -0.0003672845 -0.0001462154 -4.714618e-06  8.467722e-02

Again, we want to make sure this is reproducible

# Same sequence with same seed
clusterSetRNGStream(cl, 123)
ans1 <- var(do.call(cbind, parLapply(cl, 1:nnodes, function(x) runif(nsims))))
ans0 - ans1 # A matrix of zeros
     [,1] [,2] [,3] [,4]
[1,]    0    0    0    0
[2,]    0    0    0    0
[3,]    0    0    0    0
[4,]    0    0    0    0
# 4. STOP THE CLUSTER
stopCluster(cl)

Well, if you are a Mac-OS/Linux user, there’s a more straightforward way of doing this…

4.5.3 Ex 3: Parallel RNG with mclapply (Forking on the fly)

In the case of mclapply, the forking (cluster creation) is done on the fly!

# 1. CREATING A CLUSTER
library(parallel)
# The fork cluster will copy the -nsims- object
nsims  <- 1e3
nnodes <- 4L
# cl     <- makeForkCluster(nnodes) # mclapply does it on the fly
# 2. PREPARING THE CLUSTER
set.seed(123) 
# 3. DO YOUR CALL
ans <- do.call(cbind, mclapply(1:nnodes, function(x) runif(nsims)))
(ans0 <- var(ans))
             [,1]         [,2]         [,3]         [,4]
[1,]  0.084249882 -0.004402682 -0.002243108  0.003309541
[2,] -0.004402682  0.085804393 -0.001585072 -0.005569005
[3,] -0.002243108 -0.001585072  0.085450519 -0.002044775
[4,]  0.003309541 -0.005569005 -0.002044775  0.082446062

Once more, we want to make sure this is reproducible

# Same sequence with same seed
set.seed(123) 
ans1 <- var(do.call(cbind, mclapply(1:nnodes, function(x) runif(nsims))))
ans0 - ans1 # A matrix of zeros
              [,1]         [,2]         [,3]          [,4]
[1,] -0.0004135208 -0.004765671 -0.002965292 -0.0008748349
[2,] -0.0047656708  0.004394996  0.001405680 -0.0032905758
[3,] -0.0029652921  0.001405680  0.001699969  0.0004823830
[4,] -0.0008748349 -0.003290576  0.000482383  0.0034031630
# 4. STOP THE CLUSTER
# stopCluster(cl) no need of doing this anymore

4.6 Exercise: Overhead costs

Compare the timing of taking the sum of 100 numbers when parallelized versus not. For the unparallized (serialized) version, use the following:

set.seed(123)
x <- runif(n=100)

serial_sum <- function(x){
  x_sum <- sum(x)
  return(x_sum)
}

For the parallized version, follow this outline

library(parallel)
set.seed(123)
x <- runif(n=100)

parallel_sum <- function(){
  
  
  # Set number of cores to use
  # make cluster and export to the cluster the x variable
  # Use "split function to divide x up into as many chunks as the number of cores
  
  # Calculate partial sums doing something like:
  
  partial_sums <- parallel::parSapply(cl, x_split, sum)
  
  # Stop the cluster
  
  # Add and return the partial sums
  
}

Compare the timing of the two approaches:

microbenchmark::microbenchmark(
  serial   = serial_sum(x),
  parallel = parallel_sum(x),
  times    = 10,
  unit     = "relative"
)