Introducción a las secuencias de Node.js

Los flujos representan un flujo de datos continuo y secuencial recibido por una fuente potencialmente ilimitada. En este artículo, usaremos flujos de escritura, legibles y dúplex en Node.js, así como también construiremos un Stream Pipeline.

Introducción

Los flujos son un concepto algo avanzado de entender. Entonces, en este artículo, iremos junto con algunos ejemplos para una mejor comprensión y le presentaremos algunos conceptos en el camino.

¿Qué es una transmisión

En términos simples, los flujos se utilizan para leer desde la entrada o escribir en la salida de forma secuencial. La mayoría de las veces, los flujos se utilizan para leer o escribir desde una fuente continua o comparablemente grande.

Por ejemplo, supongamos que tiene que leer un archivo grande. Si el tamaño del archivo es más grande que su espacio libre en la memoria, no puede leer todo el archivo en la memoria para procesarlo. Tienes que leerlo pieza por pieza y procesar cada fragmento, que puede estar separado por una línea, por ejemplo.

Otro ejemplo de una fuente continua es la comunicación de red, como una aplicación de chat en la que los datos deben fluir continuamente desde el remitente hasta el receptor.

Flujos en Node.js

El módulo Stream es un módulo nativo que se envía de forma predeterminada en Node.js. Stream es una instancia de la clase EventEmitter, que maneja los eventos de forma asíncrona en Node.js. Debido a su superclase, las secuencias se basan inherentemente en eventos.

Hay 4 tipos de transmisiones en Node.js:

  • Escribible: Se utiliza para escribir datos secuencialmente
  • Legible: Se usa para leer datos secuencialmente
  • Dúplex: Se usa para leer y escribir datos secuencialmente
  • Transformar: Donde se pueden modificar los datos al escribir o leer. Tome la compresión como ejemplo, con un flujo como este puede escribir datos comprimidos y leer datos descomprimidos.

Echemos un vistazo a algunos ejemplos de secuencias.

Flujos grabables

En primer lugar, vamos a crear un flujo de escritura y escribir algunos datos en un archivo:

1
2
3
4
5
const fs = require('fs');
const file = fs.createWriteStream('file.txt');

file.write('hello world');
file.end(', from streams!');

En este código, hemos utilizado el módulo del sistema de archivos para crear un flujo de escritura en un archivo (file.txt) y escribirle 2 fragmentos separados: hello world y , from streams.

A diferencia de fs.writeFile(), donde necesitamos escribir el contenido del archivo de una vez, usando un flujo podemos escribir el contenido fragmento por fragmento.

Para simular una entrada continua, podríamos hacer algo como:

1
2
3
4
5
6
7
const fs = require('fs');
const file = fs.createWriteStream('file.txt');

for (let i = 0; i < 10000; i++) {
    file.write('Hello world ' + i);
}
file.end();

Esto escribirá Hello world + {i} diez mil veces y luego finalizará la transmisión:

1
2
3
4
5
6
Hello world 0
Hello world 1
Hello world 2
Hello world 3
Hello world 4
...

Por favor, recuerde .end() sus transmisiones después de que haya terminado de usarlas, ya que el evento finish se envía después de que se haya llamado al método .end().

Esto significa que el cuerpo de la transmisión se ha vaciado en nuestro archivo.

Flujos legibles

Ahora echemos un vistazo a otro ejemplo simple leyendo un archivo usando una secuencia. Podemos leer un archivo fragmento por fragmento, en lugar de leer el contenido completo en la memoria, usando un flujo legible:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
const fs = require('fs');

const readableStream = fs.createReadStream('./article.md', {
    highWaterMark: 10
});

readableStream.on('readable', () => {
    process.stdout.write(`[${readableStream.read()}]`);
});

readableStream.on('end', () => {
    console.log('DONE');
});

De manera similar a la creación de un flujo de escritura, hemos creado un flujo legible llamando al método .createReadStream().

Durante el almacenamiento en búfer (segmentación de los datos en fragmentos), el tamaño del búfer depende del parámetro highWaterMark, que se pasa al constructor de flujo.

El valor predeterminado de este parámetro es 16384 bytes (16 kb), por lo que si no anula el parámetro, la transmisión leerá fragmentos de 16 kb y se los pasará para que los procese.

Dado que estamos usando un archivo de texto pequeño, tiene más sentido usar un valor pequeño para nuestro ejemplo, por lo que el texto se dividirá en 10 caracteres.

En nuestro ejemplo anterior, simplemente imprimimos la porción de datos que recibimos, excepto con corchetes alrededor para que pueda ver fácilmente las diferentes porciones. La salida de nuestro código se ve así:

1
2
3
4
5
6
7
[### Introd][uction

St][reams are ][a somewhat][ advanced ][concept to][ understan][d. So in t][his articl][e, we will][ go along ][with some ][examples f][or a bette][r understa][nding and ][introduce ][you to a f][ew concept][s along th][e way.

##][# What is ][a Stream

][In simple ]...

Flujos dúplex

Con los flujos grabables y legibles fuera del camino, podemos saltar a un ejemplo usando flujos dúplex, que esencialmente combinan ambos.

Los demostraremos usando un servidor HTTP simple creado con el módulo http nativo de Node.js. El ejemplo utilizado aquí es de la Documentación de Node.js oficial.

Dado que los servidores reciben solicitudes y luego envían respuestas, son un buen ejemplo de secuencias dúplex, que manejan ambas: una secuencia legible actuará como una solicitud continua y una secuencia escribible actuará como respuesta.

Primero, importemos el módulo HTTP:

1
const http = require('http');

Ahora vamos a crear un servidor HTTP simple:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
const server = http.createServer((req, res) => {
    // `req` is an http.IncomingMessage, which is a Readable Stream.
    // `res` is an http.ServerResponse, which is a Writable Stream.

    let body = '';

    // Get the data as utf8 strings.
    // If an encoding is not set, Buffer objects will be received.
    req.setEncoding('utf8');

    // Readable streams emit 'data' events once a listener is added.
    req.on('data', (chunk) => {
        body += chunk;
    });

    // The 'end' event indicates that the entire body has been received.
    req.on('end', () => {
        consol.log(body);

        try {
            // Send 'Hello World' to the user
            res.write('Hello World');
            res.end();
        } catch (er) {
            res.statusCode = 400;
            return res.end(`error: ${er.message}`);
        }
    });
});

El parámetro req es un flujo legible, que procesaremos al recibirlo como una solicitud HTTP. Luego enviaremos res como respuesta, que es, de nuevo, un flujo de escritura simple.

Luego, utilizando el método ‘.on()’, leemos el cuerpo de la solicitud en fragmentos de 64 KB y lo almacenamos en el ‘cuerpo’, activado por el evento ‘datos’.

Tenga en cuenta el uso del método setEncoding() antes de leer de la secuencia.

De esta manera, la transmisión emitirá cadenas y, de lo contrario, emitirá objetos Buffer. Sin embargo, también puede realizar esa conversación dentro de la devolución de llamada del evento data si lo prefiere.

El evento end se activa cuando no queda nada por leer en un flujo legible. Hablaremos de otros eventos útiles más adelante en este artículo.

Ahora, escuchemos al servidor:

1
server.listen(1337);

Al presionar http://localhost:1337, debería ver una respuesta simple Hello World del servidor HTTP.

Tuberías de flujo

Usando canalizaciones de flujo, podemos canalizar directamente flujos legibles a un flujo de escritura sin almacenar el búfer temporalmente, por lo que podemos ahorrar espacio en la memoria.

Considere un escenario en el que un usuario solicita un archivo grande del servidor y no hay espacio de memoria para cargarlo en la memoria, o miles de clientes diferentes solicitan el mismo archivo. En este caso, no podemos leer el contenido del archivo en la memoria y luego volver a escribirlo en el cliente.

Aquí es donde el método pipe es útil, ya que canalizaremos un flujo legible (una solicitud) a un flujo escribible (una respuesta) y se lo entregaremos al usuario sin retenerlo en el búfer.

Primero, hagamos esto sin usar flujos:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
    fs.readFile('./video.mkv', (err, data) => {
        if (err) throw err;

        res.end(data);
    });
});

server.listen(1337);

Este método lee directamente el archivo en la memoria usando el método .readFile() y lo envía al usuario.

Abra su navegador web y vaya a http://localhost:1337, esto es lo que sucede detrás de escena:

web browser memory usage

Ahora, sirvamos el video usando una transmisión:

1
2
3
4
5
6
7
8
9
const http = require('http');
const fs = require('fs');

const server = http.createServer((req, res) => {
    const src = fs.createReadStream('./video.mkv');
    src.pipe(res);
});

server.listen(1337);

En este código, hemos creado un flujo legible para el archivo y lo canalizamos directamente a la respuesta HTTP, por lo que en lugar de cargarlo en la memoria, la entrada del disco HDD se escribe directamente en la red sin consumir la memoria.

Aquí está la captura de pantalla del uso de la memoria al enviar el archivo mediante una secuencia:

uso de memoria del navegador web con un flujo

Como puede ver, el uso de la memoria es demasiado bajo en comparación con el primer método.

Eventos útiles en un flujo

Dado que la clase Stream hereda la clase EventEmitter, cada flujo tendrá su propio tipo de eventos a los que puede suscribirse utilizando el método on() de EventEmitter. Este evento dependerá del tipo de transmisión.

Eventos en transmisiones legibles

  • data: se emite cuando se lee una porción de datos de la secuencia. Por defecto, el fragmento será un objeto Buffer. Si desea cambiarlo, puede usar el método .setEncoding().
  • error: Emitido cuando ocurre un error durante la lectura. Esto puede suceder si el flujo de escritura no puede generar datos debido a una falla interna o cuando se inserta un fragmento no válido en el flujo.
  • end: Emitido cuando no hay más datos en el flujo.
  • close: Emitido cuando el recurso de flujo está cerrado e indica que no se emitirán más eventos en el futuro.
  • readable: Emitido cuando los datos están disponibles en el flujo de lectura para leer.

Eventos en flujos grabables

  • close: Emitido cuando el recurso de flujo está cerrado e indica que no se emitirán más eventos en el futuro.
  • error: Emitido cuando ocurre un error durante la lectura. Esto puede suceder si el flujo de escritura no puede generar datos debido a una falla interna o cuando se envían datos de fragmentos no válidos al flujo.
  • finish: se emite cuando todos los datos se han vaciado del flujo de escritura.
  • pipe: se emite cuando el flujo de escritura se canaliza a un flujo de lectura.
  • unpipe: se emite cuando el flujo de escritura no se canaliza de un flujo de lectura.

Conclusión

En términos simples, los flujos se utilizan para leer desde la entrada o escribir en la salida de forma secuencial. La mayoría de las veces, los flujos se utilizan para leer o escribir desde una fuente continua o comparablemente grande.

El módulo Stream es un módulo nativo que se envió de forma predeterminada en Node.js. Stream es una instancia de la clase EventEmitter, que maneja eventos de forma asíncrona en Node.js. Debido a su superclase, las secuencias se basan inherentemente en eventos.

Transformation Streams no se trataron en este artículo, ya que justifican su propio artículo.

El código fuente de este proyecto está disponible en GitHub como de costumbre. Use esto para comparar su código si se quedó atascado en el tutorial.

Si quieres más información sobre streams y/o conocimientos avanzados, se recomienda seguir la documentación oficial para Streams.