El Paquete parallel

Modified

August 21, 2025

WarningNota de Traducción

Esta versión del capítulo fue traducida de manera automática utilizando IA. El capítulo aún no ha sido revisado por un humano.

Aunque R no fue construido para computación paralela, existen múltiples formas de paralelizar tu código R. Una de estas es el paquete parallel. Este paquete de R, incluido con R base, proporciona varias funciones para paralelizar código R usando computación embarazosamente paralela, es decir, una estrategia de tipo divide-y-vencerás. La idea básica es iniciar múltiples sesiones de R (usualmente llamadas procesos hijos), conectar la sesión principal con estas, y enviarles instrucciones. Esta sección cubre un flujo de trabajo común para trabajar con el parallel de R.

Flujo de trabajo paralelo

(Usualmente) Hacemos lo siguiente:

  1. Crear un cluster PSOCK/FORK (u otro) usando makePSOCKCluster/makeForkCluster (o makeCluster). Cuántos procesos hijos dependeŕa de cuántos núcleos tiene tu computadora. Una regla general es usar parallel::detectCores() - 1 núcleos (para dejar uno libre para el resto de tu computadora).

  2. Copiar/preparar cada sesión de R (si estás usando un cluster PSOCK):

    1. Copiar objetos con clusterExport. Estos serían todos los objetos que necesitas en las sesiones hijas.

    2. Pasar expresiones con clusterEvalQ. Esto incluiría cargar paquetes de R y otro código en las otras sesiones.

    3. Establecer una semilla (si estás haciendo algo que involucra aleatoriedad)

  3. Hacer tu llamada: parApply, parLapply, etc.

  4. Detener el cluster con clusterStop

Como mencionamos más adelante, el paso 2 dependerá del tipo de cluster que estés usando. Si estás usando una conexión Socket (PSOCK cluster), entonces las sesiones de R generadas serán completamente nuevas (sin datos o paquetes de R precargados); mientras que usar una conexión Fork (FORK cluster) copiará la sesión actual de R, incluyendo todos los objetos y paquetes cargados.

Tipos de clusters: PSOCK

  • Se puede crear con makePSOCKCluster

  • Crea sesiones de R completamente nuevas (por lo que nada se hereda del maestro), ej.

    # Esto crea un cluster con 4 sesiones de R
    cl <- makePSOCKCluster(4)
  • Las sesiones hijas están conectadas a la sesión maestra a través de conexiones Socket

  • Se puede crear fuera de la computadora actual, es decir, ¡entre múltiples computadoras!

Tipos de clusters: Fork

  • Fork Cluster makeForkCluster:

  • Usa Forking del SO,

  • Copia la sesión actual de R localmente (por lo que todo se hereda del maestro hasta ese punto).

  • Los datos solo se duplican si se alteran (¡necesito verificar cuándo sucede esto!)

  • No disponible en Windows.

Otros tipos están disponibles a través de la función makeCluster del paquete de R snow (Simple Network of Workstations). Estos incluyen clusters MPI (Message Passing Interface) y clusters Slurm (Socket).

Un programa plantilla

El siguiente bloque de código muestra una plantilla para usar el paquete parallel en R. Puedes copiar esto y comentar las partes que no necesites:

library(parallel)

# 1. CREAR UN CLUSTER ----------------
nnodes <- 4L # ¡Podría ser menos o más!
cl <- makePSOCKcluster(nnodes)

# 2. PREPARAR EL CLUSTER -------------

# Principalmente si se usa PSOCK
clusterEvalQ(cl, {
  library(...) # Cargar los paquetes necesarios
  source(...) # Cargar scripts adicionales
})

# Siempre si estás generando números aleatorios
clusterSetRNGStream(cl, 123)

# 3. HACER TU LLAMADA ----------------------
ans <- parLapply(
  cl,
  ... lista larga para iterar ...,
  function(x) {
    ...
  },
  ... argumentos adicionales ...
  )

# 4. DETENER EL CLUSTER
stopCluster(cl)

Generalmente, la ... lista larga para iterar ... será un vector u otra lista que contenga datos (ej., conjuntos de datos individuales), una secuencia de números (ej., del 1 al 1000), una lista de rutas de archivos (si estuvieras procesando archivos individualmente), o directamente una secuencia corta con números del 1 al número de nodos (aplicación menos común).

Cuando llamas parLapply o parSapply (las versiones paralelas de lapply y sapply respectivamente), la llamada a la función dividirá automáticamente las iteraciones entre nodos usando la función splitIndices. Aquí hay un ejemplo de lo que sucede bajo el capó:

# Distribuyendo 9 iteraciones entre dos núcleos
(n_iterations <- parallel::splitIndices(nx = 9, ncl = 2))
[[1]]
[1] 1 2 3 4

[[2]]
[1] 5 6 7 8 9

Lo que significa que la primera sesión de R obtendrá 4 trabajos, mientras que la segunda sesión de R obtendrá 5 trabajos. De esta manera, cada sesión de R generada (sesión hija) obtiene un número similar de iteraciones.

Ejemplo: Reemplazar un for-loop con parLapply

Uno de los casos de uso más comunes de la computación paralela es reemplazar un for-loop con una versión paralela. En este ejemplo, llamaremos a runif(10) 1,000 veces, comparando un simple for-loop con parLapply.

Versión serial usando un for-loop

La versión serial itera sobre las simulaciones usando un for-loop:

nsims <- 1000

# Primero, crear semillas para las ejecuciones individuales
set.seed(1231)
seeds <- sample.int(1e7, nsims)

# Versión serial usando un for-loop
result_serial <- vector("list", nsims)
for (i in seq_len(nsims)) {
  set.seed(seeds[i])
  result_serial[[i]] <- runif(10)
}

# Viendo los primeros resultados
head(result_serial, 2)
[[1]]
 [1] 0.38201608 0.30000010 0.03890397 0.51085385 0.63979380 0.26806211
 [7] 0.86383563 0.64311030 0.17286217 0.89551119

[[2]]
 [1] 0.23208922 0.65968122 0.55613334 0.75545780 0.25803644 0.89517280
 [7] 0.90952159 0.69711625 0.02608031 0.60768982

El lector puede notar que pre-generamos un vector con semillas para cada simulación. Si bien esta no es una práctica común en computación serial, como veremos, es una estrategia común para asegurar la reproducibilidad en computación paralela.

Versión paralela usando parLapply

Podemos reemplazar el for-loop con parLapply siguiendo la plantilla que describimos anteriormente. Nótese la estrategia de semillas: en lugar de usar clusterSetRNGStream, estamos pre-generando un vector de semillas y estableciendo la semilla dentro de cada simulación. Esto hace que los resultados sean 100% reproducibles independientemente del número de núcleos/hilos utilizados.

library(parallel)

# 1. CREAR UN CLUSTER ----------------
cl <- makePSOCKcluster(4L)

# 2. PREPARAR EL CLUSTER -------------
# Exportamos el vector de semillas a todos los workers
clusterExport(cl, "seeds")

# 3. HACER TU LLAMADA ----------------------
result_parallel <- parLapply(cl, seq_len(nsims), \(i) {
  # Esto lo hace 100% reproducible, independientemente del
  # número de núcleos/hilos que estemos usando
  set.seed(seeds[i])
  runif(10)
})

# 4. DETENER EL CLUSTER
stopCluster(cl)

# Los resultados son idénticos
all.equal(result_serial, result_parallel)
[1] TRUE
NoteSemillas por simulación vs clusterSetRNGStream

El enfoque utilizado aquí—generar semillas de antemano y llamar a set.seed() dentro de cada simulación—es una alternativa a clusterSetRNGStream. Las diferencias clave son:

  • clusterSetRNGStream inicializa el generador de números aleatorios L’Ecuyer-CMRG a través del cluster. Los flujos dependen del número de workers, por lo que los resultados pueden cambiar si cambias el número de núcleos.

  • Semillas por simulación (como se muestra aquí) asigna una semilla única a cada iteración. Dado que la semilla está ligada al índice de la simulación y no al worker, los resultados son idénticos sin importar cuántos núcleos se utilicen.

Ambos enfoques son válidos. Las semillas por simulación son especialmente útiles cuando quieres resultados que sean invariantes al número de hilos, por ejemplo, al depurar o comparar ejecuciones entre diferentes máquinas. Nótese que combinar ambas estrategias es generalmente redundante: una vez que llamas a set.seed() dentro de cada iteración, el flujo RNG del worker inicializado por clusterSetRNGStream ya no afecta los números generados en esa iteración. Usar clusterSetRNGStream junto con semillas por simulación solo importaría si también dependes de RNG fuera de la llamada set.seed() por iteración, por ejemplo, en código de configuración ejecutado en los workers.

Ejemplo: Ejecutar una regresión lineal a través de múltiples columnas

En genómica, es común analizar datos genómicos a nivel de genes comparando niveles de expresión contra algún fenotipo/enfermedad. Un análisis simple consiste en ejecutar una regresión lineal a través de múltiples columnas (genes) de un data frame. El siguiente bloque de código genera algunos datos artificiales que podemos usar para este ejemplo:

set.seed(331)
n_genes <- 10000
n_obs <- 1000

# Una matriz aleatoria de ómicas
X_genes <- rnorm(n_obs * n_genes) |>
  matrix(nrow = n_obs)

# Un fenotipo aleatorio (completamente no relacionado para este ejemplo)
Y <- rnorm(n_obs) |> cbind()

Envolveremos el análisis en una función para poder hacer benchmarking. Usaremos la función lapply para iterar sobre las columnas de X_genes

ols_serial <- function(X, Y) {
  lapply(
    X = seq_len(n_genes),
    FUN = \(i) {lm.fit(X[, i, drop = FALSE], Y) |> coef()}
  ) |> do.call(what = rbind)
}

# Llamando la función y viendo las primeras filas
ols_serial(X_genes, Y) |> head()
               x1
[1,]  0.029403088
[2,]  0.008907854
[3,] -0.027246099
[4,] -0.031280262
[5,] -0.001309752
[6,]  0.066971469
Tip

Como hicimos en la sección de programación eficiente, en lugar de usar lm() o glm(), podemos usar lm.fit() para mejor rendimiento. La función lm.fit() hace menos que la función lm() al omitir el cálculo de residuos y otras sobrecargas, haciéndola más rápida para conjuntos de datos grandes.

Usando computación paralela (y siguiendo la plantilla que presentamos anteriormente), esto podría hacerse de la siguiente manera con el paquete parallel:

library(parallel)

ols_parallel <- function(X, Y, ncores) {
  # 1. CREAR UN CLUSTER ----------------
  cl <- makePSOCKcluster(ncores)
  
  # Esto será llamado al salir de la función
  on.exit(stopCluster(cl)) 

  # 2. PREPARAR EL CLUSTER -------------
  # Copiamos los datos
  clusterExport(cl, c("X", "Y"), envir = environment())

  # 3. HACER TU LLAMADA ----------------------
  parLapply(
    cl,
    seq_len(n_genes),
    function(i) {
      lm.fit(X[, i, drop = FALSE], Y) |> coef()
    }
  ) |> do.call(what = rbind)
}

# Verificando que funciona
ols_parallel(X_genes, Y, ncores = 4L) |> head()
               x1
[1,]  0.029403088
[2,]  0.008907854
[3,] -0.027246099
[4,] -0.031280262
[5,] -0.001309752
[6,]  0.066971469
Tip

Al igual que return(), on.exit() solo puede usarse dentro de una llamada a función. Podríamos haber usado stopCluster(cl) al final como hacemos en nuestro ejemplo de plantilla, pero el beneficio de usar on.exit() es que será llamado automáticamente cuando la función termine, incluso si ocurre un error. Esto ayuda a asegurar que el cluster siempre se detenga apropiadamente.

Ahora que tenemos la función implementada, podemos proceder a (1) comparar resultados y (2) medir rendimiento.

library(microbenchmark)

microbenchmark(
  serial = ols_serial(X_genes, Y),
  parallel = ols_parallel(X_genes, Y, ncores = 4L),
  times = 10L,
  check = "identical"
)
Unit: milliseconds
     expr       min        lq      mean    median        uq       max neval
   serial  671.8977  682.1324  721.0751  706.7339  767.8394  774.9819    10
 parallel 1734.0423 1761.3043 1769.4946 1765.1871 1779.8051 1826.8767    10

De la comparación, podemos ver que la versión paralela es significativamente más lenta que la versión serial. Dos cosas a notar aquí son (a) la tarea que estamos ejecutando ya es rápida (alrededor de 0.3 segundos en promedio para la ejecución serial) y (b) hay un costo de sobrecarga asociado con crear, preparar y detener el cluster. Como mencionamos anteriormente, las optimizaciones paralelas solo tienen sentido si tu código ya está tomando una cantidad significativa de tiempo, haciendo que el costo de sobrecarga asociado con la configuración sea relativamente pequeño. La siguiente implementación de la función debería hacerla significativamente más rápida:

ols_parallel2 <- function(cl) {
  # 1. CREAR UN CLUSTER ----------------
  # 2. PREPARAR EL CLUSTER -------------
  # Ya no es necesario ya que estamos manejando el núcleo fuera

  # 3. HACER TU LLAMADA ----------------------
  parLapply(
    cl,
    seq_len(n_genes),
    function(i) {
      lm.fit(X_genes[, i, drop = FALSE], Y) |> coef()
    }
  ) |> do.call(what = rbind)
}

# Verificando que funciona
cl <- makePSOCKcluster(4)
clusterExport(cl, c("X_genes", "Y"))
ols_parallel2(cl) |> head()
               x1
[1,]  0.029403088
[2,]  0.008907854
[3,] -0.027246099
[4,] -0.031280262
[5,] -0.001309752
[6,]  0.066971469
# 4. DETENER EL CLUSTER
stopCluster(cl)

Las principales diferencias de la versión anterior de la función son:

  1. Estamos creando el cluster fuera de la función y pasándolo como argumento.

  2. Estamos exportando las variables X_genes y Y al cluster solo una vez, lo cual también debería reducir significativamente la sobrecarga.

  3. Debido al paso anterior, ahora estamos llamando X_genes directamente en la función principal.

  4. El cluster se detiene fuera de la llamada a la función (ya que la función ya no maneja el objeto cluster).

Midamos el rendimiento para ver qué tan más rápida es la versión paralela.

library(microbenchmark)

# Necesitamos preparar el cluster de antemano
cl <- makePSOCKcluster(4)
clusterExport(cl, c("X_genes", "Y"))

microbenchmark(
  serial = ols_serial(X_genes, Y),
  parallel = ols_parallel2(cl),
  times = 10L,
  check = "identical"
)
Unit: milliseconds
     expr      min       lq     mean   median       uq      max neval
   serial 632.1816 671.6570 718.6569 729.3737 768.3108 782.3245    10
 parallel 330.0622 347.9088 357.7508 359.5807 363.9590 390.3031    10
# Necesitamos detener el cluster
stopCluster(cl)

Ahora, la versión paralela es significativamente más rápida que la versión serial. Solo usar el paquete parallel (o cualquier otro paquete que se pueda usar para computación paralela) no garantiza un rendimiento mejorado.

Más ejemplos

Los siguientes tres ejemplos son una aplicación simple del paquete en la cual estamos ejecutando explícitamente tantas réplicas como hilos tiene el cluster. Generalmente, el número de réplicas será una función de los datos.

Ej 1: RNG Paralelo con makePSOCKCluster

Caution

Usar más hilos que núcleos disponibles en tu computadora nunca es una buena idea. Como regla general, los clusters deberían crearse usando parallel::detectCores() - 1 núcleos (para dejar uno libre para el resto de tu computadora.)

# 1. CREAR UN CLUSTER
library(parallel)
nnodes <- 4L
cl     <- makePSOCKcluster(nnodes)    
# 2. PREPARAR EL CLUSTER
clusterSetRNGStream(cl, 123) # Equivalente a `set.seed(123)`
# 3. HACER TU LLAMADA
ans <- parSapply(cl, 1:nnodes, function(x) runif(1e3))
(ans0 <- var(ans))
              [,1]          [,2]          [,3]          [,4]
[1,]  0.0861888293 -0.0001633431  5.939143e-04 -3.672845e-04
[2,] -0.0001633431  0.0853841838  2.390790e-03 -1.462154e-04
[3,]  0.0005939143  0.0023907904  8.114219e-02 -4.714618e-06
[4,] -0.0003672845 -0.0001462154 -4.714618e-06  8.467722e-02

Asegurándonos de que es reproducible

# ¡Quiero obtener lo mismo!
clusterSetRNGStream(cl, 123)
ans1 <- var(parSapply(cl, 1:nnodes, function(x) runif(1e3)))
# 4. DETENER EL CLUSTER
stopCluster(cl)
all.equal(ans0, ans1) # ¡Todo igual!
[1] TRUE

Ej 2: RNG Paralelo con makeForkCluster

En el caso de makeForkCluster

# 1. CREAR UN CLUSTER
library(parallel)
# El fork cluster copiará el objeto -nsims-
nsims  <- 1e3
nnodes <- 4L
cl     <- makeForkCluster(nnodes)    
# 2. PREPARAR EL CLUSTER
clusterSetRNGStream(cl, 123)
# 3. HACER TU LLAMADA
ans <- do.call(cbind, parLapply(cl, 1:nnodes, function(x) {
  runif(nsims) # ¡Mira! usamos el objeto nsims!
               # Esto habría fallado en makePSOCKCluster
               # si no copiamos -nsims- primero.
  }))
(ans0 <- var(ans))
              [,1]          [,2]          [,3]          [,4]
[1,]  0.0861888293 -0.0001633431  5.939143e-04 -3.672845e-04
[2,] -0.0001633431  0.0853841838  2.390790e-03 -1.462154e-04
[3,]  0.0005939143  0.0023907904  8.114219e-02 -4.714618e-06
[4,] -0.0003672845 -0.0001462154 -4.714618e-06  8.467722e-02

Nuevamente, queremos asegurarnos de que esto es reproducible

# Misma secuencia con misma semilla
clusterSetRNGStream(cl, 123)
ans1 <- var(do.call(cbind, parLapply(cl, 1:nnodes, function(x) runif(nsims))))
ans0 - ans1 # Una matriz de ceros
     [,1] [,2] [,3] [,4]
[1,]    0    0    0    0
[2,]    0    0    0    0
[3,]    0    0    0    0
[4,]    0    0    0    0
# 4. DETENER EL CLUSTER
stopCluster(cl)

Bueno, si eres usuario de Mac-OS/Linux, hay una forma más directa de hacer esto…

Ej 3: RNG Paralelo con mclapply (Forking sobre la marcha)

En el caso de mclapply, ¡el forking (creación del cluster) se hace sobre la marcha!

# 1. CREAR UN CLUSTER
library(parallel)
# El fork cluster copiará el objeto -nsims-
nsims  <- 1e3
nnodes <- 4L
# cl     <- makeForkCluster(nnodes) # mclapply lo hace sobre la marcha
# 2. PREPARAR EL CLUSTER
set.seed(123) 
# 3. HACER TU LLAMADA
ans <- do.call(cbind, mclapply(1:nnodes, function(x) runif(nsims)))
(ans0 <- var(ans))
              [,1]         [,2]         [,3]         [,4]
[1,]  0.0835318177 5.511648e-04 -0.001941182 1.275908e-03
[2,]  0.0005511648 8.324948e-02  0.001510458 6.899196e-05
[3,] -0.0019411822 1.510458e-03  0.082569008 2.303661e-03
[4,]  0.0012759076 6.899196e-05  0.002303661 8.244617e-02

Una vez más, queremos asegurarnos de que esto es reproducible

# Misma secuencia con misma semilla
set.seed(123) 
ans1 <- var(do.call(cbind, mclapply(1:nnodes, function(x) runif(nsims))))
ans0 - ans1 # Una matriz de ceros
              [,1]          [,2]          [,3]          [,4]
[1,]  0.0017232729 -7.241822e-04 -0.0023348671  0.0008057264
[2,] -0.0007241822 -5.355761e-06 -0.0035963591 -0.0042906102
[3,] -0.0023348671 -3.596359e-03 -0.0017425983 -0.0005832715
[4,]  0.0008057264 -4.290610e-03 -0.0005832715  0.0004458054
# 4. DETENER EL CLUSTER
# stopCluster(cl) ya no es necesario hacer esto

Ejercicio: Costos de sobrecarga

Compara el tiempo de tomar la suma de 100 números cuando está paralelizado versus no. Para la versión no paralelizada (serializada), usa lo siguiente:

set.seed(123)
x <- runif(n=100)

serial_sum <- function(x){
  x_sum <- sum(x)
  return(x_sum)
}

Para la versión paralelizada, sigue este esquema

library(parallel)
set.seed(123)
x <- runif(n=100)

parallel_sum <- function(){
  
  
  # Establecer número de núcleos a usar
  # hacer cluster y exportar al cluster la variable x
  # Usar "función de división para dividir x en tantos trozos como el número de núcleos
  
  # Calcular sumas parciales haciendo algo como:
  
  partial_sums <- parallel::parSapply(cl, x_split, sum)
  
  # Detener el cluster
  
  # Sumar y devolver las sumas parciales
  
}

Compara el tiempo de los dos enfoques:

microbenchmark::microbenchmark(
  serial   = serial_sum(x),
  parallel = parallel_sum(x),
  times    = 10,
  unit     = "relative"
)