Procesamiento paralelo en Python

Cuando inicia un programa en su máquina, se ejecuta en su propia "burbuja" que está completamente separado de otros programas que están activos al mismo tiempo...

Introducción

Cuando inicia un programa en su máquina, se ejecuta en su propia "burbuja", que está completamente separada de otros programas que están activos al mismo tiempo. Esta "burbuja" se denomina proceso, y comprende todo lo necesario para gestionar esta llamada de programa.

Por ejemplo, este llamado entorno de proceso incluye las paginas de memoria que el proceso tiene en uso, el archivo que maneja este proceso ha abierto, tanto el usuario y derechos de acceso de grupo, y su llamada de línea de comando completa, incluidos los parámetros dados.

Esta información se guarda en el sistema de archivos de proceso de su sistema UNIX/Linux, que es un sistema de archivos virtual y accesible a través de [/proc](https://github.com/torvalds/linux/blob/master/Documentation/ directorio de sistemas de archivos/proc.txt). Las entradas se ordenan por el ID del proceso, que es único para cada proceso. Ejemplo 1 muestra esto para un proceso seleccionado arbitrariamente que tiene el ID de proceso #177.

Ejemplo 1: Información que está disponible para un proceso

1
2
3
4
5
6
7
8
9
[correo electrónico protegido]:/proc/177# ls
attr         cpuset   limits      net            projid_map   statm
autogroup    cwd      loginuid    ns             root         status
auxv         environ  map_files   numa_maps      sched        syscall
cgroup       exe      maps        oom_adj        sessionid    task
clear_refs   fd       mem         oom_score      setgroups    timers
cmdline      fdinfo   mountinfo   oom_score_adj  smaps        uid_map
comm         gid_map  mounts      pagemap        stack        wchan
coredump_filter       io          mountstats     personality  stat

Estructuración del código y los datos del programa {#estructuración del código y los datos del programa}

Cuanto más complejo se vuelve un programa, más a menudo es útil dividirlo en partes más pequeñas. Esto no se refiere solo al código fuente, sino también al código que se ejecuta en su máquina. Una solución para esto es el uso de subprocesos en combinación con la ejecución en paralelo. Los pensamientos detrás de esto son:

  • Un solo proceso cubre una pieza de código que se puede ejecutar por separado
  • Ciertas secciones de código se pueden ejecutar simultáneamente y, en principio, permiten la paralelización
  • Usar las características de los procesadores y sistemas operativos modernos, por ejemplo, cada núcleo de un procesador que tenemos disponible para reducir el tiempo total de ejecución de un programa
  • Para reducir la complejidad de su programa/código, y subcontratar piezas de trabajo a agentes especializados que actúan como subprocesos

El uso de subprocesos requiere que reconsidere la forma en que se ejecuta su programa, de lineal a paralelo. Es similar a cambiar su perspectiva de trabajo en una empresa de un trabajador ordinario a un gerente: tendrá que estar atento a quién está haciendo qué, cuánto tiempo toma un solo paso y cuáles son las dependencias entre los resultados intermedios.

Esto lo ayuda a dividir su código en fragmentos más pequeños que pueden ser ejecutados por un agente especializado solo para esta tarea. Si aún no lo ha hecho, piense también en cómo se estructura su conjunto de datos para que los agentes individuales puedan procesarlo de manera efectiva. Esto lleva a estas preguntas:

  • ¿Por qué quieres paralelizar el código? En tu caso concreto y en términos de esfuerzo, ¿tiene sentido pensarlo?
  • ¿Su programa está diseñado para ejecutarse solo una vez o se ejecutará regularmente en un conjunto de datos similar?
  • ¿Puedes dividir tu algoritmo en varios pasos de ejecución?
  • ¿Sus datos permiten la paralelización? Si aún no, ¿de qué manera se debe adaptar la organización de sus datos?
  • ¿Qué resultados intermedios de su cálculo dependen unos de otros?
  • ¿Qué cambio de hardware se necesita para eso?
  • ¿Existe un cuello de botella en el hardware o en el algoritmo? ¿Cómo se puede evitar o minimizar la influencia de estos factores?
  • ¿Qué otros efectos secundarios de la paralelización pueden ocurrir?

Un posible caso de uso es un proceso principal y un demonio ejecutándose en segundo plano (maestro/esclavo) esperando ser activado. Además, este puede ser un proceso principal que inicia procesos de trabajo que se ejecutan bajo demanda. En la práctica, el proceso principal es un proceso alimentador que controla dos o más agentes que reciben porciones de los datos y hacen cálculos en la porción dada.

Tenga en cuenta que la paralelización es costosa y lleva mucho tiempo debido a la sobrecarga de los subprocesos que necesita su sistema operativo. En comparación con ejecutar dos o más tareas de forma lineal, al hacerlo en paralelo puede ahorrar entre un 25 y un 30 por ciento del tiempo por subproceso, según su caso de uso. Por ejemplo, dos tareas que consumen 5 segundos cada una necesitan 10 segundos en total si se ejecutan en serie, y pueden necesitar alrededor de 8 segundos en promedio en una máquina multinúcleo cuando están en paralelo. 3 de esos 8 segundos pueden perderse por sobrecarga, lo que limita las mejoras de velocidad.

Ejecutar una función en paralelo con Python {#ejecutar una función en paralelo con Python}

Python ofrece cuatro formas posibles de manejar eso. Primero, puedes ejecutar funciones en paralelo usando el módulo multiprocesamiento. En segundo lugar, una alternativa a los procesos son los hilos. Técnicamente, estos son procesos livianos y están fuera del alcance de este artículo. Para obtener más información, puede consultar Python [módulo de roscado] (https://docs.python.org/3/library/threading.html). En tercer lugar, puede llamar a programas externos utilizando el método system() del módulo os, o los métodos proporcionados por el módulo subprocess, y recopilar los resultados después.

El módulo multiprocesamiento cubre una buena selección de métodos para manejar la ejecución paralela de rutinas. Esto incluye procesos, grupos de agentes, colas y conductos.

Listado 1 funciona con un grupo de cinco agentes que procesan una porción de tres valores al mismo tiempo. Los valores para el número de agentes y para el “tamaño de la porción” se eligen arbitrariamente con fines de demostración. Ajuste estos valores de acuerdo con la cantidad de núcleos en su procesador.

El método Pool.map() requiere tres parámetros: una función que se llamará en cada elemento del conjunto de datos, el conjunto de datos en sí y el tamaño de la porción. En el Listado 1 usamos una función que se llama cuadrado y calcula el cuadrado del valor entero dado. Además, se puede omitir el chunksize. Si no se establece explícitamente, el “tamaño de fragmento” predeterminado es 1.

Tenga en cuenta que el orden de ejecución de los agentes no está garantizado, pero el conjunto de resultados está en el orden correcto. Contiene los valores cuadrados según el orden de los elementos del conjunto de datos original.

Listado 1: Ejecutando funciones en paralelo

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
from multiprocessing import Pool

def square(x):
    # calculate the square of the value of x
    return x*x

if __name__ == '__main__':

    # Define the dataset
    dataset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

    # Output the dataset
    print ('Dataset: ' + str(dataset))

    # Run this with a pool of 5 agents having a chunksize of 3 until finished
    agents = 5
    chunksize = 3
    with Pool(processes=agents) as pool:
        result = pool.map(square, dataset, chunksize)

    # Output the result
    print ('Result:  ' + str(result))

Ejecutar este código debería producir el siguiente resultado:

1
2
3
$ python3 pool_multiprocessing.py 
Dataset: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
Result:  [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196]

Nota: Usaremos Python 3 para estos ejemplos.

Ejecutar múltiples funciones usando una cola {#ejecutar múltiples funciones usando una cola}

Como estructura de datos, una cola es muy común y existe de varias formas. Está organizado como Primero en entrar primero en salir (FIFO), o Last In First Out (LIFO)/[pila](https: //en.wikipedia.org/wiki/Stack_(abstract_data_type)), así como con y sin prioridades (priority queue). La estructura de datos se implementa como una matriz con un número fijo de entradas o como una lista que contiene un número variable de elementos individuales.

En Listados 2.1-2.7 usamos una cola FIFO. Se implementa como una lista que ya proporciona la clase correspondiente del módulo multiprocesamiento. Además, el módulo tiempo se carga y se utiliza para imitar la carga de trabajo.

Listado 2.1: Módulos a utilizar

1
2
import multiprocessing
from time import sleep

A continuación, se define una función de trabajador (Listado 2.2). Esta función representa al agente, en realidad, y requiere tres argumentos. El nombre del proceso indica qué proceso es, y tanto las “tareas” como los “resultados” se refieren a la cola correspondiente.

Dentro de la función de trabajo hay un bucle infinito while. Tanto tareas como resultados son colas que se definen en el programa principal. tasks.get() devuelve la tarea actual de la cola de tareas para ser procesada. Un valor de tarea menor que 0 sale del bucle while y devuelve un valor de -1. Cualquier otro valor de tarea realizará un cálculo (cuadrado) y devolverá este valor. La devolución de un valor al programa principal se implementa como results.put(). Esto agrega el valor calculado al final de la cola de resultados.

Listado 2.2: La función del trabajador

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# define worker function
def calculate(process_name, tasks, results):
    print('[%s] evaluation routine starts' % process_name)

    while True:
        new_value = tasks.get()
        if new_value < 0:
            print('[%s] evaluation routine quits' % process_name)

            # Indicate finished
            results.put(-1)
            break
        else:
            # Compute result and mimic a long-running task
            compute = new_value * new_value
            sleep(0.02*new_value)

            # Output which process received the value
            # and the calculation result
            print('[%s] received value: %i' % (process_name, new_value))
            print('[%s] calculated value: %i' % (process_name, compute))

            # Add result to the queue
            results.put(compute)

    return

El siguiente paso es el bucle principal (ver Listado 2.3). En primer lugar, se define un gestor de comunicación entre procesos (IPC). A continuación, se agregan dos colas, una que guarda las tareas y la otra para los resultados.

Listado 2.3: IPC y colas

1
2
3
4
5
6
7
if __name__ == "__main__":
    # Define IPC manager
    manager = multiprocessing.Manager()

    # Define a list (queue) for tasks and computation results
    tasks = manager.Queue()
    results = manager.Queue()

Una vez realizada esta configuración, definimos un grupo de procesos con cuatro procesos de trabajo (agentes). Hacemos uso de la clase multiprocessing.Pool() y creamos una instancia de ella. A continuación, definimos una lista vacía de procesos (ver Listado 2.4).

Listado 2.4: Definición de un grupo de procesos

1
2
3
4
# Create process pool with four processes
num_processes = 4
pool = multiprocessing.Pool(processes=num_processes)
processes = []

Como siguiente paso iniciamos los cuatro procesos de trabajo (agentes). Para simplificar, se denominan "P0" a "P3". La creación de los cuatro procesos de trabajo se realiza mediante multiprocessing.Process(). Esto conecta cada uno de ellos con la función del trabajador, así como con la tarea y la cola de resultados. Finalmente, agregamos el proceso recién inicializado al final de la lista de procesos y comenzamos el nuevo proceso usando new_process.start() (ver Listado 2.5).

Listado 2.5: Preparar los procesos de trabajo

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# Initiate the worker processes
for i in range(num_processes):

    # Set process name
    process_name = 'P%i' % i

    # Create the process, and connect it to the worker function
    new_process = multiprocessing.Process(target=calculate, args=(process_name,tasks,results))

    # Add new process to the list of processes
    processes.append(new_process)

    # Start the process
    new_process.start()

Nuestros procesos de trabajo están esperando trabajo. Definimos una lista de tareas, que en nuestro caso son números enteros seleccionados arbitrariamente. Estos valores se agregan a la lista de tareas mediante tasks.put(). Cada proceso de trabajo espera tareas y elige la siguiente tarea disponible de la lista de tareas. Esto lo maneja la propia cola (ver Listado 2.6).

Listado 2.6: Preparar la cola de tareas

1
2
3
4
5
6
7
# Fill task queue
task_list = [43, 1, 780, 256, 142, 68, 183, 334, 325, 3]
for single_task in task_list:
    tasks.put(single_task)

# Wait while the workers process
sleep(5)

Después de un tiempo nos gustaría que nuestros agentes terminaran. Cada proceso de trabajo reacciona en una tarea con el valor -1. Interpreta este valor como una señal de terminación y muere a partir de entonces. Es por eso que ponemos tantos -1 en la cola de tareas como procesos en ejecución. Antes de morir, un proceso que termina pone un -1 en la cola de resultados. Esto pretende ser una señal de confirmación para el bucle principal de que el agente está terminando.

En el bucle principal, leemos de esa cola y contamos el número de -1. El ciclo principal se cierra tan pronto como hayamos contado tantas confirmaciones de finalización como procesos tengamos. De lo contrario, sacamos el resultado del cálculo de la cola.

Listado 2.7: Terminación y salida de resultados

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
# Quit the worker processes by sending them -1
for i in range(num_processes):
    tasks.put(-1)

# Read calculation results
num_finished_processes = 0
while True:
    # Read result
    new_result = results.get()

    # Have a look at the results
    if new_result == -1:
        # Process has finished
        num_finished_processes += 1

        if num_finished_processes == num_processes:
            break
    else:
        # Output result
        print('Result:' + str(new_result))

Ejemplo 2 muestra la salida del programa Python. Al ejecutar el programa más de una vez, puede notar que el orden en que se inician los procesos de trabajo es tan impredecible como el propio proceso que selecciona una tarea de la cola. Sin embargo, una vez finalizado el orden de los elementos de la cola de resultados coincide con el orden de los elementos de la cola de tareas.

Ejemplo 2

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
$ python3 queue_multiprocessing.py 
[P0] evaluation routine starts
[P1] evaluation routine starts
[P2] evaluation routine starts
[P3] evaluation routine starts
[P1] received value: 1
[P1] calculated value: 1
[P0] received value: 43
[P0] calculated value: 1849
[P0] received value: 68
[P0] calculated value: 4624
[P1] received value: 142
[P1] calculated value: 20164
result: 1
result: 1849
result: 4624
result: 20164
[P3] received value: 256
[P3] calculated value: 65536
result: 65536
[P0] received value: 183
[P0] calculated value: 33489
result: 33489
[P0] received value: 3
[P0] calculated value: 9
result: 9
[P0] evaluation routine quits
[P1] received value: 334
[P1] calculated value: 111556
result: 111556
[P1] evaluation routine quits
[P3] received value: 325
[P3] calculated value: 105625
result: 105625
[P3] evaluation routine quits
[P2] received value: 780
[P2] calculated value: 608400
result: 608400
[P2] evaluation routine quits

Nota: Como se mencionó anteriormente, es posible que su resultado no coincida exactamente con el que se muestra arriba, ya que el orden de ejecución es impredecible.

Uso del método os.system()

El método system() es parte del módulo del sistema operativo, que permite ejecutar programas de línea de comandos externos en un proceso separado de su programa Python. El método system() es una llamada de bloqueo, y debe esperar hasta que la llamada finalice y regrese. Como fetichista de UNIX/Linux, sabe que un comando se puede ejecutar en segundo plano y escribir el resultado calculado en el flujo de salida que se redirige a un archivo como este (consulte Ejemplo 3):

Ejemplo 3: Comando con redirección de salida

1
$ ./program >> outputfile &

En un programa de Python, simplemente encapsula esta llamada como se muestra a continuación:

Listado 3: Llamada al sistema simple usando el módulo os

1
2
3
import os

os.system("./program >> outputfile &")

Esta llamada al sistema crea un proceso que se ejecuta en paralelo a su programa Python actual. Obtener el resultado puede volverse un poco complicado porque esta llamada puede terminar después del final de su programa Python, nunca se sabe.

Usar este método es mucho más costoso que los métodos anteriores que describí. En primer lugar, la sobrecarga es mucho mayor (cambio de proceso) y, en segundo lugar, escribe datos en la memoria física, como un disco, lo que lleva más tiempo. Sin embargo, esta es una mejor opción si tiene memoria limitada (como con RAM) y, en cambio, puede tener datos de salida masivos escritos en un disco de estado sólido.

Usando el módulo de subproceso

Este módulo está destinado a reemplazar las llamadas os.system() y os.spawn(). La idea de subproceso es simplificar los procesos de generación, comunicándose con ellos a través de conductos y señales, y recopilando la salida que producen, incluidos los mensajes de error.

A partir de Python 3.5, el subproceso contiene el método subprocess.run() para iniciar un comando externo, que es un contenedor para la clase subyacente subprocess.Popen(). Como ejemplo, lanzamos el comando UNIX/Linux df -h para averiguar cuánto espacio de disco queda disponible en la partición /home de su máquina. En un programa de Python, realiza esta llamada como se muestra a continuación (Listado 4).

Listado 4: Ejemplo básico para ejecutar un comando externo

1
2
3
4
import subprocess

ret = subprocess.run(["df", "-h", "/home"])
print(ret)

Esta es la llamada básica, y muy similar al comando df -h /home que se ejecuta en una terminal. Tenga en cuenta que los parámetros están separados como una lista en lugar de una sola cadena. El resultado será similar al Ejemplo 4. En comparación con la documentación oficial de Python para este módulo, genera el resultado de la llamada a stdout, además del valor de retorno de la llamada.

Ejemplo 4 muestra la salida de nuestra llamada. La última línea de la salida muestra la ejecución exitosa del comando. Llamar a subprocess.run() devuelve una instancia de la clase CompletedProcess que tiene los dos atributos llamados args (argumentos de línea de comando) y returncode (valor de retorno del comando).

Ejemplo 4: Ejecutar el script de Python del Listado 4

1
2
3
4
$ python3 diskfree.py
Filesystem   Size   Used  Avail Capacity  iused   ifree %iused  Mounted on
/dev/sda3  233Gi  203Gi   30Gi    88% 53160407 7818407   87%   /home
CompletedProcess(args=['df', '-h', '/home'], returncode=0)

Para suprimir la salida a stdout y captar tanto la salida como el valor devuelto para una evaluación posterior, la llamada de subprocess.run() debe modificarse ligeramente. Sin más modificaciones, subprocess.run() envía la salida del comando ejecutado a stdout, que es el canal de salida del proceso subyacente de Python. Para capturar la salida, tenemos que cambiar esto y establecer el canal de salida en el valor predefinido subprocess.PIPE. Listado 5 muestra cómo hacerlo.

Listado 5: Tomando la salida en una tubería

1
2
3
4
5
6
7
8
import subprocess

# Call the command
output = subprocess.run(["df", "-h", "/home"], stdout=subprocess.PIPE)

# Read the return code and the output data
print ("Return code: %i" % output.returncode)
print ("Output data: %s" % output.stdout)

Como se explicó antes, subprocess.run() devuelve una instancia de la clase CompletedProcess. En el Listado 5, esta instancia es una variable llamada simplemente salida. El código de retorno del comando se mantiene en el atributo output.returncode, y la salida impresa en stdout se puede encontrar en el atributo output.stdout. Tenga en cuenta que esto no cubre el manejo de mensajes de error porque no cambiamos el canal de salida para eso.

Conclusión

El procesamiento paralelo es una gran oportunidad para usar el poder del hardware contemporáneo. Python le da acceso a estos métodos a un nivel muy sofisticado. Como ha visto antes, tanto el módulo multiprocesamiento como el subproceso le permiten sumergirse en ese tema fácilmente.

Agradecimientos

El autor quisiera agradecer a Geroldo Rupprecht por su apoyo y críticas durante la preparación de este artículo.

Licensed under CC BY-NC-SA 4.0