Descripción general de Async IO en Python 3.7

El módulo asyncio de Python 3 proporciona herramientas fundamentales para implementar E/S asíncronas en Python. Se introdujo en Python 3.4, y con cada subsiguiente mino...

El módulo asyncio de Python 3 proporciona herramientas fundamentales para implementar E/S asíncronas en Python. Se introdujo en Python 3.4 y, con cada versión menor posterior, el módulo ha evolucionado significativamente.

Este tutorial contiene una descripción general del paradigma asíncrono y cómo se implementa en Python 3.7.

E/S con bloqueo frente a sin bloqueo

El problema que la asincronía busca resolver es el bloqueo de E/S.

De manera predeterminada, cuando su programa accede a los datos de una fuente de E/S, espera a que se complete esa operación antes de continuar con la ejecución del programa.

1
2
3
4
with open('myfile.txt', 'r') as file:
    data = file.read()
    # Until the data is read into memory, the program waits here
print(data)

El programa está bloqueado para que no continúe su flujo de ejecución mientras se accede a un dispositivo físico y se transfieren los datos.

Las operaciones de red son otra fuente común de bloqueo:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# pip install --user requests
import requests

req = requests.get('https://wikihtp.com/')

#
# Blocking occurs here, waiting for completion of an HTTPS request
#

print(req.text)

En muchos casos, el retraso causado por el bloqueo es insignificante. Sin embargo, el bloqueo de E/S escala muy mal. Si necesita esperar 10^10^ lecturas de archivos o transacciones de red, el rendimiento se verá afectado.

Multiprocesamiento, subprocesos y asincronía {#multiprocesamiento, subprocesos y asincronía}

Las estrategias para minimizar los retrasos del bloqueo de E/S se dividen en tres categorías principales: multiprocesamiento, subprocesos y asincronía.

Multiprocesamiento

El multiprocesamiento es una forma de computación paralela: las instrucciones se ejecutan en un marco de tiempo superpuesto en múltiples procesadores físicos o núcleos. Cada proceso generado por el kernel incurre en un costo general, incluida una porción de memoria asignada de forma independiente (montón).

Python implementa el paralelismo con el módulo multiprocesamiento.

El siguiente es un ejemplo de un programa de Python 3 que genera cuatro procesos secundarios, cada uno de los cuales muestra un retraso aleatorio e independiente. El resultado muestra el ID de proceso de cada hijo, el tiempo del sistema antes y después de cada retraso, y la asignación de memoria actual y máxima en cada paso.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Process
import os, time, datetime, random, tracemalloc

tracemalloc.start()
children = 4    # number of child processes to spawn
maxdelay = 6    # maximum delay in seconds

def status():
    return ('Time: ' + 
        str(datetime.datetime.now().time()) +
        '\t Malloc, Peak: ' +
        str(tracemalloc.get_traced_memory()))

def child(num):
    delay = random.randrange(maxdelay)
    print(f"{status()}\t\tProcess {num}, PID: {os.getpid()}, Delay: {delay} seconds...")
    time.sleep(delay)
    print(f"{status()}\t\tProcess {num}: Done.")

if __name__ == '__main__':
    print(f"Parent PID: {os.getpid()}")
    for i in range(children):
        proc = Process(target=child, args=(i,))
        proc.start()

Producción:

1
2
3
4
5
6
7
8
9
Parent PID: 16048
Time: 09:52:47.014906    Malloc, Peak: (228400, 240036)     Process 0, PID: 16051, Delay: 1 seconds...
Time: 09:52:47.016517    Malloc, Peak: (231240, 240036)     Process 1, PID: 16052, Delay: 4 seconds...
Time: 09:52:47.018786    Malloc, Peak: (231616, 240036)     Process 2, PID: 16053, Delay: 3 seconds...
Time: 09:52:47.019398    Malloc, Peak: (232264, 240036)     Process 3, PID: 16054, Delay: 2 seconds...
Time: 09:52:48.017104    Malloc, Peak: (228434, 240036)     Process 0: Done.
Time: 09:52:49.021636    Malloc, Peak: (232298, 240036)     Process 3: Done.
Time: 09:52:50.022087    Malloc, Peak: (231650, 240036)     Process 2: Done.
Time: 09:52:51.020856    Malloc, Peak: (231274, 240036)     Process 1: Done.

Enhebrado

Threading es una alternativa al multiprocesamiento, con ventajas y desventajas.

Los subprocesos se programan de forma independiente y su ejecución puede ocurrir dentro de un período de tiempo superpuesto. Sin embargo, a diferencia del multiprocesamiento, los subprocesos existen completamente en un solo proceso de kernel y comparten un solo montón asignado.

Los subprocesos de Python son concurrentes — múltiples secuencias de código de máquina se ejecutan en marcos de tiempo superpuestos. Pero no son paralelos — la ejecución no ocurre simultáneamente en múltiples núcleos físicos.

Las principales desventajas de los subprocesos de Python son la seguridad de la memoria y las condiciones de carrera. Todos los subprocesos secundarios de un proceso principal operan en el mismo espacio de memoria compartida. Sin protecciones adicionales, un subproceso puede sobrescribir un valor compartido en la memoria sin que otros subprocesos se den cuenta. Tal corrupción de datos sería desastrosa.

Para reforzar la seguridad de subprocesos, las implementaciones de CPython utilizan un bloqueo de intérprete global (GIL). El GIL es un mecanismo mutex que evita que varios subprocesos se ejecuten simultáneamente en objetos de Python. Efectivamente, esto significa que solo se ejecuta un subproceso en un momento dado.

Aquí está la versión con subprocesos del ejemplo de multiprocesamiento de la sección anterior. Tenga en cuenta que muy poco ha cambiado: multiprocessing.Process se reemplaza por threading.Thread. Como se indica en el resultado, todo sucede en un solo proceso y la huella de memoria es significativamente menor.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from threading import Thread
import os, time, datetime, random, tracemalloc

tracemalloc.start()
children = 4    # number of child threads to spawn
maxdelay = 6    # maximum delay in seconds

def status():
    return ('Time: ' + 
        str(datetime.datetime.now().time()) +
        '\t Malloc, Peak: ' +
        str(tracemalloc.get_traced_memory()))

def child(num):
    delay = random.randrange(maxdelay)
    print(f"{status()}\t\tProcess {num}, PID: {os.getpid()}, Delay: {delay} seconds...")
    time.sleep(delay)
    print(f"{status()}\t\tProcess {num}: Done.")

if __name__ == '__main__':
    print(f"Parent PID: {os.getpid()}")
    for i in range(children):
        thr = Thread(target=child, args=(i,))
        thr.start()

Producción:

1
2
3
4
5
6
7
8
9
Parent PID: 19770
Time: 10:44:40.942558    Malloc, Peak: (9150, 9264)     Process 0, PID: 19770, Delay: 3 seconds...
Time: 10:44:40.942937    Malloc, Peak: (13989, 14103)       Process 1, PID: 19770, Delay: 5 seconds...
Time: 10:44:40.943298    Malloc, Peak: (18734, 18848)       Process 2, PID: 19770, Delay: 3 seconds...
Time: 10:44:40.943746    Malloc, Peak: (23959, 24073)       Process 3, PID: 19770, Delay: 2 seconds...
Time: 10:44:42.945896    Malloc, Peak: (26599, 26713)       Process 3: Done.
Time: 10:44:43.945739    Malloc, Peak: (26741, 27223)       Process 0: Done.
Time: 10:44:43.945942    Malloc, Peak: (26851, 27333)       Process 2: Done.
Time: 10:44:45.948107    Malloc, Peak: (24639, 27475)       Process 1: Done.

Asincronía

La asincronía es una alternativa a los subprocesos para escribir aplicaciones concurrentes. Los eventos asincrónicos ocurren en horarios independientes, "desincronizados" entre sí, totalmente dentro de un solo hilo.

A diferencia de los subprocesos, en los programas asincrónicos, el programador controla cuándo y cómo se produce la preferencia voluntaria, lo que facilita aislar y evitar las condiciones de carrera.

Introducción al módulo asyncio de Python 3.7

En Python 3.7, el módulo asyncio proporciona operaciones asincrónicas.

API asincio de alto nivel frente a bajo nivel

Los componentes de Asyncio se dividen en API de alto nivel (para escribir programas) y API de bajo nivel (para escribir bibliotecas o marcos basados ​​en asyncio).

Todos los programas asyncio se pueden escribir utilizando solo las API de alto nivel. Si no está escribiendo un marco o biblioteca, nunca necesita tocar las cosas de bajo nivel.

Dicho esto, veamos las API básicas de alto nivel y analicemos los conceptos básicos.

Corrutinas

En general, una corutina (abreviatura de subrutina cooperativa) es una función diseñada para la multitarea preventiva voluntaria: cede proactivamente a otras rutinas y procesos, en lugar de ser reemplazada por el núcleo. El término "corutina" fue acuñado en 1958 por Melvin Conway (de la fama de "Conway's Law"), para describir el código que facilita activamente las necesidades de otras partes de un sistema.

En asyncio, esta preferencia voluntaria se llama en espera.

Awaitables, Async y Await

Cualquier objeto que pueda esperarse (voluntariamente reemplazado por una corrutina) se denomina esperable.

La palabra clave await suspende la ejecución de la rutina actual y llama al awaitable especificado.

En Python 3.7, los tres objetos disponibles son coroutine, task y future.

Una coroutine asyncio es cualquier función de Python cuya definición tiene como prefijo la palabra clave async.

1
2
async def my_coro():
    pass

Una tarea asyncio es un objeto que envuelve una rutina, proporcionando métodos para controlar su ejecución y consultar su estado. Se puede crear una tarea con asyncio.create_task() o asyncio.gather().

Un futuro asyncio es un objeto de bajo nivel que actúa como un marcador de posición para los datos que aún no se han calculado o obtenido. Puede proporcionar una estructura vacía para llenarla con datos más tarde y un mecanismo de devolución de llamada que se activa cuando los datos están listos.

Una tarea hereda todos menos dos de los métodos disponibles para un futuro, por lo que en Python 3.7 nunca necesita crear un objeto futuro directamente.

Bucles de eventos {#bucles de eventos}

En asyncio, un bucle de eventos controla la programación y la comunicación de objetos en espera. Se requiere un bucle de eventos para usar awaitables. Cada programa asyncio tiene al menos un ciclo de eventos. Es posible tener varios bucles de eventos, pero los bucles de eventos múltiples se desaconsejan en Python 3.7.

Se obtiene una referencia al objeto de bucle actualmente en ejecución llamando a asyncio.get_running_loop().

Dormido

La corrutina asyncio.sleep(delay) se bloquea durante delay segundos. Es útil para simular bloqueos de E/S.

1
2
3
4
5
6
7
8
import asyncio

async def main():
    print("Sleep now.")
    await asyncio.sleep(1.5)
    print("OK, wake up!")

asyncio.run(main())
Inicio del bucle de eventos principal

El punto de entrada canónico a un programa asyncio es asyncio.run(main()), donde main() es una corrutina de nivel superior.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import asyncio

async def my_coro(arg):
    "A coroutine."  
    print(arg)

async def main():
    "The top-level coroutine."
    await my_coro(42)

asyncio.run(main())

Llamar a asyncio.run() implícitamente crea y ejecuta un bucle de eventos. El objeto loop tiene muchos métodos útiles, incluido loop.time(), que devuelve un flotante que representa la hora actual, medida por el reloj interno del bucle.

Nota: La función asyncio.run() no se puede llamar desde un bucle de eventos existente. Por lo tanto, es posible que vea errores si está ejecutando el programa dentro de un entorno de supervisión, como Anaconda o Jupyter, que está ejecutando un ciclo de eventos propio. Los programas de ejemplo de esta sección y las secciones siguientes deben ejecutarse directamente desde la línea de comandos mediante la ejecución del archivo python.

El siguiente programa imprime líneas de texto, bloqueándose durante un segundo después de cada línea hasta la última.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import asyncio

async def my_coro(delay):
    loop = asyncio.get_running_loop()
    end_time = loop.time() + delay
    while True:
        print("Blocking...")
        await asyncio.sleep(1)
        if loop.time() > end_time:
            print("Done.")
            break

async def main():
    await my_coro(3.0)

asyncio.run(main())

Producción:

1
2
3
4
Blocking...
Blocking...
Blocking...
Done.
Tareas

Una tarea es un objeto aguardable que envuelve una rutina. Para crear y programar inmediatamente una tarea, puede llamar a lo siguiente:

1
asyncio.create_task(coro(args...))

Esto devolverá un objeto de tarea. La creación de una tarea le dice al ciclo, "continúe y ejecute esta rutina tan pronto como pueda".

Si espera una tarea, la ejecución de la rutina actual se bloquea hasta que se complete esa tarea.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import asyncio

async def my_coro(n):
    print(f"The answer is {n}.")

async def main():
    # By creating the task, it's scheduled to run 
    # concurrently, at the event loop's discretion.
    mytask = asyncio.create_task(my_coro(42))
    
    # If we later await the task, execution stops there
    # until the task is complete. If the task is already
    # complete before it is awaited, nothing is awaited. 
    await mytask

asyncio.run(main())

Producción:

1
The answer is 42.

Las tareas tienen varios métodos útiles para administrar la rutina envuelta. En particular, puede solicitar que se cancele una tarea llamando al método .cancel() de la tarea. La tarea se programará para su cancelación en el siguiente ciclo del bucle de eventos. La cancelación no está garantizada: la tarea puede completarse antes de ese ciclo, en cuyo caso no se produce la cancelación.

Recopilación de elementos a la espera {#reunión de elementos a la espera}

Los Awaitables se pueden reunir como un grupo, proporcionándolos como un argumento de lista para la corrutina integrada asyncio.gather(awaitables).

asyncio.gather() devuelve un awaitable que representa los awaitables recopilados y, por lo tanto, debe tener el prefijo await.

Si algún elemento de awaitables es una rutina, se programa inmediatamente como una tarea.

Gathering es una forma conveniente de programar múltiples corrutinas para que se ejecuten simultáneamente como tareas. También asocia las tareas recopiladas de algunas maneras útiles:

  • Cuando se completan todas las tareas reunidas, sus valores de retorno agregados se devuelven como una lista, ordenados de acuerdo con el orden de la lista awaitables.
  • Cualquier tarea reunida puede ser cancelada, sin cancelar las demás tareas.
  • La reunión en sí se puede cancelar, cancelando todas las tareas.
Ejemplo: solicitudes web asíncronas con aiohttp

El siguiente ejemplo ilustra cómo se pueden implementar estas API asyncio de alto nivel. La siguiente es una versión modificada, actualizada para Python 3.7, de El ingenioso ejemplo de asyncio de Scott Robinson. Su programa aprovecha el módulo aiohttp para capturar las publicaciones principales en Reddit y enviarlas a la consola.

Asegúrese de tener instalado el módulo aiohttp antes de ejecutar el siguiente script. Puede descargar el módulo a través del siguiente comando pip:

1
$ pip install --user aiohttp
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import sys  
import asyncio  
import aiohttp  
import json
import datetime

async def get_json(client, url):  
    async with client.get(url) as response:
        assert response.status == 200
        return await response.read()

async def get_reddit_top(subreddit, client, numposts):  
    data = await get_json(client, 'https://www.reddit.com/r/' + 
        subreddit + '/top.json?sort=top&t=day&limit=' +
        str(numposts))

    print(f'\n/r/{subreddit}:')

    j = json.loads(data.decode('utf-8'))
    for i in j['data']['children']:
        score = i['data']['score']
        title = i['data']['title']
        link = i['data']['url']
        print('\t' + str(score) + ': ' + title + '\n\t\t(' + link + ')')

async def main():
    print(datetime.datetime.now().strftime("%A, %B %d, %I:%M %p"))
    print('---------------------------')
    loop = asyncio.get_running_loop()  
    async with aiohttp.ClientSession(loop=loop) as client:
        await asyncio.gather(
            get_reddit_top('python', client, 3),
            get_reddit_top('programming', client, 4),
            get_reddit_top('asyncio', client, 2),
            get_reddit_top('dailyprogrammer', client, 1)
            )

asyncio.run(main())

Si ejecuta el programa varias veces, verá que el orden de la salida cambia. Esto se debe a que las solicitudes JSON se muestran a medida que se reciben, lo que depende del tiempo de respuesta del servidor y de la latencia intermedia de la red. En un sistema Linux, puede observar esto en acción ejecutando el script con el prefijo (p. ej.) watch -n 5, que actualizará la salida cada 5 segundos:

{.img-responsivo}

Otras API de alto nivel

Con suerte, esta descripción general le brinda una base sólida de cómo, cuándo y por qué usar asyncio. Otras API de asyncio de alto nivel, que no se tratan aquí, incluyen:

  • stream, un conjunto de primitivas de red de alto nivel para administrar eventos TCP asíncronos.
  • bloqueo, evento, condición, análogos asíncronos de las primitivas de sincronización provistas en el módulo threading.
  • subproceso, un conjunto de herramientas para ejecutar subprocesos asíncronos, como comandos de shell.
  • cola, un análogo asíncrono del módulo cola.
  • excepción, para manejar excepciones en código asíncrono.

Conclusión

Tenga en cuenta que incluso si su programa no requiere asincronía por razones de rendimiento, aún puede usar asyncio si prefiere escribir dentro del paradigma asincrónico. Espero que esta descripción general le brinde una comprensión sólida de cómo, cuándo y por qué comenzar a usar asyncio. asyncio.

Licensed under CC BY-NC-SA 4.0