Tareas asíncronas con Flask, Redis y Celery

En este artículo, usaremos Redis y Celery con Flask para crear un sistema de mensajería asíncrono para crear una aplicación de recordatorio simple.

Introducción

A medida que las aplicaciones web evolucionan y aumenta su uso, los casos de uso también se diversifican. Ahora estamos creando y utilizando sitios web para tareas más complejas que nunca. Algunas de estas tareas se pueden procesar y enviar comentarios a los usuarios al instante, mientras que otras requieren un mayor procesamiento y la transmisión de los resultados más adelante. La mayor adopción de acceso a Internet y dispositivos con capacidad para Internet ha llevado a un mayor tráfico de usuarios finales.

En un intento por manejar el aumento del tráfico o la mayor complejidad de la funcionalidad, a veces podemos optar por diferir el trabajo y transmitir los resultados en un momento posterior. De esta manera, no conseguimos que el usuario espere un tiempo desconocido en nuestra aplicación web y, en cambio, enviamos los resultados en un momento posterior. Podemos lograr esto utilizando tareas en segundo plano para procesar el trabajo cuando hay poco tráfico o procesar el trabajo en lotes.

Una de las soluciones que podemos usar para lograr esto es Apio. Nos ayuda a desglosar piezas de trabajo complejas y hacer que las realicen diferentes máquinas para aliviar la carga en una máquina o reducir el tiempo necesario para completarlas.

En esta publicación, exploraremos el uso de Celery para programar tareas en segundo plano en una aplicación Flask para descargar tareas que requieren muchos recursos y priorizar la respuesta a los usuarios finales.

¿Qué es una cola de tareas?

Una cola de tareas es un mecanismo para distribuir pequeñas unidades de trabajo o tareas que se pueden ejecutar sin interferir con el ciclo de solicitud-respuesta de la mayoría de las aplicaciones basadas en web.

Las colas de tareas son útiles para delegar el trabajo que, de lo contrario, ralentizaría las aplicaciones mientras esperan respuestas. También se pueden usar para manejar tareas que requieren muchos recursos mientras la máquina principal o el proceso interactúa con el usuario.

De esta forma, la interacción con el usuario es consistente, oportuna y no se ve afectada por la carga de trabajo.

¿Qué es el apio?

Celery es una cola de tareas asíncrona basada en el paso de mensajes distribuidos para distribuir la carga de trabajo entre máquinas o subprocesos. Un sistema de apio consta de un cliente, un corredor y varios trabajadores.

Estos trabajadores son responsables de la ejecución de las tareas o trabajos que se colocan en la cola y la transmisión de los resultados. Con Celery, puede tener trabajadores locales y remotos, lo que significa que el trabajo se puede delegar a máquinas diferentes y más capaces a través de Internet y los resultados se transmiten al cliente.

De esta manera, se alivia la carga en la máquina principal y hay más recursos disponibles para manejar las solicitudes de los usuarios a medida que ingresan.

El cliente en una configuración de Celery es responsable de enviar trabajos a los trabajadores y también de comunicarse con ellos mediante un intermediario de mensajes. El bróker facilita la comunicación entre el cliente y los trabajadores en una instalación de Celery a través de una cola de mensajes, donde se agrega un mensaje a la cola y el bróker lo entrega al cliente.

Ejemplos de dichos intermediarios de mensajes incluyen redis y ConejoMQ.

¿Por qué usar apio? {#por qué usar apio}

Hay varias razones por las que deberíamos Apio para nuestras tareas de fondo. Primero, es bastante escalable, lo que permite agregar más trabajadores a pedido para atender una mayor carga o tráfico. Celery también se encuentra todavía en desarrollo activo, lo que significa que es un proyecto compatible junto con su documentación concisa y su comunidad activa de usuarios.

Otra ventaja es que Celery es fácil de integrar en múltiples marcos web, y la mayoría tiene bibliotecas para facilitar la integración.

También proporciona la funcionalidad para interactuar con otras aplicaciones web a través de webhooks donde no hay una biblioteca para admitir la interacción.

Celery también puede usar una variedad de intermediarios de mensajes que nos ofrecen flexibilidad. Se recomienda RabbitMQ pero también puede soportar Redis y habichuelas mágicas.

Aplicación de demostración

Construiremos una aplicación Flask que permita a los usuarios configurar recordatorios que se enviarán a sus correos electrónicos en un momento determinado.

También proporcionaremos la funcionalidad para personalizar la cantidad de tiempo antes de que se invoque el mensaje o recordatorio y se envíe el mensaje al usuario.

Configuración

Como cualquier otro proyecto, nuestro trabajo se desarrollará en un entorno virtual que crearemos y gestionaremos mediante la herramienta Pipenv:

1
2
$ pipenv install --three
$ pipenv shell

Para este proyecto, necesitaremos instalar los paquetes Flask y Celery para comenzar:

1
$ pipenv install flask celery

Así es como se verá la estructura de archivos de nuestra aplicación Flask:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
.
├── Pipfile                    # manage our environment
├── Pipfile.lock
├── README.md
├── __init__.py
├── app.py                     # main Flask application implementation
├── config.py                  # to host the configuration
├── requirements.txt           # store our requirements
└── templates
    └── index.html             # the landing page

1 directory, 8 files

Para nuestro proyecto basado en Celery, usaremos Redis como intermediario de mensajes y podemos encontrar las instrucciones para configurarlo en su página de inicio.

Implementación

Comencemos por crear la aplicación Flask que generará un formulario que permitirá a los usuarios ingresar los detalles del mensaje que se enviará en el futuro.

Agregaremos lo siguiente a nuestro archivo app.py:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from flask import Flask, flash, render_template, request, redirect, url_for

app = Flask(__name__)
app.config.from_object("config")
app.secret_key = app.config['SECRET_KEY']

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html')

    elif request.method == 'POST':
        email = request.form['email']
        first_name = request.form['first_name']
        last_name = request.form['last_name']
        message = request.form['message']
        duration = request.form['duration']
        duration_unit = request.form['duration_unit']

        flash(Message scheduled)
        return redirect(url_for('index'))


if __name__ == '__main__':
    app.run(debug=True)

Esta es una aplicación realmente simple con solo una ruta para manejar una solicitud GET y POST para el formulario. Una vez que se envían los detalles, podemos entregar los datos a una función que programará el trabajo.

Para ordenar nuestro archivo de aplicación principal, colocaremos las variables de configuración en un archivo config.py separado y cargaremos la configuración desde el archivo:

1
app.config.from_object("config")

Nuestro archivo config.py estará en la misma carpeta que el archivo app.py y contiene algunas configuraciones básicas:

1
2
SECRET_KEY = 'very_very_secure_and_secret'
# more config

Por ahora, implementemos la página de destino como index.html:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
{% for message in get_flashed_messages() %}
  <p style="color: red;">{{ message }}</p>
{% endfor %}

<form method="POST">
    First Name: <input id="first_name" name="first_name" type="text">
    Last Name: <input id="last_name" name="last_name" type="text">
    Email: <input id="email" name="email" type="email">
    Message: <textarea id="textarea" name="message"></textarea>
    Duration: <input id="duration" name="duration" placeholder="Enter duration as a number. for example: 3" type="text">

   <select name="duration_unit">
      <option value="" disabled selected>Choose the duration</option>
      <option value="1">Minutes</option>
      <option value="2">Hours</option>
      <option value="3">Days</option>
   </select>

   <button type="submit" name="action">Submit </button>
</form>

El estilo y el formato se han truncado por razones de brevedad, siéntase libre de formatear/diseñar su HTML como desee.

Ahora podemos iniciar nuestra aplicación:

landing page

Envío de correos electrónicos mediante Flask-Mail

Para enviar correos electrónicos desde nuestra aplicación Flask, utilizaremos la biblioteca Frasco de correo, que agregaremos a nuestro proyecto de la siguiente manera:

1
$ pipenv install flask-mail

Con nuestra aplicación Flask y el formulario en su lugar, ahora podemos integrar Flask-Mail en nuestro app.py:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from flask_mail import Mail, Message

app = Flask(__name__)
app.config.from_object("config")
app.secret_key = app.config['SECRET_KEY']

# set up Flask-Mail Integration
mail = Mail(app)

def send_mail(data):
    """ Function to send emails.
    """
    with app.app_context():
        msg = Message("Ping!",
                    sender="admin.ping",
                    recipients=[data['email']])
        msg.body = data['message']
        mail.send(msg)

La función send_main(data) recibirá el mensaje a enviar y el destinatario del correo electrónico y luego se invocará después de que haya pasado el tiempo especificado para enviar el correo electrónico al usuario.

También necesitaremos agregar las siguientes variables a nuestro config.py para que Flask-Mail funcione:

1
2
3
4
5
6
# Flask-Mail
MAIL_SERVER = 'smtp.googlemail.com'
MAIL_PORT = 587
MAIL_USE_TLS = True
MAIL_USERNAME = 'mail-username'
MAIL_PASSWORD = 'mail-password'

Integración de apio

Con nuestra aplicación Flask lista y equipada con la funcionalidad de envío de correo electrónico, ahora podemos integrar Celery para programar los correos electrónicos que se enviarán en una fecha posterior.

Nuestro app.py será modificado nuevamente:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# Existing imports are maintained
from celery import Celery

# Flask app and flask-mail configuration truncated

# Set up celery client
client = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
client.conf.update(app.config)

# Add this decorator to our send_mail function
@client.task
def send_mail(data):
    # Function remains the same

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html')

    elif request.method == 'POST':
        data = {}
        data['email'] = request.form['email']
        data['first_name'] = request.form['first_name']
        data['last_name'] = request.form['last_name']
        data['message'] = request.form['message']
        duration = int(request.form['duration'])
        duration_unit = request.form['duration_unit']

        if duration_unit == 'minutes':
            duration *= 60
        elif duration_unit == 'hours':
            duration *= 3600
        elif duration_unit == 'days':
            duration *= 86400

        send_mail.apply_async(args=[data], countdown=duration)
        flash(f"Email will be sent to {data['email']} in {request.form['duration']} {duration_unit}")

        return redirect(url_for('index'))

Importamos celery y lo usamos para inicializar el cliente Celery en nuestra aplicación Flask adjuntando la URL para el agente de mensajería. En nuestro caso, usaremos Redis como intermediario, por lo que agregamos lo siguiente a nuestro config.py:

1
2
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

Para que nuestra función send_mail() se ejecute como una tarea en segundo plano, agregaremos el decorador @client.task para que nuestro cliente Celery lo sepa.

Después de configurar el cliente Celery, se modifica la función principal que también maneja la entrada de formulario.

Primero, empaquetamos los datos de entrada para la función send_mail() en un diccionario. Luego, invocamos nuestra función de correo a través de Celery Task Calling API usando la función apply_async, que toma los argumentos requeridos por nuestra función.

Se establece un parámetro de “cuenta regresiva” opcional, que define un retraso entre la ejecución del código y la realización de la tarea.

Esta duración está en segundos, por lo que convertimos la duración pasada por el usuario en segundos dependiendo de la unidad de tiempo que elija.

Después de que el usuario haya enviado el formulario, acusaremos recibo y le notificaremos a través de un mensaje publicitario cuando se envíe el mensaje.

Reunir todo {#reunir todo}

Para poder ejecutar nuestro proyecto, necesitaremos dos terminales, uno para iniciar nuestra aplicación Flask y el otro para iniciar el trabajador Celery que enviará mensajes en segundo plano.

Inicie la aplicación Flask en la primera terminal:

1
$ python app.py

En la segunda terminal, inicie el entorno virtual y luego inicie el trabajador Celery:

1
2
3
# start the virtualenv
$ pipenv shell
$ celery worker -A app.client --loglevel=info

Si todo va bien, obtendremos los siguientes comentarios en la terminal que ejecuta el cliente Celery:

celery kick off

Ahora naveguemos a http://localhost:5000 y complete los detalles programando el correo electrónico para que llegue después de 2 minutos de envío.

Encima del formulario, aparecerá un mensaje que indica la dirección que recibirá el correo electrónico y la duración después de la cual se enviará el correo electrónico. En nuestra terminal Celery, también podremos ver una entrada de registro que significa que nuestro correo electrónico ha sido programado:

1
[2019-10-23 16:27:25,399: INFO/MainProcess] Received task: app.send_mail[d65025c8-a291-40d0-aea2-e816cb40cd78]  ETA:[2019-10-23 13:29:25.170622+00:00]

La sección ETA de la entrada muestra cuándo se llamará a nuestra función send_email() y, por lo tanto, cuándo se enviará el correo electrónico.

Hasta aquí todo bien. Nuestros correos electrónicos se están programando y enviando en el tiempo especificado, sin embargo, falta una cosa. No tenemos visibilidad de las tareas antes o después de que se ejecuten y no tenemos forma de saber si el correo electrónico se envió realmente o no.

Por esta razón, implementemos una solución de monitoreo para nuestras tareas en segundo plano para que podamos ver las tareas y también estar al tanto en caso de que algo salga mal y las tareas no se ejecuten según lo planeado.

Monitoreo de nuestro grupo de apio usando Flower

Flor es una herramienta basada en la web que brindará visibilidad de nuestra configuración de Celery y brinda la funcionalidad para ver el progreso de la tarea, el historial, los detalles y las estadísticas, incluidos índices de éxito o fracaso. También podemos monitorear a todos los trabajadores en nuestro clúster y las tareas que están manejando actualmente.

Instalar Flower es tan fácil como:

1
$ pipenv install flower

Anteriormente, especificamos los detalles de nuestro cliente Celery en nuestro archivo app.py. Tendremos que pasar ese cliente a Flower para monitorearlo.

Para lograr esto, debemos abrir una tercera ventana de terminal, saltar a nuestro entorno virtual e iniciar nuestra herramienta de monitoreo:

1
2
$ pipenv shell
$ flower -A app.client --port=5555

Al iniciar Flower, especificamos el cliente Celery pasándolo a través del argumento de la aplicación (-A), y también especificando el puerto que se usará a través del argumento --port.

Con nuestro monitoreo implementado, programemos el envío de otro correo electrónico en el tablero y luego naveguemos a http://localhost:5555, donde nos recibe lo siguiente:

flower landing page

En esta página, podemos ver la lista de trabajadores en nuestro clúster Celery, que actualmente solo está compuesto por nuestra máquina.

Para ver el correo electrónico que acabamos de programar, haga clic en el botón Tareas en la parte superior izquierda del tablero y esto nos llevará a la página donde podemos ver las tareas que se han programado:

scheduled tasks

En esta sección, podemos ver que habíamos programado dos correos electrónicos y uno se envió con éxito a la hora programada. Los correos electrónicos estaban programados para enviarse después de 1 minuto y 5 minutos, respectivamente, con fines de prueba.

También podemos ver la hora en que se recibió el texto y cuándo se ejecutó desde esta sección.

En la sección del monitor, hay gráficos que muestran las tasas de éxito y fracaso de las tareas en segundo plano.

Podemos programar mensajes durante el tiempo que queramos, pero eso también significa que nuestro trabajador debe estar en línea y funcional en el momento en que se supone que debe ejecutarse la tarea.

Conclusión

Hemos configurado con éxito un clúster de Celery y lo hemos integrado en nuestra aplicación Flask que permite a los usuarios programar el envío de correos electrónicos después de un tiempo determinado en el futuro.

La funcionalidad de envío de correo electrónico se delegó a una tarea en segundo plano y se colocó en una cola donde un trabajador la seleccionará y la ejecutará en nuestro clúster local de Celery.

El código fuente de este proyecto es, como siempre, disponible en Github. ).