5.4 Cómputo en paralelo usando CPUs en un sistema de memoria compartida (SMC)#

Notas para contenedor de docker:

Comando de docker para ejecución de la nota de forma local:

nota: cambiar <ruta a mi directorio> por la ruta de directorio que se desea mapear a /datos dentro del contenedor de docker y <versión imagen de docker> por la versión más actualizada que se presenta en la documentación.

docker run --rm -v <ruta a mi directorio>:/datos --name jupyterlab_optimizacion_2 -p 8888:8888 -p 8787:8787 -d palmoreck/jupyterlab_optimizacion_2:<versión imagen de docker>

password para jupyterlab: qwerty

Detener el contenedor de docker:

docker stop jupyterlab_optimizacion_2

Documentación de la imagen de docker palmoreck/jupyterlab_optimizacion_2:<versión imagen de docker> en liga.


Nota generada a partir de liga1, liga2, liga3

Al final de esta nota la comunidad lectora:

  • Aprenderá a usar herramientas de software para cómputo en paralelo en C, Python y R.

  • Aprenderá características que tienen los sistemas de memoria compartida para la ejecución de códigos con cómputo en paralelo.

Se presentan códigos y sus ejecuciones en una máquina m4.16xlarge con una AMI ubuntu 20.04 - ami-042e8287309f5df03 de la nube de AWS. Se utilizó en la sección de User data el script_profiling_and_BLAS.sh

La máquina m4.16xlarge tiene las siguientes características:

%%bash
lscpu
Architecture:                    x86_64
CPU op-mode(s):                  32-bit, 64-bit
Byte Order:                      Little Endian
Address sizes:                   46 bits physical, 48 bits virtual
CPU(s):                          64
On-line CPU(s) list:             0-63
Thread(s) per core:              2
Core(s) per socket:              16
Socket(s):                       2
NUMA node(s):                    2
Vendor ID:                       GenuineIntel
CPU family:                      6
Model:                           79
Model name:                      Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
Stepping:                        1
CPU MHz:                         2213.083
CPU max MHz:                     3000.0000
CPU min MHz:                     1200.0000
BogoMIPS:                        4600.00
Hypervisor vendor:               Xen
Virtualization type:             full
L1d cache:                       1 MiB
L1i cache:                       1 MiB
L2 cache:                        8 MiB
L3 cache:                        90 MiB
NUMA node0 CPU(s):               0-15,32-47
NUMA node1 CPU(s):               16-31,48-63
Vulnerability Itlb multihit:     KVM: Vulnerable
Vulnerability L1tf:              Mitigation; PTE Inversion
Vulnerability Mds:               Vulnerable: Clear CPU buffers attempted, no microcode; SMT Host state unknown
Vulnerability Meltdown:          Mitigation; PTI
Vulnerability Spec store bypass: Vulnerable
Vulnerability Spectre v1:        Mitigation; usercopy/swapgs barriers and __user pointer sanitization
Vulnerability Spectre v2:        Mitigation; Full generic retpoline, STIBP disabled, RSB filling
Vulnerability Srbds:             Not affected
Vulnerability Tsx async abort:   Vulnerable: Clear CPU buffers attempted, no microcode; SMT Host state unknown
Flags:                           fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq monitor est ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand hypervisor lahf_lm abm cpuid_fault invpcid_single pti fsgsbase bmi1 hle avx2 smep bmi2 erms invpcid rtm xsaveopt ida
%%bash
sudo lshw -C memory
  *-firmware
       description: BIOS
       vendor: Xen
       physical id: 0
       version: 4.11.amazon
       date: 08/24/2006
       size: 96KiB
       capabilities: pci edd
  *-memory
       description: System Memory
       physical id: 1000
       size: 256GiB
       capabilities: ecc
       configuration: errordetection=multi-bit-ecc
     *-bank:0
          description: DIMM RAM
          physical id: 0
          slot: DIMM 0
          size: 16GiB
          width: 64 bits
     *-bank:1
          description: DIMM RAM
          physical id: 1
          slot: DIMM 1
          size: 16GiB
          width: 64 bits
     *-bank:2
          description: DIMM RAM
          physical id: 2
          slot: DIMM 2
          size: 16GiB
          width: 64 bits
     *-bank:3
          description: DIMM RAM
          physical id: 3
          slot: DIMM 3
          size: 16GiB
          width: 64 bits
     *-bank:4
          description: DIMM RAM
          physical id: 4
          slot: DIMM 4
          size: 16GiB
          width: 64 bits
     *-bank:5
          description: DIMM RAM
          physical id: 5
          slot: DIMM 5
          size: 16GiB
          width: 64 bits
     *-bank:6
          description: DIMM RAM
          physical id: 6
          slot: DIMM 6
          size: 16GiB
          width: 64 bits
     *-bank:7
          description: DIMM RAM
          physical id: 7
          slot: DIMM 7
          size: 16GiB
          width: 64 bits
     *-bank:8
          description: DIMM RAM
          physical id: 8
          slot: DIMM 8
          size: 16GiB
          width: 64 bits
     *-bank:9
          description: DIMM RAM
          physical id: 9
          slot: DIMM 9
          size: 16GiB
          width: 64 bits
     *-bank:10
          description: DIMM RAM
          physical id: a
          slot: DIMM 10
          size: 16GiB
          width: 64 bits
     *-bank:11
          description: DIMM RAM
          physical id: b
          slot: DIMM 11
          size: 16GiB
          width: 64 bits
     *-bank:12
          description: DIMM RAM
          physical id: c
          slot: DIMM 12
          size: 16GiB
          width: 64 bits
     *-bank:13
          description: DIMM RAM
          physical id: d
          slot: DIMM 13
          size: 16GiB
          width: 64 bits
     *-bank:14
          description: DIMM RAM
          physical id: e
          slot: DIMM 14
          size: 16GiB
          width: 64 bits
     *-bank:15
          description: DIMM RAM
          physical id: f
          slot: DIMM 15
          size: 16GiB
          width: 64 bits
%%bash
uname -ar #r for kernel, a for all
Linux ip-10-0-0-84 5.4.0-1038-aws #40-Ubuntu SMP Fri Feb 5 23:50:40 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

Observación

En la celda anterior se utilizó el comando de magic %%bash. Algunos comandos de magic los podemos utilizar también con import. Ver ipython-magics

Sistemas de memoria compartida (SMC)#

Un SMC en general se ve como el siguiente dibujo:

El dibujo anterior es un SMC con acceso uniforme a la memoria (UMA), esto es, cada proceso o thread creado en el procesador o core accede con las mismas velocidades a la memoria.

La comunicación en este tipo de sistemas depende si se utilizan procesos o threads ya que aunque los procesos generados por un proceso principal tienen acceso a la memoria, los cambios/actualizaciones que haga uno de ellos a una variable no lo llegan a ver los otros procesos. Esto es distinto con los threads dado que un cambio que realice un thread en una variable sí lo pueden ver los otros threads. Lo anterior se debe a las distintas direcciones de memoria que utilizan los procesos vs la misma dirección de memoria que utilizan los threads.

Recuérdese que se crean threads a partir de un proceso principal, realizan fork, y una vez que terminan su ejecución se unen nuevamente, realizan join, esto es el threading.

Ver Threading o Hyperthreading.

Observación

Hay paqueterías que crean subprocesos a partir de un proceso principal en lugar de threads. En este caso en lugar de un fork se realiza un spawn. Los subprocesos pueden ver cambios a variables hechos por otros.

Ver Contexts and start methods y difference between threadpool vs pool in python multiprocessing para diferencias entre fork y spawn.

Definición

Las variables que pueden ser accesadas por todos los threads en un SMC se les nombra variables compartidas.

Trabajar sobre SMC tiene ventajas y desventajas. Una ventaja es facilidad de comunicación y una desventaja es la coordinación para ejecutar instrucciones. Por ejemplo, con variables compartidas se puede realizar la comunicación entre los threads, sin embargo, para el uso de tales variables por diferentes threads debemos crear candados, locks. Lo anterior surge pues si un thread con etiqueta \(1\) accede a una variable compartida, otro thread con etiqueta \(2\), no podrá utilizarla hasta que el thread \(1\) finalice de usarla. Ver Thread Safety y Race Condition.

Definición

En una race condition múltiples threads intentan acceder a un recurso compartido, al menos uno de los accesos resulta en una modificación al recurso y posteriormente los siguientes accesos pueden no ver la modificación. A la sección del código que causa la race condition se le nombra critical section. Las critical sections se ejecutan con código secuencial o con locks para evitar las race conditions.

Comentarios

  • Para crear procesos o threads en los lenguajes de programación de C, Python o R, utilizamos las librerías, API’s o extensiones vía paquetes a tales lenguajes. Lo anterior se debe a que C, Python y R en sus implementaciones más utilizadas o estándar, fueron diseñados con el propósito de utilizarse sobre sistemas con un sólo procesador. En distintas implementaciones de los lenguajes hay soporte para SMC.

  • Ejemplos de máquinas con SMC son las laptops, los celulares, máquinas de escritorio con más de un core.

  • Otro tipo de SMC se puede representar con el siguiente dibujo:

en el que los procesos o threads pueden acceder a la memoria en una forma no uniforme (NUMA). Una de las diferencias que se tienen entre un NUMA y un UMA es la tasa de transferencia de datos para cores que están más cercanos a una memoria.

  • Un sistema de memoria distribuida (SMD) tiene una conexión, por ejemplo vía una network, entre pares de core-memoria y en general se ve como el siguiente dibujo:

La memoria en un SMD asociada al core sólo puede ser accesada por éste y es inaccesible a los demás cores, esto es, se tiene una memoria “privada”. Lo anterior contrasta con los SMC en los que todos los cores accesan a una memoria compartida. En un SMD el cómputo distribuido crea múltiples procesos a partir de un proceso principal. En un SMC el cómputo paralelo crea múltiples procesos o threads. Un ejemplo de un SMD es un clúster de máquinas. Cada máquina en el clúster puede ser un SMC y por tanto los procesos en cada máquina tienen la capacidad de crear procesos o threads. En este caso se tiene un sistema híbrido SMD y SMC. Un programa diseñado para ejecutarse en un SMD puede ejecutarse en un SMC pues se divide su memoria de forma lógica en espacios de memoria privados para los threads.

OpenMP#

Es una extensión al lenguaje C y es una API para cómputo en paralelo en un SMC, aka, shared memory parallel programming con CPUs. Lo anterior OpenMP lo posibilita con el threading.

  • Las siglas MP se refieren a multiprocessing, un sinónimo de shared memory parallel computing. OpenMP se utiliza en un SMC por lo que cada thread tiene acceso a la memoria.

  • Algunas características de OpenMP son:

    • Paralelización de ciclos for secuenciales en los que las iteraciones son independientes una de la otra de forma simple.

    • Paralelización de tareas y sincronización explícita de threads.

  • Provee directivas vía #pragma omp para el cómputo en paralelo en un SMC.

Comentario

Los pragma se utilizan para extender la funcionalidad de C pues no son parte de su especificación básica. Las versiones más recientes del compilador gcc soportan a los pragma y todas las preprocessor directives son por default de longitud una línea.

Ver pragmas.

Directiva parallel#

Ejemplo#

hello_world_omp.c

%%file hello_world_omp.c
#include<stdio.h>
#include<stdlib.h>
#include<omp.h> 

void Hello(void);
int main(){

    #pragma omp parallel
        Hello();
    
    return 0;
}

void Hello(void){
    int my_rank = omp_get_thread_num(); 
    int num_th = omp_get_num_threads(); 
    printf("Hola del thread: %d de %d\n", my_rank, num_th);
}
Writing hello_world_omp.c
%%bash
gcc -Wall -fopenmp hello_world_omp.c -o hello_world_omp.out
%%bash
./hello_world_omp.out
Hola del thread: 60 de 64
Hola del thread: 9 de 64
Hola del thread: 6 de 64
Hola del thread: 13 de 64
Hola del thread: 44 de 64
Hola del thread: 15 de 64
Hola del thread: 12 de 64
Hola del thread: 16 de 64
Hola del thread: 18 de 64
Hola del thread: 14 de 64
Hola del thread: 62 de 64
Hola del thread: 17 de 64
Hola del thread: 45 de 64
Hola del thread: 48 de 64
Hola del thread: 23 de 64
Hola del thread: 55 de 64
Hola del thread: 24 de 64
Hola del thread: 42 de 64
Hola del thread: 26 de 64
Hola del thread: 49 de 64
Hola del thread: 0 de 64
Hola del thread: 29 de 64
Hola del thread: 50 de 64
Hola del thread: 34 de 64
Hola del thread: 53 de 64
Hola del thread: 47 de 64
Hola del thread: 19 de 64
Hola del thread: 39 de 64
Hola del thread: 38 de 64
Hola del thread: 36 de 64
Hola del thread: 33 de 64
Hola del thread: 37 de 64
Hola del thread: 43 de 64
Hola del thread: 59 de 64
Hola del thread: 28 de 64
Hola del thread: 21 de 64
Hola del thread: 46 de 64
Hola del thread: 27 de 64
Hola del thread: 25 de 64
Hola del thread: 31 de 64
Hola del thread: 32 de 64
Hola del thread: 41 de 64
Hola del thread: 20 de 64
Hola del thread: 22 de 64
Hola del thread: 35 de 64
Hola del thread: 58 de 64
Hola del thread: 3 de 64
Hola del thread: 4 de 64
Hola del thread: 2 de 64
Hola del thread: 1 de 64
Hola del thread: 5 de 64
Hola del thread: 7 de 64
Hola del thread: 61 de 64
Hola del thread: 10 de 64
Hola del thread: 8 de 64
Hola del thread: 11 de 64
Hola del thread: 51 de 64
Hola del thread: 40 de 64
Hola del thread: 63 de 64
Hola del thread: 30 de 64
Hola del thread: 57 de 64
Hola del thread: 52 de 64
Hola del thread: 54 de 64
Hola del thread: 56 de 64

Comentarios

  • omp.h es un header file con prototipos y definiciones de macros para uso de la librería de funciones y macros de OpenMP.

  • La función Hello será ejecutada por los threads.

  • La función omp_get_thread_num da el rank asignado por el run time system a cada thread.

  • Con la función omp_get_num_threads se obtiene el número de threads que realizaron un fork del thread principal.

  • Obsérvese que para la compilación se utilizó la flag -fopenmp para soporte de OpenMP.

  • Dependiendo del número de cores de nuestro sistema tendremos diferentes número de printf’s.

  • Lo que continúa a la línea de #pragma omp parallel es un structured block, esto es, un statement o conjunto de statements que tienen un punto de entrada y un punto de salida. En el caso anterior sólo se llama a la función Hello.

  • No se permiten statements como el siguiente:

#pragma omp parallel

if(...) break;

ni tampoco:

#pragma omp parallel

    {
        if(variable == valor) return 1;
        return -1;
    }

A continuación de la directiva parallel podemos usar diferentes tipos de clauses. Una clause en OpenMP es un texto que modifica una directiva. Por ejemplo, podemos usar la clause num_threads para especificar el número de threads que ejecutarán el structured block.

num_threads clause#

Ejemplo#

hello_world_omp_num_threads.c

%%file hello_world_omp_num_threads.c
#include<stdio.h>
#include<stdlib.h>
#include<omp.h> 

void Hello(void); 
int main(){
    int n_threads = 5;
    #pragma omp parallel num_threads(n_threads) 
        Hello();
    
    return 0;
}

void Hello(void){
    int my_rank = omp_get_thread_num(); 
    int num_th = omp_get_num_threads(); 
    printf("Hola del thread: %d de %d\n", my_rank, num_th);
}
Writing hello_world_omp_num_threads.c
%%bash
gcc -Wall -fopenmp hello_world_omp_num_threads.c -o hello_world_omp_num_threads.out
%%bash
./hello_world_omp_num_threads.out
Hola del thread: 0 de 5
Hola del thread: 4 de 5
Hola del thread: 1 de 5
Hola del thread: 3 de 5
Hola del thread: 2 de 5

Reduction clause#

OpenMP provee la reduction clause para aplicar la operación de suma (operador binario) a cada resultado calculado de un thread de forma repetida y almacenar en una variable la respuesta.

Nombramos reduction variable a la variable que almacenará los resultados intermedios calculados por cada thread y reduction operator a la operación binaria (por ejemplo una suma o multiplicación) que se aplica repetidamente a una secuencia de operandos para obtener un resultado, en un proceso que se le nombra reduction.

Por ejemplo, si A es un arreglo de n enteros, el cálculo:

int sum = 0;
for(i=0;i<n;i++)
    sum += A[i];

es un proceso de reduction en el que el reduction operator es la suma.

En openMP utilizamos la reduction clause en la parallel directive.

sum_shared = 0.0
#pragma omp parallel num_threads(n_threads) reduction(+: sum_shared)
    sum_shared += Rcf_parallel(a,h_hat,n_subintervals_per_thread);

Con la reduction clause openMP crea una variable privada por cada thread y el run time system almacena el resultado de cada thread en esta variable. openMP también crea una critical section y los valores almacenados en las variables privadas son sumadas en esta critical section y almacenados en la reduction variable sum_shared. El reduction operator es: +.

La sintaxis de la reduction clause es:

reduction( <operator>: <variable list>)

El reduction operator puede ser cualquiera de los operadores: +,*,-,&,|,^,&&,||. Cabe señalar que el proceso de reduction asume que los operadores utilizados cumplen con la propiedad asociativa (por ejemplo, el operador de resta no cumple con esto).

La variable que está en la reduction clause es compartida. Sin embargo una variable privada es creada por cada thread en el team (con el mismo nombre que aparece en la reduction clause) y si un thread ejecuta un statement en el parallel block que involucra a la variable, entonces se utiliza la variable privada. Al finalizar el parallel block, los valores calculados en las variables privadas son combinados en la variable compartida.

Ejemplo#

En el siguiente código se muestra el uso de nombres de variables al definir variables privadas y compartidas en una reduction clause.

private_shared_variable_reduction_clause_example.c

%%file private_shared_variable_reduction_clause_example.c
#include<stdio.h>
#include<stdlib.h>
#include<omp.h>

int main(int argc, char *argv[]){
    long n_threads;
    int private_variable;
    int sum_shared = 7;
        
    n_threads = strtol(argv[1], NULL, 10);
    
    printf("variable sum_shared al inicio : %d\n", sum_shared);
    
    #pragma omp parallel num_threads(n_threads) reduction(+: sum_shared)
    {
        int my_rank = omp_get_thread_num();
        if(my_rank==0)
            printf("sum_shared, printf 1 del thread 0: %d\n", sum_shared);
        else
            printf("sum_shared, prinftf 1 thread 1: %d\n", sum_shared);
        sum_shared = (my_rank==0)?10:20;
        if(my_rank==0)
            printf("sum_shared, printf 2 del thread 0: %d\n", sum_shared);
        else
            printf("sum_shared, printf 2 del thread 1: %d\n", sum_shared);
        private_variable = (my_rank==0)?1:2;
        sum_shared += private_variable;
        if(my_rank==0)
            printf("sum_shared, printf 3 del thread 0: %d\n", sum_shared);
        else
            printf("sum_shared, printf 3 del thread 1: %d\n", sum_shared);
    
    }
    printf("sum_shared al final de la directive parallel %d\n", sum_shared);
    
    return 0;
}
Writing private_shared_variable_reduction_clause_example.c

Compilamos:

%%bash
gcc -Wall -fopenmp private_shared_variable_reduction_clause_example.c -o private_shared_variable_reduction_clause_example.out 

Ejecutamos:

%%bash
./private_shared_variable_reduction_clause_example.out 2
variable sum_shared al inicio : 7
sum_shared, printf 1 del thread 0: 0
sum_shared, printf 2 del thread 0: 10
sum_shared, printf 3 del thread 0: 11
sum_shared, prinftf 1 thread 1: 0
sum_shared, printf 2 del thread 1: 20
sum_shared, printf 3 del thread 1: 22
sum_shared al final de la directive parallel 40

Comentario

La variable sum_shared se inicializa en 0. En general, las variables privadas creadas para una reduction clause son inicializadas al identity value para el operator. Por ejemplo, si el operator es la multiplicación, entonces las variables privadas son inicializadas en 1.

Ejemplo regla compuesta del rectángulo#

1.Partitioning: la tarea a realizar es el cálculo de un área de un rectángulo para un subintervalo.

2.Communication y mapping: los subintervalos deben repartirse entre los cores y se debe comunicar esta repartición por algún medio (por ejemplo con variables en memoria).

3.Aggregation: un core puede calcular más de un área de un rectángulo si recibe más de un subintervalo.

4.Communication y mapping: el área de los rectángulos calculados por cada procesador deben sumarse para calcular la aproximación a la integral.

Para la medición de tiempos se utilizaron las ligas: measuring-time-in-millisecond-precision y find-execution-time-c-program.

Rcf_parallel.c

%%file Rcf_parallel.c
#include<stdio.h>
#include<stdlib.h>
#include<omp.h>
#include<math.h> 
#include<time.h>
#include <sys/time.h>

double Rcf(double a, double h_hat, int n_s_t);

double f(double node);

int main(){
    double sum_shared = 0.0; //shared variable for threads
    double a = 0.0, b = 1.0;
    int n = 1e7; //number of subintervals
    double h_hat = (b-a)/n;
    int n_subintervals_per_thread; //number of subintervals assigned to each thread
    int n_threads[7] = {0};//each entry of n_threads must divide n variable exactly
    int len_n_threads_array = 0;
    double obj = 0.7468241328124271;
    struct timeval start;
    struct timeval end;
    long seconds;
    long long mili;
    int i;
    n_threads[0] = 1;
    n_threads[1] = 2;
    n_threads[2] = 4;
    n_threads[3] = 8;
    n_threads[4] = 16;
    n_threads[5] = 32;
    n_threads[6] = 64;
    len_n_threads_array = sizeof(n_threads)/sizeof(n_threads[0]);
    for(i=0;i<len_n_threads_array;i++){
        n_subintervals_per_thread = n/n_threads[i];
        gettimeofday(&start, NULL);
        #pragma omp parallel num_threads(n_threads[i]) reduction(+: sum_shared)
            sum_shared += Rcf(a,h_hat,n_subintervals_per_thread);
        sum_shared = h_hat*sum_shared;
        gettimeofday(&end, NULL);
        seconds = (end.tv_sec - start.tv_sec);
        mili = 1000*(seconds) + (end.tv_usec - start.tv_usec)/1000;
        printf("Integral de %f a %f = %1.15e\n", a,b,sum_shared);
        printf("Error relativo de la solución: %1.15e\n", fabs(sum_shared-obj)/fabs(obj));
        printf("Tiempo de ejecución con %d threads: %lld miliseconds\n", n_threads[i],mili);
        printf("----------------------\n");
        sum_shared = 0.0;
    }
    return 0;
}

double Rcf(double a, double h_hat, int n_s_t){
    int begin, end;
    int my_rank = omp_get_thread_num();
    double local_int=0;
    int i;
    double x;
    begin = my_rank*n_s_t;
    end = begin + n_s_t; 
    for(i=begin;i<=end-1;i++){
        x = a+(i+1/2.0)*h_hat;
        local_int += f(x);
    }   
    return local_int;
}
        
double f(double node){
    double f_value;
    f_value = exp(-pow(node,2));
    return f_value;
}
Writing Rcf_parallel.c
%%bash
gcc -Wall -fopenmp Rcf_parallel.c -o Rcf_parallel.out -lm
%%bash
./Rcf_parallel.out
Integral de 0.000000 a 1.000000 = 7.468241328123898e-01
Error relativo de la solución: 4.994950214975770e-14
Tiempo de ejecución con 1 threads: 492 miliseconds
----------------------
Integral de 0.000000 a 1.000000 = 7.468241328124163e-01
Error relativo de la solución: 1.441994556109076e-14
Tiempo de ejecución con 2 threads: 239 miliseconds
----------------------
Integral de 0.000000 a 1.000000 = 7.468241328124452e-01
Error relativo de la solución: 2.423145491193603e-14
Tiempo de ejecución con 4 threads: 121 miliseconds
----------------------
Integral de 0.000000 a 1.000000 = 7.468241328124374e-01
Error relativo de la solución: 1.382530863073651e-14
Tiempo de ejecución con 8 threads: 60 miliseconds
----------------------
Integral de 0.000000 a 1.000000 = 7.468241328124300e-01
Error relativo de la solución: 3.865140047302680e-15
Tiempo de ejecución con 16 threads: 30 miliseconds
----------------------
Integral de 0.000000 a 1.000000 = 7.468241328124294e-01
Error relativo de la solución: 3.121843884359856e-15
Tiempo de ejecución con 32 threads: 21 miliseconds
----------------------
Integral de 0.000000 a 1.000000 = 7.468241328124262e-01
Error relativo de la solución: 1.189273860708517e-15
Tiempo de ejecución con 64 threads: 26 miliseconds
----------------------

Comentarios

  • Dividimos el número de subintervalos n entre el número de threads que deseamos lanzar, por esto, n debe ser divisible entre las entradas del array n_threads. Esta cantidad es el número de subintervalos contiguos que le corresponden a cada thread.

  • Además, se debe de agregar el resultado de la suma de cada thread. Esto es posible realizar de forma sencilla definiendo una variable que sea compartida. Al definir tal variable en la función main y antes de un parallel block el default es que sea considerada como compartida.

  • Las variables que son privadas se definen en la función Rcf.

Directiva parallel for#

  • Esta directiva ayuda a paralelizar ciclos for que tienen una forma canónica como sigue:

  • La directiva realiza un fork de threads en un team para ejecutar el structured block que continúa a la directiva. Tal structured block debe ser un ciclo for.

  • Con esta directiva se dividen las iteraciones del loop entre los threads.

  • Por default todas las variables definidas antes de la directiva parallel son compartidas pero en la directiva parallel for, el índice del ciclo for es privada, cada thread tiene su propia copia de tal índice.

Ejemplo regla compuesta del rectángulo con directiva parallel for#

%%file Rcf_parallel_for.c
#include<stdio.h>
#include<stdlib.h>
#include<math.h>
#include<time.h>
#include <sys/time.h>

double Rcf(long n_t, double ext_izq, double ext_der, int n,\
           double s_res);
double f(double nodo);

int main(int argc, char *argv[]){
    double sum_res = 0.0;
    double a = 0.0, b = 1.0;
    int n = 1e7;
    struct timeval start;
    struct timeval end;
    long seconds;
    long long mili;
    long n_threads;
    double res_rcf;
    double obj = 0.7468241328124271;
    
    n_threads = strtol(argv[1], NULL, 10);   
    gettimeofday(&start, NULL);
    res_rcf = Rcf(n_threads, a,b,n, sum_res);
    gettimeofday(&end, NULL);
    seconds = (end.tv_sec - start.tv_sec);
    mili = 1000*(seconds) + (end.tv_usec - start.tv_usec)/1000;
    printf("Integral de %f a %f = %1.15e\n", a,b,res_rcf);
    printf("Error relativo de la solución: %1.15e\n", fabs(res_rcf-obj)/fabs(obj));
    printf("Tiempo de ejecución: %lld milisegundos", mili);
    return 0;
}
double Rcf(long n_th, double a, double b, int n, double sum_res){
    double h_hat = (b-a)/n;
    double x = 0.0;
    int i = 0;
    sum_res = 0.0;
    # pragma omp parallel for num_threads(n_th) reduction(+: sum_res)
        for(i = 0; i <= n-1; i++){
            x = a+(i+1/2.0)*h_hat;
            sum_res += f(x);
        }
    return h_hat*sum_res;
}
double f(double nodo){
    double valor_f;
    valor_f = exp(-pow(nodo,2));
    return valor_f;
}
Writing Rcf_parallel_for.c
%%bash
gcc -Wall -fopenmp Rcf_parallel_for.c -o Rcf_parallel_for.out -lm
%%bash
./Rcf_parallel_for.out 1
Integral de 0.000000 a 1.000000 = 7.468241328123898e-01
Error relativo de la solución: 4.994950214975770e-14
Tiempo de ejecución: 489 milisegundos
%%bash
./Rcf_parallel_for.out 2
Integral de 0.000000 a 1.000000 = 7.468209392885364e-01
Error relativo de la solución: 4.276139120784953e-06
Tiempo de ejecución: 268 milisegundos
%%bash
./Rcf_parallel_for.out 4
Integral de 0.000000 a 1.000000 = 7.468245048387456e-01
Error relativo de la solución: 4.981444789795125e-07
Tiempo de ejecución: 196 milisegundos
%%bash
./Rcf_parallel_for.out 8
Integral de 0.000000 a 1.000000 = 7.468233188076527e-01
Error relativo de la solución: 1.089955102679663e-06
Tiempo de ejecución: 172 milisegundos
%%bash
./Rcf_parallel_for.out 16
Integral de 0.000000 a 1.000000 = 7.468247754499342e-01
Error relativo de la solución: 8.604937613327165e-07
Tiempo de ejecución: 190 milisegundos
%%bash
./Rcf_parallel_for.out 32
Integral de 0.000000 a 1.000000 = 7.468178324971565e-01
Error relativo de la solución: 8.436143120972810e-06
Tiempo de ejecución: 427 milisegundos

Ejercicio

Implementar la regla de Simpson compuesta con OpenMP utilizando una reduction clause y la directiva parallel for en una máquina de AWS con las mismas características que la que se presenta en esta nota y medir tiempo de ejecución.

Multiprocessing#

El módulo Multiprocessing permite realizar procesamientos basados en procesos o threads para compartir trabajo, datos y lidiar con que la implementación estándar de Python, CPython no utiliza múltiples cores por default para procesamiento.

  • En el apartado de Cpython-Design se menciona el por qué CPython no soporta ejecución multithreaded o multiprocesses y a continuación se colocan unos párrafos de tal discusión.

A particular feature of CPython is that it makes use of a global interpreter lock (GIL) on each CPython interpreter process, which means that within a single process, only one thread may be processing Python bytecode at any one time. This does not mean that there is no point in multithreading; the most common multithreading scenario is where threads are mostly waiting on external processes to complete.

For example, imagine when three threads are servicing separate clients. One thread may be waiting for a client to reply, and another may be waiting for a database query to execute, while the third thread is actually processing Python code.

However, the GIL does mean that CPython is not suitable for processes that implement CPU-intensive algorithms in Python code that could potentially be distributed across multiple cores. …

  • Se recomienda usar este módulo para el shared memory programming y para trabajos que son demandantes de CPU. No se recomienda su uso en trabajos demandantes en I/O. En su lugar se tienen paquetes de Python para ejecución del código de forma asíncrona como asyncio o tornado. También Dask permite tal tipo de ejecución, ver dask-asynchronous.

Comentario

Otro módulo en Python para procesamiento en un SMC con CPUs es concurrent.futures que provee el comportamiento principal de Multiprocessing. Ver concurrent-futures-processpoolexecutor-vs-multiprocessing-pool-pool y concurrent-futures-vs-multiprocessing-in-python-3 para más sobre concurrent.futures y concurrent.futures vs multiprocessing.

Nota sobre el GIL y multiprocessing#

Aunque en Python los threads son nativos del sistema operativo (esto es, no se simulan, son realmente threads del sistema operativo creados en el hardware), están limitados por el global interpreter lock (GIL), de modo que un sólo thread interactúe con un objeto Python en un único tiempo. Esto degrada el performance de los programas pues si un thread con etiqueta 1 tiene el GIL entonces otro thread con etiqueta 2 esperará hasta que el thread 1 lo suelte. Ver Understanding the Python GIL.

Al usar el módulo Multiprocessing ejecutamos en paralelo un número de intérpretes Python (CPython), cada uno con su propio espacio de memoria privada y su propio GIL.

Observación

En multiprocessing se utilizan subprocesos en lugar de threads y en lugar de fork se realiza spawn.

Ejemplo#

En Multiprocessing los procesos son generados al utilizar la clase Process para crear objetos y llamar al método start. Ver Process para documentación de ésta clase.

from multiprocessing import Process
def f():
    print('hello world! de subproceso')
    
p1 = Process(target=f)
p2 = Process(target=f)
p1.start() 
p2.start() 
p1.join() 
p2.join() 
print('hello world! de proceso')
hello world! de subproceso
hello world! de subproceso
hello world! de proceso

Comentarios

  • start sólo puede ser llamada una vez por objeto Process.

  • Con el primer join el proceso principal espera a que termine p1. Con el segundo join el proceso principal espera a que termine p2.

Observación

Es buena práctica explícitamente hacer joins para cada objeto process que realizó start. Ver Programming guidelines para buenas prácticas.

La clase Process recibe la función a ejecutar para cada proceso con el argumento target y también tiene args para los argumentos de la función:

def f(s):
    print(s)
    
p1 = Process(target=f, args=('hola',))
p2 = Process(target=f, args=('mundo',)) 
p1.start()
p2.start()
p1.join()
p2.join()
hola
mundo

Comentarios

  • De acuerdo a Programming guidelines se debe usar un bloque del tipo if __name__=='__main__': para las personas que tienen Windows puedan ejecutar sin problema el código:

if __name__=='__main__':
    p1 = Process(target=f, args=('hola',))
    p2 = Process(target=f, args=('mundo',)) 
    p1.start()
    p2.start()
    p1.join()
    p2.join()

El bloque del if ayuda a que los subprocesos importen el módulo __main__ (por lo que no se ejecuta la sección que está dentro de if __name__=='__main__': pues no son programas principales) y continúa la ejecución de las líneas de start (cada subproceso ejecuta f) y join. Sin tal bloque if, el notebook quedará bloqueado pues una celda con el código anterior creará subprocesos que a su vez crearán otros subprocesos, que a su vez crearán otros subprocesos… y así de forma recursiva.

En Multiprocessing tenemos la función cpu_count para determinar el número de cores que el sistema operativo puede usar. Este número es la cantidad física o simulada (hyperthreading) de cores.

import multiprocessing
print(multiprocessing.cpu_count())
64

Pool of workers#

Ver Using a pool of workers.

La clase Pool crea un conjunto (pool) de procesos tipo worker que procesarán las tareas a realizar vía funciones tipo map o apply. Se hace map del input hacia los procesadores y se colecta el output de éstos. Mientras el map se realiza, el proceso que lanzó el map se bloquea hasta que finalicen las tareas (aunque hay map_async). El output es una lista.

Observación

El procesamiento de las tareas podríamos hacerlo con la clase Process de arriba pero tendríamos que utilizar un ciclo y colectar los resultados.

Comentario

Para un gran número de tareas a realizar, utilizar Pool. Para pocas tareas (menos de \(10\)) a realizar, utilizar Process.

Ejemplo#

from multiprocessing import Pool
def f(dummy):
    return 'hello world!'
    
pool = Pool(multiprocessing.cpu_count())
results = pool.map(f,range(multiprocessing.cpu_count()))
print(results)
pool.close()    
pool.join()
['hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!', 'hello world!']
def f(dummy):
    return 'hello world!'
    
num_processes=2
pool = Pool(num_processes)
results = pool.map(f,range(num_processes))
print(results)
pool.close()    
pool.join()
['hello world!', 'hello world!']

Ejemplo utilizando apply#

def f():
    return 'hello world!'
    
num_processes=2
pool = Pool(num_processes)
results = [pool.apply(f) for x in range(num_processes)]
print(results)
pool.close()    
pool.join()
['hello world!', 'hello world!']

Ejemplo con context manager#

Nos ayuda a no escribir statements: pool.close, pool.join.

def f(dummy):
    return 'hello world!'
    
num_processes=2
with Pool(processes=num_processes) as pool:
    results = pool.map(f,range(num_processes))
    print(results)
['hello world!', 'hello world!']

Ejemplo para pasar múltiples argumentos vía starmap#

def f(s1, s2):
    return s1 + s2
    
num_processes=2
with Pool(processes=num_processes) as pool:
    iterable = [("hola"," mundo 1arg"),("hola", " mundo 2arg")]
    results = pool.starmap(f, iterable)
    print(results)
['hola mundo 1arg', 'hola mundo 2arg']

Ejemplo regla compuesta del rectángulo#

import math
import time

from scipy.integrate import quad
from pytest import approx
f = lambda x: math.exp(-x**2)
a = 0
b = 1
n = 10**7
h_hat = (b-a)/n
p = multiprocessing.cpu_count()
n_subintervals_per_subprocess = int(n/p) 

Comentario

n_subintervals_per_subprocess es el número de subintervalos por core. Se asume que n es divisible por p. Si no se cumple esto, se puede definir n_subintervals_per_subprocess = int(n/p) habiendo definido n primero y redefinir n como: n = p*n_subintervals_per_subprocess

print("número de subintervalos:", n)
número de subintervalos: 10000000
print("número de subintervalos por proceso:", n_subintervals_per_subprocess)
número de subintervalos por proceso: 156250
def Rcf_multiprocessing(my_id):
    begin = my_id*n_subintervals_per_subprocess
    end = begin+n_subintervals_per_subprocess
    h_hat = (b-a)/n
    sum_res = 0
    for i in range(begin,end):
        x = a+(i+1/2)*h_hat
        sum_res += f(x)
    return sum_res
pool = Pool(processes=p)
start_time=time.time()
res_map = pool.map(Rcf_multiprocessing,range(p))
res_multiprocessing = h_hat*sum(res_map)
end_time = time.time()
secs = end_time-start_time
print("Rcf_multiprocessing tomó",secs,"segundos" )
Rcf_multiprocessing tomó 0.16410470008850098 segundos
obj, err = quad(f, a, b)
print(res_multiprocessing == approx(obj))
True
%%timeit -n 5 -r 10 -o

res_map = pool.map(Rcf_multiprocessing,range(p))
res_multiprocessing = h_hat*sum(res_map)
135 ms ± 6.22 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)
<TimeitResult : 135 ms ± 6.22 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)>
Rcf_multiprocessing_timeit = _
print(Rcf_multiprocessing_timeit.average)
0.1347037914800006
pool.close()
pool.join()

Dask#

  • Permite procesamiento en SMC o SMD.

  • Extiende interfaces de arrays, dataframes y listas usadas en NumPy, Pandas e iterators para su procesamiento. En el contexto de Dask a los tipos de valores anteriores les denomina big data collections.

  • Manejo de datasets que no caben en la memoria RAM: larger than memory datasets o out of core datasets.

  • Las big data collections soportadas por Dask son alternativas a los arrays y dataframes de NumPy y Pandas respectivamente para datasets grandes.

  • Provee ejecución con cómputo paralelo o distribuido de algunas funciones de NumPy y Pandas.

  • Soporta un task scheduling dinámico y optimizado para cómputo.

Comentarios

  • Task scheduling es un enfoque de paralelización en el que dividimos el programa en muchos tasks medium-sized. En Dask se representan tales tasks como nodos de un grafo con líneas entre éstos si un task depende de lo producido por otro.

  • Dinámico pues las task graphs pueden ser definidas a partir de las big data collections o por users.

  • Dask posee módulos para optimizar la ejecución de las task graphs (más abajo se describe cómo construir una task graph). Ver optimization.

  • En Dask existen task schedulers para ejecución en paralelo, distribuida o secuencial de la task graph.

¿Cómo se implementan una task y una task/dask graph en Dask?#

Cada task se implementa como una tupla de Python que contiene funciones y argumentos de las mismas.

def fun_sum(arg1,arg2):
    return arg1+arg2

t = (fun_sum, -1, 2)
print(t)
(<function fun_sum at 0x7fd2dfffa430>, -1, 2)

Una task graph o dask graph se implementa como un diccionario de tasks:

dic = {"arg1": -1,
       "arg2": 2,
       "res": (fun_sum, "arg1", "arg2")}

¿Cómo ejecutamos la task/dask graph?#

Con los schedulers definidos en Dask. Ver Scheduling y Scheduler Overview

import dask

Para cualquier scheduler definido en dask el entry point es una función get la cual recibe una task/dask graph y una key o lista de keys para cálculos:

print(dask.get(dic, "res")) #synchronous scheduler 
1
print(dask.threaded.get(dic, "res")) #scheduler backed by a thread pool
1
print(dask.multiprocessing.get(dic, "res")) #scheduler backed by a process pool
1

Comentarios

  • La función dask.get ejecuta con un synchronous scheduler el cual sólo utiliza un thread de ejecución sin paralelización. Útil para debugging y perfilamiento.

  • La función dask.threaded.get ejecuta con un scheduler multiprocessing.pool.ThreadPool. Se hacen forks y joins de un proceso. El overhead para ejecución es pequeño y no hay costo en transferencia de datos entre los tasks. Sin embargo debido al GIL de Python, este scheduler provee paralelización si tu código es esencialmente “no Python”, por ejemplo si utilizas código de NumPy, Pandas o Cython.

  • La función dask.multiprocessing.get ejecuta con un scheduler multiprocessing.Pool. Para cada task se crea un proceso, no tiene problemas del GIL de Python para código Python. Sin embargo, mover datos hacia procesos y de vuelta al proceso principal tiene costos altos. Útil para tasks que no requieren mucha transferencia de datos y los inputs y outputs son pequeños en tamaño.

  • En la documentación de Dask se recomienda utilizar al scheduler distributed en lugar de usar dask.multiprocesing.get. La documentación se encuentra en Dask.distributed. Para una pequeña explicación ver Dask Distributed local.

¿Cómo visualizamos la task/dask graph?#

Usamos visualize. Ver visualize task graphs y dask.visualize.

dask.visualize(dic,"res")
../../_images/Computo_en_paralelo_usando_CPUS_en_SMC_162_0.png

Las cajas representan datos, los círculos representan funciones que se ejecutan sobre tales datos y las flechas especifican cuáles funciones producen/consumen qué datos.

Flujo de trabajo con Dask collections#

En resumen en Dask seguimos un flujo como el siguiente dibujo.

Observación

Para crear task/dask graphs podemos usar las collections o bien, definir nuestras propias.

Comentario

Si trabajamos con las collections será extraño que trabajemos a nivel de funciones get. Cada collection tiene un default scheduler y una función compute que calcula el output de la collection.

Ejemplo#

dask arrays y dask dataframes utilizan al threaded scheduler por default pero con compute puede cambiarse de scheduler.

import dask.array as da
x = da.arange(100)
print(x)
dask.array<arange, shape=(100,), dtype=int64, chunksize=(100,), chunktype=numpy.ndarray>
x
Array Chunk
Bytes 800 B 800 B
Shape (100,) (100,)
Count 1 Tasks 1 Chunks
Type int64 numpy.ndarray
100 1
print(type(x))
<class 'dask.array.core.Array'>

Llamamos a la función dask.compute.

print(x.sum().compute())
4950

Podemos cambiar de scheduler especificando en compute:

print(x.sum().compute(scheduler="processes"))
4950

Por default dask array trabaja con chunks.

x = da.arange(100, chunks=10)
x
Array Chunk
Bytes 800 B 80 B
Shape (100,) (10,)
Count 10 Tasks 10 Chunks
Type int64 numpy.ndarray
100 1

Por default, dask array trabaja de una forma lazy, esto es, dask array no calculará ningún resultado hasta que explícitamente se le dé la instrucción.

y = x.sum()
y
Array Chunk
Bytes 8 B 8.0 B
Shape () ()
Count 24 Tasks 1 Chunks
Type int64 numpy.ndarray

Hasta este punto no se ha calculado la suma (cuyo resultado es un escalar). Después de llamar a compute se ejecuta la task graph para ejecución en paralelo (si se definió al scheduler para este tipo de ejecución).

print(y.compute())
4950

Y podemos visualizar la task/dask graph definida en y que es un dask array:

print(type(y))
<class 'dask.array.core.Array'>
y.visualize()
../../_images/Computo_en_paralelo_usando_CPUS_en_SMC_191_0.png

Ver la API de dask array en array api para más funcionalidad.

Comentario

Dask tiene una función compute más general que toma múltiples collections y regresa múltiples resultados. Esto combina grafos de cada collection de modo que resultados intermedios son compartidos.

y = (x+1).sum()
z = (x+1).mean()
print(da.compute(y, z))
(5050, 50.5)

y aquí el resultado intermedio x+1 sólo fue calculado una vez. Si hubiéramos hecho:

y.compute()
z.compute()

el resultado intermedio x+1 hubiera sido calculado dos veces. Ésta función compute trabaja sobre cualquier collection y se encuentra en dask.base:

from dask.base import compute
print(compute is da.compute)
True
print(compute(y,z))
(5050, 50.5)

También se puede especificar para un scheduler threaded o multiprocessing, el número de workers en compute:

print(y.compute(num_workers=2))
5050

Ver configuration para otras formas de especificar al scheduler. Ver también Delayed y Custom Workloads with Dask Delayed para otra opción diferente a los dask array’s, dask dataframe’s y dask bag’s para cómputo lazy.

Ejemplo regla compuesta del rectángulo con dask.array#

import numpy as np
f_np = lambda x: np.exp(-x**2)
def Rcf_dask_array(f,a,b,n, n_s_w, n_workers):
    """
    Compute numerical approximation using rectangle or mid-point
    method in an interval.
    Nodes are generated via formula: x_i = a+(i+1/2)h_hat for
    i=0,1,...,n-1 and h_hat=(b-a)/n
    Args:
    
        f (float): function expression of integrand.
        
        a (float): left point of interval.
        
        b (float): right point of interval.
        
        n (int): number of subintervals.
        
        n_s_w (int): number of subintervals per worker
        
        n_workers (int): number of workers
        
    Returns:
    
        sum_res (float): numerical approximation to integral
            of f in the interval a,b
    """
    h_hat = (b-a)/n
    aux_dask_array = da.linspace(a, b, n+1, chunks = n_s_w)
    nodes = (aux_dask_array[:-1]+aux_dask_array[1:])/2
    f_evaluated_in_nodes = f_np(nodes)
    res_sum = f_evaluated_in_nodes.sum().compute(num_workers=n_workers)
    return h_hat*res_sum
print(p, n_subintervals_per_subprocess)
64 156250
start_time=time.time()
res_rcf_dask_array = Rcf_dask_array(f_np,a,b,n, 
                                    n_subintervals_per_subprocess, 
                                    p)
end_time = time.time()
secs = end_time-start_time
print("Rcf_dask_array tomó",secs,"segundos" )
Rcf_dask_array tomó 0.1601552963256836 segundos
print(res_rcf_dask_array == approx(obj))
True

Dask distributed#

  • Es un paquete que extiende a concurrent.futures (multiprocessing es similar a concurrent.futures) y a Dask para clústers de tamaño mediano (alrededor de cientos de máquinas).

  • Aunque el nombre distributed puede tomarse como que la ejecución sólo puede llevarse a cabo en clústers de máquinas, también puede utilizarse en nuestra máquina local.

  • Por razones como proveer acceso a una API asíncrona: Futures, acceso a un dashboard diagnóstico y manejar mejor la data locality con múltiples procesos más eficientemente que con dask.multiprocessing.get, representa una alternativa fuerte a la generación de procesos locales vía dask.multiprocessing: The distributed scheduler described a couple sections below is often a better choice today than dask.multiprocessing

Tiene las siguientes características.

  • Low latency: Each task suffers about 1ms of overhead. A small computation and network roundtrip can complete in less than 10ms.

  • Peer-to-peer data sharing: Workers communicate with each other to share data. This removes central bottlenecks for data transfer.

  • Complex Scheduling: Supports complex workflows (not just map/filter/reduce) which are necessary for sophisticated algorithms used in nd-arrays, machine learning, image processing, and statistics.

  • Data Locality: Scheduling algorithms cleverly execute computations where data lives. This minimizes network traffic and improves efficiency.

  • Familiar APIs: Compatible with the concurrent.futures API in the Python standard library. Compatible with dask API for parallel algorithms.

Para ver más sobre el distributed scheduler ver dask-architecture.

¿concurrent.futures?#

  • Es un paquete para ejecución de tasks en paralelo.

  • En concurrent.futures se trabaja con objetos de tipo valor future los cuales son resultados de cálculos que estarán disponibles en un futuro, de aquí el nombre de future y su cómputo no es lazy sino inmediato.

  • La obtención del future es asíncrona por lo que permite cómputo concurrente.

Ejemplo#

import time
def inc(x):
    time.sleep(1)
    return x+1

Ejecución secuencial

%%time
inputs = [1,2,3,4,5,6,7,8,9,10]
results=[]

for x in inputs:
    result=inc(x)
    results.append(result)
CPU times: user 949 µs, sys: 1.02 ms, total: 1.97 ms
Wall time: 10 s
print(results)
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

Ejecución con concurrent.futures

from concurrent.futures import ProcessPoolExecutor
import multiprocessing
executor = ProcessPoolExecutor(multiprocessing.cpu_count())
print(executor)
<concurrent.futures.process.ProcessPoolExecutor object at 0x7fd2dece6ca0>
%%time
future = executor.submit(inc,3)
CPU times: user 39.7 ms, sys: 189 ms, total: 229 ms
Wall time: 218 ms

Observación

Obsérvese que se obtiene el future inmediatamente. Ver submit

print(future)
<Future at 0x7fd2df547af0 state=running>

Observación

La ejecución del future se encuentra en algún subproceso del proceso principal. Obsérvese que tiene un estado running. Y después de un tiempo el estado ya es finished.

print(future)
<Future at 0x7fd2df547af0 state=running>

Para obtener el resultado hacemos:

print(future.result())
4

Entonces podemos hacer submit de varios cálculos de forma asíncrona con el objeto executor que devolverá futures:

%%time
inputs = [1,2,3,4,5,6,7,8,9,10]
futures=[]

for x in inputs:
    future=executor.submit(inc,x)
    futures.append(future)
    
results = [future.result() for future in futures]
CPU times: user 2.82 ms, sys: 3.09 ms, total: 5.92 ms
Wall time: 1.01 s

Comentario

El statement results = [future.result() for future in futures] bloquea la ejecución del proceso local hasta que todos los procesos finalizan la ejecución.

Observación

Obsérvese que se redujo el tiempo pues la ejecución fue en paralelo por los subprocesos que ejecutaron inc.

print(results)
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
executor.shutdown()

Comentarios

Client de dask.distributed#

El Client es el entry point para users de dask.distributed. Creamos un distributed scheduler al importar Client. Esta acción override cualquier configuración que se haya hecho del scheduler.

from dask.distributed import Client
client = Client()

Comentario

Al Client se le pueden pasar argumentos como número de workers, si se desean procesos o no, el límite de memoria a usar por cada proceso, entre otros.

print(client)
<Client: 'tcp://127.0.0.1:45025' processes=8 threads=64, memory=251.89 GiB>
client

Client

Cluster

  • Workers: 8
  • Cores: 64
  • Memory: 251.89 GiB

Para ir al dashboard que se expone con bokeh ir a la url con puerto 8787.

Observación

Si se ejecuta con una imagen de Docker, revisar a qué puerto está mapeado el 8787.

y en el apartado de workers:

Ver Web Interface y Diagnosing Performance para más sobre la información de diagnóstico que se junta en el dashboard.

Comentarios

  • La línea anterior que contiene Client, configura un scheduler en el proceso local y varios procesos workers o dask-workers que corren un thread de ejecución (pueden correr más por default). Si se desea que los procesos worker estén contenidos en el proceso local puede usarse Client(processes=False). Esta situación es preferible si se desea evitar comunicación inter worker y si nuestros cálculos están soltando el GIL (por ejemplo al usar NumPy o dask.array).

  • Algunas opciones que pueden utilizarse con la línea de Client son:

  • Client(n_workers=4).

  • Client(threads_per_worker=2).

  • Client(memory_limit='10GB').

Ver LocalCluster para opciones a usar en Client y API para la API con Client.

Ejemplo regla compuesta del rectángulo con map y gather#

def create_nodes_in_subintervals(my_id,a,b,h_hat,n,n_s_sub):
    begin = my_id*n_s_sub
    end = begin+n_s_sub
    h_hat = (b-a)/n
    nodes = []
    for i in range(begin,end):
        nodes.append(a+(i+1/2.0)*h_hat)
    return nodes
def evaluate_f_in_nodes_and_sum(nodes,f):
    sum_res = 0
    for node in nodes:
        sum_res+=f(node)
    return sum_res
%%time
futures_nodes = client.map(create_nodes_in_subintervals,range(p),
                           **{'a':a,
                              'b':b,
                              "h_hat":h_hat,
                              'n':n,
                              "n_s_sub":n_subintervals_per_subprocess}
                            )
futures_evaluated = client.map(evaluate_f_in_nodes_and_sum, futures_nodes,
                            **{'f':f}
                            )
results_gather = client.gather(futures_evaluated)
res_rcf_distributed = h_hat*sum(results_gather)
CPU times: user 110 ms, sys: 55.8 ms, total: 166 ms
Wall time: 1 s
print(res_rcf_distributed == approx(obj))
True

Comentarios

  • Con map o submit se han enviado las tasks al scheduler a realizar. Ver map, submit y concurrent.futures.

  • Las ejecuciones map son muy rápidas e inmediatas pues se regresan futures.

  • Los cálculos definidos por las funciones create_nodes_in_subintervals y evaluate_f_in_nodes_and_sum se ejecutan en cada dask-worker y a distintos tiempos.

  • Obsérvese que se puede hacer map o submit en futures.

  • Los resultados viven en los procesos dask-workers.

  • Con gather se juntan los resultados (futures) en el proceso local.

  • Podemos usar gather o result de los futures.

client.close()

Ejemplo regla compuesta del rectángulo con map y result#

client = Client(n_workers=multiprocessing.cpu_count(),
                threads_per_worker=1
               )  
client

Client

Cluster

  • Workers: 64
  • Cores: 64
  • Memory: 251.89 GiB
%%time
futures_nodes = client.map(create_nodes_in_subintervals,range(p),
                           **{'a':a,
                              'b':b,
                              "h_hat":h_hat,
                              'n':n,
                              "n_s_sub":n_subintervals_per_subprocess}
                            )
futures_evaluated = client.map(evaluate_f_in_nodes_and_sum, futures_nodes,
                                                       **{'f':f}
                            )

results = [future.result() for future in futures_evaluated]
res_rcf_distributed = h_hat*sum(results)
CPU times: user 428 ms, sys: 75.4 ms, total: 503 ms
Wall time: 630 ms
print(res_rcf_distributed == approx(obj))
True

Gráfica de tiempo de ejecución vs número de procesos#

err_np = []
n_cpus = []
def relative_error(aprox, obj):
    return math.fabs(aprox-obj)/math.fabs(obj)
def Rcf_dask(my_id,f,a,b,h_hat,n,n_s_sub):
    begin=my_id*n_s_sub
    end=begin+n_s_sub
    h_hat=(b-a)/n
    sum_res = 0
    for i in range(begin,end):
        x = a+(i+1/2)*h_hat
        sum_res += f(x)
    return sum_res
def myfun(cl,p,n_s_sub):
    futures_Rcf_dask = cl.map(Rcf_dask,range(p),
                              **{'f':f,
                                 'a':a,
                                 'b':b,
                                 "h_hat":h_hat,
                                 'n':n,
                                 "n_s_sub":n_s_sub}
                              ) 
    results_gather = cl.gather(futures_Rcf_dask)
    res_rcf_dask = h_hat*sum(results_gather)
    return relative_error(res_rcf_dask,obj)
client.close()
client

Client

Cluster

  • Workers: 0
  • Cores: 0
  • Memory: 0 B

Primero revisamos errores relativos

for p in range(1,multiprocessing.cpu_count()+1):
    if n%p==0:
        n_subintervals_per_subprocess = int(n/p)
        client = Client(n_workers=p,
                        threads_per_worker=1
                       )
        err_np.append(myfun(client,p,n_subintervals_per_subprocess))
        n_cpus.append(p)
        client.close()
print(err_np)
[4.99495021497577e-14, 1.4419945561090765e-14, 2.4231454911936028e-14, 5.886905610507158e-14, 1.3825308630736507e-14, 1.7690448678039188e-14, 3.716480814714115e-15, 3.86514004730268e-15, 2.527206954005598e-15, 2.973184651771292e-15, 2.229888488828469e-15, 2.6758661865941626e-15, 1.486592325885646e-15]
print(n_cpus)
[1, 2, 4, 5, 8, 10, 16, 20, 25, 32, 40, 50, 64]
l=[]
n_cpus=[]
client.close()
client

Client

Cluster

  • Workers: 0
  • Cores: 0
  • Memory: 0 B
for p in range(1,multiprocessing.cpu_count()+1):
    if n%p==0:
        n_subintervals_per_subprocess = int(n/p)
        client = Client(n_workers=p,
                threads_per_worker=1
                       )
        start_time=time.time()
        futures_Rcf_dask = client.map(Rcf_dask,range(p),
                                      **{'f':f,
                                         'a':a,
                                         'b':b,
                                         "h_hat":h_hat,
                                         'n':n,
                                         "n_s_sub":n_subintervals_per_subprocess}
                                     )
        res_gather = client.gather(futures_Rcf_dask)
        end_time = time.time()
        secs = end_time-start_time
        l.append(secs)
        n_cpus.append(p)
        client.close()
print(l)
[3.701357364654541, 1.8432807922363281, 0.9091541767120361, 0.7361886501312256, 0.4691917896270752, 0.4330434799194336, 0.26079630851745605, 0.23067212104797363, 0.2382211685180664, 0.3052539825439453, 0.2685668468475342, 0.2464737892150879, 0.2692685127258301]
print(n_cpus)
[1, 2, 4, 5, 8, 10, 16, 20, 25, 32, 40, 50, 64]
import matplotlib.pyplot as plt
plt.plot(n_cpus,l,'o-')
plt.title('Gráfica num cpus vs tiempo')
plt.xlabel('num cpus')
plt.ylabel('tiempo')
plt.grid()
plt.show()
../../_images/Computo_en_paralelo_usando_CPUS_en_SMC_286_0.png

Ejercicio

Implementar la regla de Simpson compuesta con Multiprocessing y Dask en una máquina de AWS con las mismas características que la que se presenta en esta nota, medir tiempo de ejecución y realizar gráfica de tiempo vs num cpus.

Parallel#

Entre las herramientas más populares en R para cómputo en paralelo se encuentran:

Comentarios

  • Las primeras dos son parte del paquete parallel el cual está incluido en la instalación de R desde la versión 2.14.0.

  • Los cuatro paquetes de arriba emplean un paradigma de programación en paralelo del tipo scatter/gather: se tienen múltiples instancias de R corriendo al mismo tiempo en un clúster de máquinas o en una máquina multicore. Una de las instancias se le denomina manager y las restantes workers. El cómputo en paralelo procede como sigue:

    • scatter: manager descompone el cómputo a realizar en chunks y envía (scatters) los chunks a workers.

    • chunk computation: workers hacen el cómputo en cada chunk y envían de regreso los resultados a manager.

    • gather: manager recibe (gathers) los resultados y los combina para resolver el problema.

Ejemplo#

library(parallel)

En Parallel tenemos la función detectCores que como su nombre lo indica obtiene el número de CPU’s en nuestra máquina.

p <- detectCores()
p
64

Usamos makeCluster para iniciar el snow cluster:

cl <- makeCluster(p)

La línea anterior crea p workers y cada worker es un proceso de R (al igual que el proceso manager) que corren en la misma máquina.

Comentario

El nombre de clúster en snow hace referencia al conjunto de workers y no a máquinas físicas. El objeto cl contiene información de workers y es de clase cluster.

print(class(cl))
[1] "SOCKcluster" "cluster"    

La comunicación entre manager y workers por default en makeCluster es vía network sockets y es posible crear un clúster de máquinas físicas conectadas vía una network.

print(cl)
socket cluster with 64 nodes on host ‘localhost’

clusterApply#

En Parallel se tiene la función clusterApply para ejecución en paralelo de una función aplicada a cada elemento de una lista: manager envía lista[1] a cl[1], lista[2] a cl[2],…,lista[p] a cl[p].

Además, si el número de elementos de la lista es mayor que el número de workers entonces clusterApply realiza una asignación tipo round robin: por ejemplo, si la lista tiene \(4\) elementos y se tienen \(2\) workers entonces clusterApply le envía a worker1 el elemento de la primera posición de la lista, a worker2 el segundo elemento. Después que worker1 finaliza el procesamiento, clusterApply le envía la tercera posición y al terminar worker2 del procesamiento, clusterApply le envía la cuarta posición (clusterApply aprovecha la característica que tiene R de reciclar operaciones).

print(clusterApply(cl, 1:p,function(dummy)print("Hello world!")))
[[1]]
[1] "Hello world!"

[[2]]
[1] "Hello world!"

[[3]]
[1] "Hello world!"

[[4]]
[1] "Hello world!"

[[5]]
[1] "Hello world!"

[[6]]
[1] "Hello world!"

[[7]]
[1] "Hello world!"

[[8]]
[1] "Hello world!"

[[9]]
[1] "Hello world!"

[[10]]
[1] "Hello world!"

[[11]]
[1] "Hello world!"

[[12]]
[1] "Hello world!"

[[13]]
[1] "Hello world!"

[[14]]
[1] "Hello world!"

[[15]]
[1] "Hello world!"

[[16]]
[1] "Hello world!"

[[17]]
[1] "Hello world!"

[[18]]
[1] "Hello world!"

[[19]]
[1] "Hello world!"

[[20]]
[1] "Hello world!"

[[21]]
[1] "Hello world!"

[[22]]
[1] "Hello world!"

[[23]]
[1] "Hello world!"

[[24]]
[1] "Hello world!"

[[25]]
[1] "Hello world!"

[[26]]
[1] "Hello world!"

[[27]]
[1] "Hello world!"

[[28]]
[1] "Hello world!"

[[29]]
[1] "Hello world!"

[[30]]
[1] "Hello world!"

[[31]]
[1] "Hello world!"

[[32]]
[1] "Hello world!"

[[33]]
[1] "Hello world!"

[[34]]
[1] "Hello world!"

[[35]]
[1] "Hello world!"

[[36]]
[1] "Hello world!"

[[37]]
[1] "Hello world!"

[[38]]
[1] "Hello world!"

[[39]]
[1] "Hello world!"

[[40]]
[1] "Hello world!"

[[41]]
[1] "Hello world!"

[[42]]
[1] "Hello world!"

[[43]]
[1] "Hello world!"

[[44]]
[1] "Hello world!"

[[45]]
[1] "Hello world!"

[[46]]
[1] "Hello world!"

[[47]]
[1] "Hello world!"

[[48]]
[1] "Hello world!"

[[49]]
[1] "Hello world!"

[[50]]
[1] "Hello world!"

[[51]]
[1] "Hello world!"

[[52]]
[1] "Hello world!"

[[53]]
[1] "Hello world!"

[[54]]
[1] "Hello world!"

[[55]]
[1] "Hello world!"

[[56]]
[1] "Hello world!"

[[57]]
[1] "Hello world!"

[[58]]
[1] "Hello world!"

[[59]]
[1] "Hello world!"

[[60]]
[1] "Hello world!"

[[61]]
[1] "Hello world!"

[[62]]
[1] "Hello world!"

[[63]]
[1] "Hello world!"

[[64]]
[1] "Hello world!"
print(clusterApply(cl, 1:5,function(dummy)print("Hello world!")))
[[1]]
[1] "Hello world!"

[[2]]
[1] "Hello world!"

[[3]]
[1] "Hello world!"

[[4]]
[1] "Hello world!"

[[5]]
[1] "Hello world!"

Observación

Obsérvese que el resultado de clusterApply es una lista.

Ejemplo de pasar argumentos vía clusterExport#

clusterExport nos permite enviar valores de variables definidas en el workspace global de manager hacia los workspaces de cada worker bajo los mismos nombres de variables:

s <- "Hola mundo!"
clusterExport(cl,'s')
print(clusterApply(cl, 1:p,function(dummy)print(s)))
[[1]]
[1] "Hola mundo!"

[[2]]
[1] "Hola mundo!"

[[3]]
[1] "Hola mundo!"

[[4]]
[1] "Hola mundo!"

[[5]]
[1] "Hola mundo!"

[[6]]
[1] "Hola mundo!"

[[7]]
[1] "Hola mundo!"

[[8]]
[1] "Hola mundo!"

[[9]]
[1] "Hola mundo!"

[[10]]
[1] "Hola mundo!"

[[11]]
[1] "Hola mundo!"

[[12]]
[1] "Hola mundo!"

[[13]]
[1] "Hola mundo!"

[[14]]
[1] "Hola mundo!"

[[15]]
[1] "Hola mundo!"

[[16]]
[1] "Hola mundo!"

[[17]]
[1] "Hola mundo!"

[[18]]
[1] "Hola mundo!"

[[19]]
[1] "Hola mundo!"

[[20]]
[1] "Hola mundo!"

[[21]]
[1] "Hola mundo!"

[[22]]
[1] "Hola mundo!"

[[23]]
[1] "Hola mundo!"

[[24]]
[1] "Hola mundo!"

[[25]]
[1] "Hola mundo!"

[[26]]
[1] "Hola mundo!"

[[27]]
[1] "Hola mundo!"

[[28]]
[1] "Hola mundo!"

[[29]]
[1] "Hola mundo!"

[[30]]
[1] "Hola mundo!"

[[31]]
[1] "Hola mundo!"

[[32]]
[1] "Hola mundo!"

[[33]]
[1] "Hola mundo!"

[[34]]
[1] "Hola mundo!"

[[35]]
[1] "Hola mundo!"

[[36]]
[1] "Hola mundo!"

[[37]]
[1] "Hola mundo!"

[[38]]
[1] "Hola mundo!"

[[39]]
[1] "Hola mundo!"

[[40]]
[1] "Hola mundo!"

[[41]]
[1] "Hola mundo!"

[[42]]
[1] "Hola mundo!"

[[43]]
[1] "Hola mundo!"

[[44]]
[1] "Hola mundo!"

[[45]]
[1] "Hola mundo!"

[[46]]
[1] "Hola mundo!"

[[47]]
[1] "Hola mundo!"

[[48]]
[1] "Hola mundo!"

[[49]]
[1] "Hola mundo!"

[[50]]
[1] "Hola mundo!"

[[51]]
[1] "Hola mundo!"

[[52]]
[1] "Hola mundo!"

[[53]]
[1] "Hola mundo!"

[[54]]
[1] "Hola mundo!"

[[55]]
[1] "Hola mundo!"

[[56]]
[1] "Hola mundo!"

[[57]]
[1] "Hola mundo!"

[[58]]
[1] "Hola mundo!"

[[59]]
[1] "Hola mundo!"

[[60]]
[1] "Hola mundo!"

[[61]]
[1] "Hola mundo!"

[[62]]
[1] "Hola mundo!"

[[63]]
[1] "Hola mundo!"

[[64]]
[1] "Hola mundo!"

¿Qué pasa si mi clusterExport está dentro de una función?

myfun <- function(){
    s_local<-"Hola mundo! usando variable local a myfun"
    clusterExport(cl,"s_local")
    print(clusterApply(cl, 1:p,function(dummy)print(s_local)))
}
print(myfun())
Error in get(name, envir = envir): object 's_local' not found
Traceback:

1. print(myfun())
2. myfun()
3. clusterExport(cl, "s_local")   # at line 3 of file <text>
4. clusterCall(cl, gets, name, get(name, envir = envir))
5. sendCall(cl[[i]], fun, list(...))
6. postNode(con, "EXEC", list(fun = fun, args = args, return = return, 
 .     tag = tag))
7. sendData(con, list(type = type, data = value, tag = tag))
8. sendData.SOCKnode(con, list(type = type, data = value, tag = tag))
9. serialize(data, node$con)
10. get(name, envir = envir)

Lo anterior lo podemos resolver con superassignment. Ver What does the “<<-” operator mean? y how-do-you-use-scoping-assignment-in-r.

myfun2 <- function(){
    s_global <<- "Hola mundo! usando superassignment en mifun2" #<<- superassignment
    clusterExport(cl,"s_global")
    clusterApply(cl, 1:p,function(dummy)print(s_global))
}
print(myfun2())
[[1]]
[1] "Hola mundo! usando superassignment en mifun2"

[[2]]
[1] "Hola mundo! usando superassignment en mifun2"

[[3]]
[1] "Hola mundo! usando superassignment en mifun2"

[[4]]
[1] "Hola mundo! usando superassignment en mifun2"

[[5]]
[1] "Hola mundo! usando superassignment en mifun2"

[[6]]
[1] "Hola mundo! usando superassignment en mifun2"

[[7]]
[1] "Hola mundo! usando superassignment en mifun2"

[[8]]
[1] "Hola mundo! usando superassignment en mifun2"

[[9]]
[1] "Hola mundo! usando superassignment en mifun2"

[[10]]
[1] "Hola mundo! usando superassignment en mifun2"

[[11]]
[1] "Hola mundo! usando superassignment en mifun2"

[[12]]
[1] "Hola mundo! usando superassignment en mifun2"

[[13]]
[1] "Hola mundo! usando superassignment en mifun2"

[[14]]
[1] "Hola mundo! usando superassignment en mifun2"

[[15]]
[1] "Hola mundo! usando superassignment en mifun2"

[[16]]
[1] "Hola mundo! usando superassignment en mifun2"

[[17]]
[1] "Hola mundo! usando superassignment en mifun2"

[[18]]
[1] "Hola mundo! usando superassignment en mifun2"

[[19]]
[1] "Hola mundo! usando superassignment en mifun2"

[[20]]
[1] "Hola mundo! usando superassignment en mifun2"

[[21]]
[1] "Hola mundo! usando superassignment en mifun2"

[[22]]
[1] "Hola mundo! usando superassignment en mifun2"

[[23]]
[1] "Hola mundo! usando superassignment en mifun2"

[[24]]
[1] "Hola mundo! usando superassignment en mifun2"

[[25]]
[1] "Hola mundo! usando superassignment en mifun2"

[[26]]
[1] "Hola mundo! usando superassignment en mifun2"

[[27]]
[1] "Hola mundo! usando superassignment en mifun2"

[[28]]
[1] "Hola mundo! usando superassignment en mifun2"

[[29]]
[1] "Hola mundo! usando superassignment en mifun2"

[[30]]
[1] "Hola mundo! usando superassignment en mifun2"

[[31]]
[1] "Hola mundo! usando superassignment en mifun2"

[[32]]
[1] "Hola mundo! usando superassignment en mifun2"

[[33]]
[1] "Hola mundo! usando superassignment en mifun2"

[[34]]
[1] "Hola mundo! usando superassignment en mifun2"

[[35]]
[1] "Hola mundo! usando superassignment en mifun2"

[[36]]
[1] "Hola mundo! usando superassignment en mifun2"

[[37]]
[1] "Hola mundo! usando superassignment en mifun2"

[[38]]
[1] "Hola mundo! usando superassignment en mifun2"

[[39]]
[1] "Hola mundo! usando superassignment en mifun2"

[[40]]
[1] "Hola mundo! usando superassignment en mifun2"

[[41]]
[1] "Hola mundo! usando superassignment en mifun2"

[[42]]
[1] "Hola mundo! usando superassignment en mifun2"

[[43]]
[1] "Hola mundo! usando superassignment en mifun2"

[[44]]
[1] "Hola mundo! usando superassignment en mifun2"

[[45]]
[1] "Hola mundo! usando superassignment en mifun2"

[[46]]
[1] "Hola mundo! usando superassignment en mifun2"

[[47]]
[1] "Hola mundo! usando superassignment en mifun2"

[[48]]
[1] "Hola mundo! usando superassignment en mifun2"

[[49]]
[1] "Hola mundo! usando superassignment en mifun2"

[[50]]
[1] "Hola mundo! usando superassignment en mifun2"

[[51]]
[1] "Hola mundo! usando superassignment en mifun2"

[[52]]
[1] "Hola mundo! usando superassignment en mifun2"

[[53]]
[1] "Hola mundo! usando superassignment en mifun2"

[[54]]
[1] "Hola mundo! usando superassignment en mifun2"

[[55]]
[1] "Hola mundo! usando superassignment en mifun2"

[[56]]
[1] "Hola mundo! usando superassignment en mifun2"

[[57]]
[1] "Hola mundo! usando superassignment en mifun2"

[[58]]
[1] "Hola mundo! usando superassignment en mifun2"

[[59]]
[1] "Hola mundo! usando superassignment en mifun2"

[[60]]
[1] "Hola mundo! usando superassignment en mifun2"

[[61]]
[1] "Hola mundo! usando superassignment en mifun2"

[[62]]
[1] "Hola mundo! usando superassignment en mifun2"

[[63]]
[1] "Hola mundo! usando superassignment en mifun2"

[[64]]
[1] "Hola mundo! usando superassignment en mifun2"

O bien con environment():

myfun3 <- function(s_global2){
    clusterExport(cl,"s_global2")
    clusterApply(cl, 1:p,function(dummy)print(s_global2))
}
print(myfun3("Hola mundo! usando variable local a mifun3"))
Error in get(name, envir = envir): object 's_global2' not found
Traceback:

1. print(myfun3("Hola mundo! usando variable local a mifun3"))
2. myfun3("Hola mundo! usando variable local a mifun3")
3. clusterExport(cl, "s_global2")   # at line 2 of file <text>
4. clusterCall(cl, gets, name, get(name, envir = envir))
5. sendCall(cl[[i]], fun, list(...))
6. postNode(con, "EXEC", list(fun = fun, args = args, return = return, 
 .     tag = tag))
7. sendData(con, list(type = type, data = value, tag = tag))
8. sendData.SOCKnode(con, list(type = type, data = value, tag = tag))
9. serialize(data, node$con)
10. get(name, envir = envir)
myfun4<-function(s_global2){
    clusterExport(cl,"s_global2", envir=environment())
    clusterApply(cl, 1:p,function(dummy)print(s_global2))
}
print(myfun4("Hola mundo! usando environment()"))
[[1]]
[1] "Hola mundo! usando environment()"

[[2]]
[1] "Hola mundo! usando environment()"

[[3]]
[1] "Hola mundo! usando environment()"

[[4]]
[1] "Hola mundo! usando environment()"

[[5]]
[1] "Hola mundo! usando environment()"

[[6]]
[1] "Hola mundo! usando environment()"

[[7]]
[1] "Hola mundo! usando environment()"

[[8]]
[1] "Hola mundo! usando environment()"

[[9]]
[1] "Hola mundo! usando environment()"

[[10]]
[1] "Hola mundo! usando environment()"

[[11]]
[1] "Hola mundo! usando environment()"

[[12]]
[1] "Hola mundo! usando environment()"

[[13]]
[1] "Hola mundo! usando environment()"

[[14]]
[1] "Hola mundo! usando environment()"

[[15]]
[1] "Hola mundo! usando environment()"

[[16]]
[1] "Hola mundo! usando environment()"

[[17]]
[1] "Hola mundo! usando environment()"

[[18]]
[1] "Hola mundo! usando environment()"

[[19]]
[1] "Hola mundo! usando environment()"

[[20]]
[1] "Hola mundo! usando environment()"

[[21]]
[1] "Hola mundo! usando environment()"

[[22]]
[1] "Hola mundo! usando environment()"

[[23]]
[1] "Hola mundo! usando environment()"

[[24]]
[1] "Hola mundo! usando environment()"

[[25]]
[1] "Hola mundo! usando environment()"

[[26]]
[1] "Hola mundo! usando environment()"

[[27]]
[1] "Hola mundo! usando environment()"

[[28]]
[1] "Hola mundo! usando environment()"

[[29]]
[1] "Hola mundo! usando environment()"

[[30]]
[1] "Hola mundo! usando environment()"

[[31]]
[1] "Hola mundo! usando environment()"

[[32]]
[1] "Hola mundo! usando environment()"

[[33]]
[1] "Hola mundo! usando environment()"

[[34]]
[1] "Hola mundo! usando environment()"

[[35]]
[1] "Hola mundo! usando environment()"

[[36]]
[1] "Hola mundo! usando environment()"

[[37]]
[1] "Hola mundo! usando environment()"

[[38]]
[1] "Hola mundo! usando environment()"

[[39]]
[1] "Hola mundo! usando environment()"

[[40]]
[1] "Hola mundo! usando environment()"

[[41]]
[1] "Hola mundo! usando environment()"

[[42]]
[1] "Hola mundo! usando environment()"

[[43]]
[1] "Hola mundo! usando environment()"

[[44]]
[1] "Hola mundo! usando environment()"

[[45]]
[1] "Hola mundo! usando environment()"

[[46]]
[1] "Hola mundo! usando environment()"

[[47]]
[1] "Hola mundo! usando environment()"

[[48]]
[1] "Hola mundo! usando environment()"

[[49]]
[1] "Hola mundo! usando environment()"

[[50]]
[1] "Hola mundo! usando environment()"

[[51]]
[1] "Hola mundo! usando environment()"

[[52]]
[1] "Hola mundo! usando environment()"

[[53]]
[1] "Hola mundo! usando environment()"

[[54]]
[1] "Hola mundo! usando environment()"

[[55]]
[1] "Hola mundo! usando environment()"

[[56]]
[1] "Hola mundo! usando environment()"

[[57]]
[1] "Hola mundo! usando environment()"

[[58]]
[1] "Hola mundo! usando environment()"

[[59]]
[1] "Hola mundo! usando environment()"

[[60]]
[1] "Hola mundo! usando environment()"

[[61]]
[1] "Hola mundo! usando environment()"

[[62]]
[1] "Hola mundo! usando environment()"

[[63]]
[1] "Hola mundo! usando environment()"

[[64]]
[1] "Hola mundo! usando environment()"

Ejemplo regla compuesta del rectángulo con clusterApply#

library(microbenchmark)
library(tictoc)
f <- function(x)exp(-x**2)
a <- 0
b <- 1
n <- 10**7
h_hat <- (b-a)/n
n_subintervals_per_subprocess <- as.integer(n/p)

Comentario

n_subintervals_per_subprocess es el número de subintervalos por core. Se asume que n es divisible por p. Si no se cumple esto, se puede definir n_subintervals_per_subprocess <- int(n/p) habiendo definido n primero y redefinir n como: n <- p*n_subintervals_per_subprocess

relative_error <- function(aprox,obj)abs(aprox-obj)/abs(obj)
sprintf("número de subintervalos: %d",n)
'número de subintervalos: 10000000'
sprintf("número de subintervalos por proceso: %d",n_subintervals_per_subprocess)
'número de subintervalos por proceso: 156250'
Rcf_parallel_cl_apply <- function(my_id){
    begin <- my_id*n_subintervals_per_subprocess
    end <- begin+n_subintervals_per_subprocess
    sum_res <- 0
    for(j in begin:(end-1)){
        x <- a+(j+1/2)*h_hat
        sum_res <- sum_res+f(x)
    }
    sum_res    
}
clusterExport(cl,c("n_subintervals_per_subprocess",'a','f',"h_hat"))

Utilizamos la función Reduce y la función sum a la lista que resulta de clusterApply:

tic("regla Rcf_parallel_cl_apply")
result_cl_apply  <- clusterApply(cl,0:(p-1),Rcf_parallel_cl_apply)
res_rcf_cl_apply <- h_hat*Reduce(sum,result_cl_apply)
toc()
regla Rcf_parallel_cl_apply: 0.562 sec elapsed
obj <- integrate(Vectorize(f),a,b)
print(relative_error(res_rcf_cl_apply,obj$value))
[1] 1.486592e-15

La encapsulamos en una función para pasarla a microbenchmark:

clapply <- function(cl,p){
    result_cl_apply <- clusterApply(cl,0:(p-1),Rcf_parallel_cl_apply)
    res_rcf_cl_apply <- h_hat*Reduce(sum,result_cl_apply)
}
mbk<-microbenchmark(
    clapply(cl,p),
    times=10
    )
print(mbk)
Unit: milliseconds
           expr      min       lq     mean   median       uq     max neval
 clapply(cl, p) 274.0143 299.3502 335.1274 322.0482 361.4857 457.578    10

Ejemplo regla compuesta del rectángulo con clusterSplit#

La función clusterSplit divide una secuencia en chunks de subsecuencias consecutivas. El número de chunks que regresa es igual al número de workers. clusterSplit intenta que los chunks sean similares en longitud y la división se realiza en manager.

En la siguiente implementación se recibe un chunk de nodos.

Rcf_parallel_cl_split<-function(chunk){
    sum_res <- 0
    for(x in chunk){
        sum_res <- sum_res+f(x)
    }
    sum_res    
}
tic()
#chunks for nodes using clusterSplit:
chunks <- clusterSplit(cl,vapply(0:(n-1),function(j)a+(j+1/2.0)*h_hat,numeric(1)))
#call clusterApply
result_cl_apply <- clusterApply(cl,chunks,Rcf_parallel_cl_split)
res_rcf_cl_split <- h_hat*Reduce(sum,result_cl_apply)
toc()
9.949 sec elapsed
print(relative_error(res_rcf_cl_split,obj$value))
[1] 2.824525e-15

En la siguiente implementación, se divide en chunks la secuencia: 0,1,...,n-1:

Rcf_parallel_cl_split_2 <- function(chunk){
    n_s_sub <- length(chunk)
    begin <- chunk[1]
    end <- begin+n_s_sub
    sum_res <- 0
    for(j in begin:(end-1)){
        x <- a+(j+1/2)*h_hat
        sum_res <- sum_res+f(x)
    }
    sum_res    
}
tic()
#se crean los chunks de la secuencia: 0,1,...,n-1
chunks <- clusterSplit(cl,0:(n-1)) 
result_cl_apply <- clusterApply(cl,chunks,Rcf_parallel_cl_split_2)
res_rcf_cl_split_2 <- h_hat*Reduce(sum,result_cl_apply)
toc()
1.538 sec elapsed
print(relative_error(res_rcf_cl_split_2,obj$value))
[1] 2.824525e-15

Medimos tiempos con Microbenchmark.

clapply2 <- function(cl){
    chunks <- clusterSplit(cl,vapply(0:(n-1),function(j)a+(j+1/2.0)*h_hat,numeric(1))) 
    result_cl_apply <- clusterApply(cl,chunks,Rcf_parallel_cl_split)
    res_rcf_cl_split <- h_hat*Reduce(sum,result_cl_apply)
}
clapply3 <- function(cl){
    chunks <- clusterSplit(cl,0:(n-1))
    result_cl_apply <- clusterApply(cl,chunks,Rcf_parallel_cl_split_2)
    res_rcf_cl_split_2 <- h_hat*Reduce(sum,result_cl_apply)
}
mbk<-microbenchmark(
    clapply(cl,p),
    clapply2(cl),
    clapply3(cl),
    times=10
    )
print(mbk)
Unit: milliseconds
           expr       min        lq       mean    median       uq        max
 clapply(cl, p)  279.1311  282.8535   305.9289  294.0048  325.548   371.0334
   clapply2(cl) 9692.8472 9737.9523 10064.3089 9856.9876 9932.777 12093.3270
   clapply3(cl) 1341.8627 1463.4982  1484.6908 1478.0374 1539.323  1618.6653
 neval
    10
    10
    10

Gráfica de tiempo de ejecución vs número de procesos#

library(ggplot2)

Una buena práctica es detener el clúster:

It is good practice to shut down the workers by calling stopCluster: however the workers will terminate themselves once the socket on which they are listening for commands becomes unavailable, which it should if the master R session is completed (or its process dies).

stopCluster(cl)

En lo siguiente se utiliza Rcf_parallel_cl_apply pues tuvo el menor tiempo de ejecución. Usaremos ésta implementación para realizar la gráfica variando el número de procesos de \(1\) hasta detectCores().

myfun <- function(cl,p){
    result_apply <- clusterApply(cl,0:(p-1),Rcf_parallel_cl_apply)
    res_rcf_cl_apply <- h_hat*Reduce(sum,result_apply)
}
dim <- sum(n%%(1:detectCores())==0)
err_vec <- vector("double", dim)
n_cpus <- vector("integer",dim)
i<-1
for(p in 1:detectCores()){
    if(n%%p==0){
        n_subintervals_per_subprocess <- n/p
        cl <- makeCluster(p)
        clusterExport(cl,c('a','f',"h_hat","n_subintervals_per_subprocess"))
        res_my_fun <- myfun(cl,p)
        err_vec[i] <- relative_error(res_my_fun, obj$value)
        n_cpus[i] <- p
        stopCluster(cl)
        i <- i+1
    }
}
print(err_vec)
 [1] 4.994950e-14 1.441995e-14 2.423145e-14 5.886906e-14 1.382531e-14
 [6] 1.769045e-14 3.716481e-15 3.865140e-15 2.527207e-15 2.973185e-15
[11] 2.229888e-15 2.675866e-15 1.486592e-15
print(n_cpus)
 [1]  1  2  4  5  8 10 16 20 25 32 40 50 64
vector_with_time_medians <- vector("double", dim)
n_cpus <- vector("integer",dim)
i<-1
for(p in 1:detectCores()){
    if(n%%p==0){
        n_subintervals_per_subprocess <- n/p
        cl <- makeCluster(p)
        clusterExport(cl,c('a','f',"h_hat","n_subintervals_per_subprocess"))
        df <- print(microbenchmark(myfun(cl,p),times=10,unit="s"))
        stopCluster(cl)
        vector_with_time_medians[i] <- df$median
        n_cpus[i] <- p
        i <- i+1
    }
}
Unit: seconds
         expr      min       lq     mean   median       uq      max neval
 myfun(cl, p) 6.880437 6.912525 6.962728 6.951347 6.969166 7.092888    10
Unit: seconds
         expr      min       lq     mean   median       uq     max neval
 myfun(cl, p) 3.521054 3.528644 3.550711 3.537149 3.561584 3.63593    10
Unit: seconds
         expr      min      lq     mean   median       uq      max neval
 myfun(cl, p) 1.810182 1.84642 1.868893 1.855109 1.877132 1.992167    10
Unit: seconds
         expr      min       lq     mean   median       uq     max neval
 myfun(cl, p) 1.497631 1.503919 1.539663 1.513827 1.561826 1.68751    10
Unit: seconds
         expr       min        lq   mean    median       uq      max neval
 myfun(cl, p) 0.9574237 0.9681314 0.9938 0.9926238 1.002309 1.084198    10
Unit: seconds
         expr       min        lq      mean    median        uq       max neval
 myfun(cl, p) 0.7567488 0.7752932 0.8005973 0.7915049 0.8226106 0.8705034    10
Unit: seconds
         expr       min        lq      mean    median        uq       max neval
 myfun(cl, p) 0.4981435 0.5000754 0.5255358 0.5042411 0.5605582 0.6147569    10
Unit: seconds
         expr       min        lq      mean   median       uq       max neval
 myfun(cl, p) 0.4741282 0.6334634 0.6247839 0.644421 0.679061 0.7279318    10
Unit: seconds
         expr       min        lq      mean    median        uq       max neval
 myfun(cl, p) 0.4189323 0.5358796 0.5622284 0.5821805 0.6220992 0.6457455    10
Unit: seconds
         expr       min        lq     mean    median        uq       max neval
 myfun(cl, p) 0.5009026 0.5067084 0.526749 0.5106894 0.5534964 0.5992958    10
Unit: seconds
         expr       min        lq      mean    median        uq       max neval
 myfun(cl, p) 0.3703077 0.4051899 0.4655505 0.4660017 0.5055815 0.5712992    10
Unit: seconds
         expr       min       lq      mean    median        uq       max neval
 myfun(cl, p) 0.3289509 0.333303 0.3752226 0.3503937 0.3883032 0.5635342    10
Unit: seconds
         expr       min        lq      mean    median        uq       max neval
 myfun(cl, p) 0.2982448 0.3310383 0.3636902 0.3519018 0.3622468 0.5368318    10
print(vector_with_time_medians)
 [1] 6.9513466 3.5371493 1.8551087 1.5138267 0.9926238 0.7915049 0.5042411
 [8] 0.6444210 0.5821805 0.5106894 0.4660017 0.3503937 0.3519018
print(n_cpus)
 [1]  1  2  4  5  8 10 16 20 25 32 40 50 64
gg<-ggplot()
gg+
geom_line(aes(x=n_cpus,y=vector_with_time_medians))
../../_images/Computo_en_paralelo_usando_CPUS_en_SMC_374_0.png

Ejercicio

Implementar la regla de Simpson compuesta con Parallel en una máquina de AWS con las mismas características que la que se presenta en esta nota, medir tiempo de ejecución y realizar gráfica de tiempo vs num cpus.

Referencias de interés#

Ejercicios

1.Resuelve los ejercicios y preguntas de la nota.

Preguntas de comprehensión:

1)Menciona diferencias que surgen en un programa que se ejecuta en un sistema de memoria compartida contra los que se ejecutan en un sistema de memoria distribuida.

2)¿A qué se le llama no determinismo y da un ejemplo en el que esto surge en un sistema de memoria compartida?

3)¿Qué es una critical section? ¿qué es una race condition? ¿cómo se puede lidiar con las critical sections?

4)¿Cuál es la terminología para nombrar a las variables que pueden ser accesadas por todos los threads y para las variables que sólo pueden ser accesadas por un thread?

Referencias:

  1. P. Pacheco, An Introduction to Parallel Programming, Morgan Kaufmann, 2011.

  2. M. Gorelick, I. Ozsvald, High Performance Python, O’Reilly Media, 2014.

  3. N. Matloff, Parallel Computing for Data Science. With Examples in R, C++ and CUDA, 2014.

  4. B. W. Kernighan, D. M. Ritchie, The C Programming Language, Prentice Hall Software Series, 1988

  5. C/extensiones_a_C/openMP