Although R was not built for parallel computing, multiple ways of parallelizing your R code exist. One of these is the parallel package. This package, shipped with base R, provides various functions to parallelize R code using embarrassingly parallel computing, i.e., a divide-and-conquer-type strategy. The basic idea is to start multiple R sessions (usually called child processes), connect the main session to them, and send them instructions. This section goes over a common workflow for working with R's parallel package.
We usually do the following:
1. Create a PSOCK/FORK (or other) cluster using makePSOCKcluster/makeForkCluster (or makeCluster). How many child processes to start will depend on how many threads your computer has. A rule of thumb is to use parallel::detectCores() - 1 cores (so you leave one free for the rest of your computer).
2. Copy/prepare each R session (if you are using a PSOCK cluster):
   - Copy objects with clusterExport. These are all the objects that you need in the child sessions.
   - Pass expressions with clusterEvalQ. This includes loading R packages and other code into the child sessions.
   - Set a seed (if you are doing something that involves randomness).
3. Do your call: parApply, parLapply, etc.
4. Stop the cluster with stopCluster.
As we mention below, step 2 will depend on the type of cluster you are using. If you are using a socket connection (PSOCK cluster), the spawned R sessions will be completely fresh (no data or R packages pre-loaded), whereas a fork connection (FORK cluster) will copy the current R session, including all objects and loaded packages.
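To make the difference concrete, here is a minimal sketch (the fork part assumes a Unix-alike system) showing that a PSOCK child starts empty, whereas a FORK child inherits objects from the master session:
library(parallel)
x <- 10

# PSOCK: fresh sessions, so `x` does not exist in the children
cl_psock <- makePSOCKcluster(2L)
try(clusterEvalQ(cl_psock, x)) # Errors: object 'x' not found
stopCluster(cl_psock)

# FORK (not available on Windows): children inherit `x`
cl_fork <- makeForkCluster(2L)
clusterEvalQ(cl_fork, x)       # Each child returns 10
stopCluster(cl_fork)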
Socket Cluster (makePSOCKcluster):
- Can be created with makePSOCKcluster.
- Creates brand-new R sessions (so nothing is inherited from the master), e.g.,
# This creates a cluster with 4 R sessions
cl <- makePSOCKcluster(4)
- Child sessions are connected to the master session via socket connections.
- Can be created outside the current computer, i.e., across multiple computers (see the sketch below)!
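For instance, a socket cluster spanning several machines can be created by passing a vector of host names. The host names below are hypothetical and assume passwordless SSH access to each machine:
library(parallel)

# Hypothetical host names; each entry spawns one R session on that machine
cl <- makePSOCKcluster(c("localhost", "localhost", "node01", "node02"))
stopCluster(cl)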
Fork Cluster (makeForkCluster):
- Uses OS forking.
- Copies the current R session locally (so everything is inherited from the master up to that point).
- Data is only duplicated if altered (need to double check when this happens!).
- Not available on Windows.
Other types are available via the function makeCluster
from the snow R package (Simple Network of Workstations). These include MPI (Message Passing Interface) clusters and Slurm (Socket) clusters.
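As a rough sketch, the generic makeCluster function takes a type argument: "PSOCK" and "FORK" are handled by parallel itself, while types such as "MPI" are passed on to snow and require additional software (e.g., the Rmpi package and an MPI installation):
library(parallel)

# Equivalent to makePSOCKcluster(4)
cl <- makeCluster(4L, type = "PSOCK")
stopCluster(cl)

# Would require Rmpi and an MPI installation
# cl <- makeCluster(4L, type = "MPI")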
The following code chunk shows a template for using the parallel package in R. You can copy this and comment out the bits that you don't need:
library(parallel)

# 1. CREATING A CLUSTER ----------------
nnodes <- 4L # Could be less or more!
cl     <- makePSOCKcluster(nnodes)

# 2. PREPARING THE CLUSTER -------------
# Mostly if using PSOCK
clusterEvalQ(cl, {
  library(...) # Loading the necessary packages
  source(...)  # Source additional scripts
})

# Always if you are doing random numbers
clusterSetRNGStream(cl, 123)

# 3. DO YOUR CALL ----------------------
ans <- parLapply(
  cl,
  ... long list to iterate ...,
  function(x) {
    ...
  },
  ... further arguments ...
)

# 4. STOP THE CLUSTER
stopCluster(cl)
Generally, the ... long list to iterate ...
will be a vector or another list that contains either data (e.g., individual datasets), a sequence of numbers (e.g., from 1 to 1000), a list of file paths (if you were processing files individually), or directly a short sequence with numbers from 1 to the number of nodes (least common application).
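As an illustration of the "list of file paths" case, the following sketch (with hypothetical file names) processes one CSV file per iteration:
library(parallel)

cl <- makePSOCKcluster(2L)

# Hypothetical files; replace with your own paths
csv_files <- c("data/sample1.csv", "data/sample2.csv", "data/sample3.csv")

# Each iteration reads and summarizes one file
summaries <- parLapply(cl, csv_files, function(path) {
  dat <- read.csv(path)
  colMeans(dat, na.rm = TRUE)
})

stopCluster(cl)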
When calling parLapply
or parSapply
(the parallel versions of lapply
and sapply
respectively), the function call will automatically split the iterations across nodes using the splitIndices
function. Here is an example of what happens under the hood:
# Distributing 9 iterations across two cores
(n_iterations <- parallel::splitIndices(nx = 9, ncl = 2))
[[1]]
[1] 1 2 3 4
[[2]]
[1] 5 6 7 8 9
This means that the first R session will get 4 jobs, whereas the second R session will get 5 jobs. This way, each spawned R session (child session) gets to do a similar number of iterations.
In genomics, it is common to analyze genomic data at the gene level, comparing expression levels against some phenotype/disease. A simple analysis consists of running a linear regression across multiple columns (genes) of a data frame. The following code block generates some artificial data we can use for this example:
set.seed(331)
n_genes <- 10000
n_obs   <- 1000

# A random matrix of omics
X_genes <- rnorm(n_obs * n_genes) |>
  matrix(nrow = n_obs)

# A random phenotype (completely unrelated for this example)
Y <- rnorm(n_obs) |> cbind()
We will wrap the analysis into a function so we can benchmark it. We will use the lapply function to iterate over the columns of X_genes:
ols_serial <- function(X, Y) {
  lapply(
    X   = seq_len(n_genes),
    FUN = \(i) {lm.fit(X[, i, drop = FALSE], Y) |> coef()}
  ) |> do.call(what = rbind)
}
# Calling the function and looking at the first few rows
ols_serial(X_genes, Y) |> head()
x1
[1,] 0.029403088
[2,] 0.008907854
[3,] -0.027246099
[4,] -0.031280262
[5,] -0.001309752
[6,] 0.066971469
Like we did in the efficient programming section, instead of using lm() or glm(), we can use lm.fit() for better performance. The lm.fit() function does less than lm(): it skips the formula interface, model-frame construction, and other overhead, which makes it noticeably faster when called many times on large datasets.
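As a quick, illustrative check that the two approaches agree, we can compare the estimates on a single simulated column (the variables x and y below are made up for the example):
set.seed(1)
x <- matrix(rnorm(100), ncol = 1)
y <- rnorm(100)

coef(lm(y ~ x - 1))       # Formula interface (no intercept): more overhead
lm.fit(x, y)$coefficients # Same estimate, far less overhead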
Using parallel computing (and following the template we presented earlier), this could be done in the following way with the parallel package:
library(parallel)

ols_parallel <- function(X, Y, ncores) {
  # 1. CREATING A CLUSTER ----------------
  cl <- makePSOCKcluster(ncores)

  # This will be called when exiting the function
  on.exit(stopCluster(cl))

  # 2. PREPARING THE CLUSTER -------------
  # We copy the data over
  clusterExport(cl, c("X", "Y"), envir = environment())

  # 3. DO YOUR CALL ----------------------
  parLapply(
    cl,
    seq_len(n_genes),
    function(i) {
      lm.fit(X[, i, drop = FALSE], Y) |> coef()
    }
  ) |> do.call(what = rbind)
}
# Checking it works
ols_parallel(X_genes, Y, ncores = 4L) |> head()
x1
[1,] 0.029403088
[2,] 0.008907854
[3,] -0.027246099
[4,] -0.031280262
[5,] -0.001309752
[6,] 0.066971469
Just like return(), on.exit() can only be used within a function call. We could have used stopCluster(cl) at the end, as we do in our template example, but the benefit of using on.exit() is that it will be called automatically when the function exits, even if an error occurs. This helps ensure that the cluster is always stopped properly.
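A minimal sketch of this behavior, using a deliberately failing function made up for illustration:
flaky <- function() {
  cl <- parallel::makePSOCKcluster(2L)
  on.exit(parallel::stopCluster(cl)) # Registered clean-up
  stop("something went wrong")       # Simulated error
}

try(flaky()) # The error is raised, but the cluster is still stopped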
Now that we have the function implemented, we can go ahead and (1) compare results and (2) measure performance.
library(microbenchmark)
microbenchmark(
serial = ols_serial(X_genes, Y),
parallel = ols_parallel(X_genes, Y, ncores = 4L),
times = 10L,
check = "identical"
)
Unit: milliseconds
expr min lq mean median uq max neval
serial 600.3555 612.506 699.1705 700.4057 774.8037 804.8882 10
parallel 1870.8310 1875.149 1884.7327 1887.2280 1890.6226 1896.3960 10
From the comparison, we can see that the parallel version is significantly slower than the serial version. Two things to note here are (a) the task we are running is already fast (about 0.7 seconds on average for the serial run) and (b) there is an overhead cost associated with creating, preparing, and stopping the cluster. As we mentioned earlier, parallel optimizations only make sense if your code is already taking a significant amount of time, making the overhead cost associated with the setup relatively small. The following implementation of the function should make it significantly faster:
ols_parallel2 <- function(cl) {
  # 1. CREATING A CLUSTER ----------------
  # 2. PREPARING THE CLUSTER -------------
  # No need anymore as we are handling the cluster outside
  # 3. DO YOUR CALL ----------------------
  parLapply(
    cl,
    seq_len(n_genes),
    function(i) {
      lm.fit(X_genes[, i, drop = FALSE], Y) |> coef()
    }
  ) |> do.call(what = rbind)
}
# Checking it works
cl <- makePSOCKcluster(4)
clusterExport(cl, c("X_genes", "Y"))
ols_parallel2(cl) |> head()
x1
[1,] 0.029403088
[2,] 0.008907854
[3,] -0.027246099
[4,] -0.031280262
[5,] -0.001309752
[6,] 0.066971469
# 4. STOP THE CLUSTER
stopCluster(cl)
The main differences from the previous version of the function are:
- We are creating the cluster outside of the function and passing it as an argument.
- We are exporting the X_genes and Y variables to the cluster only once, which should also reduce overhead significantly.
- Because of the previous step, we now reference X_genes directly inside the function.
- The cluster is stopped outside of the function call (since the function no longer manages the cluster object).
Let’s measure the performance to see how much faster the parallel version is.
library(microbenchmark)
# We need to prepare the cluster beforehand
cl <- makePSOCKcluster(4)
clusterExport(cl, c("X_genes", "Y"))
microbenchmark(
serial = ols_serial(X_genes, Y),
parallel = ols_parallel2(cl),
times = 10L,
check = "identical"
)
Unit: milliseconds
expr min lq mean median uq max neval
serial 605.7212 648.7491 686.1701 698.5711 708.9739 783.0273 10
parallel 290.5768 322.0961 328.8551 330.5771 338.9526 357.1149 10
# We need to stop the cluster
stopCluster(cl)
Now, the parallel version is significantly faster than the serial version. Just using the parallel package (or any other package that can be used for parallel computing) does not guarantee improved performance.
The following three examples are a simple application of the package in which we explicitly run as many replications as the cluster has threads. Generally, the number of replicates will be a function of the data.
makePSOCKcluster
Using more threads than the cores available on your computer is never a good idea. As a rule of thumb, clusters should be created using parallel::detectCores() - 1 cores (so you leave one free for the rest of your computer).
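One defensive way of applying that rule of thumb (detectCores() can return NA on some systems, so we guard against that) is, for instance:
library(parallel)

nnodes <- max(1L, detectCores() - 1L, na.rm = TRUE)
cl     <- makePSOCKcluster(nnodes)
stopCluster(cl)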
# 1. CREATING A CLUSTER
library(parallel)
nnodes <- 4L
cl     <- makePSOCKcluster(nnodes)

# 2. PREPARING THE CLUSTER
clusterSetRNGStream(cl, 123) # Equivalent to `set.seed(123)`

# 3. DO YOUR CALL
ans <- parSapply(cl, 1:nnodes, function(x) runif(1e3))
(ans0 <- var(ans))
[,1] [,2] [,3] [,4]
[1,] 0.0861888293 -0.0001633431 5.939143e-04 -3.672845e-04
[2,] -0.0001633431 0.0853841838 2.390790e-03 -1.462154e-04
[3,] 0.0005939143 0.0023907904 8.114219e-02 -4.714618e-06
[4,] -0.0003672845 -0.0001462154 -4.714618e-06 8.467722e-02
Making sure it is reproducible
# I want to get the same!
clusterSetRNGStream(cl, 123)
ans1 <- var(parSapply(cl, 1:nnodes, function(x) runif(1e3)))

# 4. STOP THE CLUSTER
stopCluster(cl)

all.equal(ans0, ans1) # All equal!
[1] TRUE
makeForkCluster
In the case of makeForkCluster, the child sessions are forked copies of the main session, so objects such as nsims in the example below are available without having to export them:
# 1. CREATING A CLUSTER
library(parallel)

# The fork cluster will copy the -nsims- object
nsims  <- 1e3
nnodes <- 4L
cl     <- makeForkCluster(nnodes)

# 2. PREPARING THE CLUSTER
clusterSetRNGStream(cl, 123)

# 3. DO YOUR CALL
ans <- do.call(cbind, parLapply(cl, 1:nnodes, function(x) {
  runif(nsims) # Look! we use the nsims object!
               # This would have failed in makePSOCKcluster
               # if we didn't copy -nsims- first.
}))
(ans0 <- var(ans))
[,1] [,2] [,3] [,4]
[1,] 0.0861888293 -0.0001633431 5.939143e-04 -3.672845e-04
[2,] -0.0001633431 0.0853841838 2.390790e-03 -1.462154e-04
[3,] 0.0005939143 0.0023907904 8.114219e-02 -4.714618e-06
[4,] -0.0003672845 -0.0001462154 -4.714618e-06 8.467722e-02
Again, we want to make sure this is reproducible
# Same sequence with same seed
clusterSetRNGStream(cl, 123)
ans1 <- var(do.call(cbind, parLapply(cl, 1:nnodes, function(x) runif(nsims))))

ans0 - ans1 # A matrix of zeros
[,1] [,2] [,3] [,4]
[1,] 0 0 0 0
[2,] 0 0 0 0
[3,] 0 0 0 0
[4,] 0 0 0 0
# 4. STOP THE CLUSTER
stopCluster(cl)
mclapply (Forking on the fly)
In the case of mclapply, the forking (cluster creation) is done on the fly!
# 1. CREATING A CLUSTER
library(parallel)

# The fork cluster will copy the -nsims- object
nsims  <- 1e3
nnodes <- 4L
# cl <- makeForkCluster(nnodes) # mclapply does it on the fly

# 2. PREPARING THE CLUSTER
set.seed(123)

# 3. DO YOUR CALL
ans <- do.call(cbind, mclapply(1:nnodes, function(x) runif(nsims)))
(ans0 <- var(ans))
[,1] [,2] [,3] [,4]
[1,] 0.084249882 -0.004402682 -0.002243108 0.003309541
[2,] -0.004402682 0.085804393 -0.001585072 -0.005569005
[3,] -0.002243108 -0.001585072 0.085450519 -0.002044775
[4,] 0.003309541 -0.005569005 -0.002044775 0.082446062
Once more, we check whether this is reproducible. Unlike the previous two cases, a plain set.seed() call does not control the random number streams of the forked child processes, so the two runs differ:
# Same sequence with same seed
set.seed(123)
ans1 <- var(do.call(cbind, mclapply(1:nnodes, function(x) runif(nsims))))

ans0 - ans1 # Not a matrix of zeros!
[,1] [,2] [,3] [,4]
[1,] -0.0004135208 -0.004765671 -0.002965292 -0.0008748349
[2,] -0.0047656708 0.004394996 0.001405680 -0.0032905758
[3,] -0.0029652921 0.001405680 0.001699969 0.0004823830
[4,] -0.0008748349 -0.003290576 0.000482383 0.0034031630
# 4. STOP THE CLUSTER
# stopCluster(cl) # No need to do this anymore
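If you do need reproducible results with mclapply, one option (a sketch, not shown in the example above) is to switch to the "L'Ecuyer-CMRG" random number generator, which lets mclapply give each forked child its own reproducible stream:
library(parallel)

# With this RNG, the children's streams are derived from the master seed
RNGkind("L'Ecuyer-CMRG")
set.seed(123)
ans0 <- var(do.call(cbind, mclapply(1:4, function(x) runif(1e3))))

set.seed(123)
ans1 <- var(do.call(cbind, mclapply(1:4, function(x) runif(1e3))))

all.equal(ans0, ans1) # Now the two runs match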
Compare the timing of taking the sum of 100 numbers when parallelized versus not. For the unparallelized (serial) version, use the following:
set.seed(123)
x <- runif(n = 100)

serial_sum <- function(x) {
  x_sum <- sum(x)
  return(x_sum)
}
For the parallelized version, follow this outline:
library(parallel)
set.seed(123)
x <- runif(n = 100)

parallel_sum <- function(x) {

  # Set the number of cores to use

  # Make a cluster and export the x variable to it

  # Use the split function to divide x into as many chunks as the number of cores

  # Calculate the partial sums, doing something like:
  partial_sums <- parallel::parSapply(cl, x_split, sum)

  # Stop the cluster

  # Add and return the partial sums
}
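If you get stuck on the splitting step, one possibility (a hint, not the full solution) is to reuse splitIndices, which we saw earlier in this section:
# Build x_split: a list with one chunk of x per core
ncores  <- 2L
idx     <- parallel::splitIndices(nx = length(x), ncl = ncores)
x_split <- lapply(idx, function(i) x[i])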
Compare the timing of the two approaches:
microbenchmark::microbenchmark(
  serial   = serial_sum(x),
  parallel = parallel_sum(x),
  times = 10,
  unit  = "relative"
)