Mensajería pub/sub asíncrona en Java con NATS JetStream

En esta guía, veremos cómo usar la biblioteca NATS JetStream para enviar mensajes de forma asincrónica en Java a través del patrón de publicación/suscripción, para crear microservicios y sistemas de mensajería distribuidos.

Introducción

Cuando construimos clústeres de aplicaciones distribuidos a gran escala, utilizamos todos nuestros esfuerzos para dividir los monolitos en pequeñas cargas de trabajo en contenedores que se comunican entre sí y comparten información para realizar diversas acciones.

No dedicamos mucho tiempo a diseñar un sistema de mensajería.

La mensajería suele tratarse como el sistema nervioso central de cualquier sistema distribuido a gran escala. Por lo general, las comunicaciones en memoria dentro de los monolitos se convierten en comunicaciones por cable.

Si conectamos todas las comunicaciones dentro de un clúster, forma módulos similares a una malla donde cada servicio llama a otro servicio de forma síncrona, lo que no es ideal debido a los largos tiempos de espera en el ciclo de vida de solicitud-respuesta.

Esta malla desordenada se puede solucionar introduciendo un clúster de mensajería asíncrona entre los servicios, en lugar de uno síncrono.

En lugar de tener una comunicación punto a punto entre dos microservicios, podemos delegar sus mensajes en una especie de topología central y radial. Por lo tanto, la mensajería es un pegamento que une todo el sistema.

Necesidad de Sistema de Mensajería

En esta guía, utilizaremos NATS JetStream para realizar la entrega asincrónica de mensajes mediante el patrón de publicación/suscripción.

Entonces, ¿cómo elegimos un intermediario de mensajes o una arquitectura de mensajería para nuestra aplicación?

Elegir un sistema de mensajería puede ser bastante abrumador, con una gran cantidad de opciones ya disponibles y nuevas que aparecen todos los días, cada una con diferentes ventajas.

Elegir un sistema de mensajería distribuida

En particular, tenemos apache kafka, ampliamente popular y de uso bastante frecuente, que a menudo se conoce como almacén de registro distribuido.

Los mensajes publicados en temas en Kafka persisten durante algún tiempo y el concepto de grupos de consumidores permite que los mensajes se distribuyan uniformemente entre varias instancias del mismo servicio. Es extremadamente poderoso, pero el poder conlleva una gran responsabilidad y mantenimiento. Kafka es notablemente difícil de mantener y tiene una curva de aprendizaje pronunciada para cualquier equipo que busque mejorar en la tecnología.

Otra opción única es ConejoMQ. RabbitMQ utiliza el Protocolo de cola de mensajes avanzado para la mensajería. Es significativamente ligero también.

En lugar de utilizar el concepto de grupos de consumidores únicos, RabbitMQ adopta el enfoque más simple de hacer que los clientes consuman colas. Si un cliente no reconoce un mensaje, volverá a la cola para ser procesado por otro.

Todos estos productos tienen un punto óptimo y brillan en sus casos de uso.

Entonces, ¿qué pasa si alguien realmente quiere abrazar la idea de tener un sistema simple pero de muy alto rendimiento sin la sobrecarga adicional de mantenerlo? ¿Qué pasa si a alguien le gustaría hacer pub/sub tradicional, pero también solicitar/responder y tal vez incluso scatter-gather, todo mientras mantiene las cosas simples y ligeras?

Aquí es donde el sistema de mensajería NATS podría encajar mejor en su solución.

Presentación de NATS

NATS es un sistema de mensajería nativo en la nube y probado en producción creado para desarrolladores u operadores que desean pasar más tiempo implementando la lógica empresarial y menos tiempo preocupándose por cómo enviar mensajes.

Es un sistema de mensajería de código abierto increíblemente rápido construido sobre un núcleo simple pero poderoso. El servidor utiliza un protocolo basado en texto, por lo que, si bien hay una serie de bibliotecas de clientes específicas del idioma, puede, literalmente, telnet en un servidor NATS para enviar y recibir mensajes.

NATS está diseñado para estar siempre activo, conectado y listo para aceptar comandos. Si tiene la edad suficiente para saber qué es un tono de marcación, vale la pena mencionar que al equipo de NATS le gusta usar esa analogía para su diseño.

Algunas de las características más destacadas de NATS incluyen:

  • Rendimiento Ultra-alto
  • Configuración baja
    • Clients only need a URL and credentials
    • Servers auto-discover themselves
  • Capacidad para expandir la arquitectura sin afectar los servicios en ejecución
  • Se autocura y está siempre disponible
  • Admite múltiples modos de entrega:
    • At most once(Nats Core)
    • At least once(NATS Streaming or JetStream)
  • Almacene mensajes en tiendas persistentes y reprodúzcalos por tiempo o secuencia
  • Apoyo Comodín
  • Cifrado de datos en REST
  • Limpiar mensajes específicos (GDPR)
  • Escalabilidad Horizontal
  • Soporte TLS completo: certificados CA, soporte bidireccional
  • Compatibilidad con autenticación de usuario/contraseña estándar/uso de JWT
  • Restricciones de permisos
  • Tenencia múltiple segura con aislamiento de datos
  • Compartir datos entre cuentas
  • Tenga más de 30 bibliotecas de clientes escritas en diferentes idiomas

Patrones de mensajería

NATS admite 4 patrones principales de comunicación. Están:

  • Basado en temas
  • Publicar-Suscribir
  • Solicitud-Respuesta/Dispersión-Recopilación
  • Grupos de cola

Cada uno de estos es un paradigma diferente y tiene su caso de uso, con cierta superposición. Permitir estos cuatro patrones le da a NATS una gran flexibilidad y funcionalidad para varias circunstancias diferentes entre múltiples aplicaciones o un monolito grande.

Mensajería basada en asunto

Un Asunto en NATS es simplemente una cadena que representa un interés en los datos. Está tokenizado jerárquicamente para admitir suscripciones comodín:

  • foo.* coincide con foo.bar y foo.baz
  • foo.*.bar coincide con foo.a.bar y foo.b.bar
  • foo.> coincide con cualquiera de los anteriores
  • > coincide con todo en NATS

Este patrón de mensajería permite que el editor comparta datos usando un Asunto, y los consumidores pueden recibir estos mensajes al escuchar estos asuntos usando comodines.

In a sense, this paradigm is based on the Patrón de diseño del observador, which typically has a Subject and Observers.

Por ejemplo, si alguien envía el mensaje a 'audit.us.east', todos los suscriptores que escuchen el asunto exacto o un asunto comodín recibirán este mensaje.

Mensajería por asunto

Mensajes de publicación-suscripción

Este es uno de los patrones de mensajería tradicionales donde los Editores publican un mensaje en una lista de Suscriptores donde cada suscriptor está suscrito individualmente.

Publish Subscribe Messaging

Esto es análogo a un boletín informativo, y este patrón se usa extensamente en varios sistemas. Desde sistemas de notificación/alerta hasta plataformas VoD como YouTube.

Este es el patrón que usaremos en esta guía.

Mensaje de solicitud-respuesta/patrón de dispersión-recopilación

Cuando hacemos llamadas a la API REST, en las que emitimos una solicitud HTTP y recibimos una respuesta, estamos utilizando un patrón de solicitud-respuesta síncrono tradicional. El patrón Solicitud-Respuesta a menudo es difícil o, a veces, requiere soluciones o compromisos complejos. Este patrón es bastante simple cuando se implementa con NATS, ya que solo necesita que proporcione un asunto "responder a" al publicar un mensaje.

Este patrón también se puede llamar patrón Scatter-Gather, donde un editor publica un mensaje sobre un tema para un número desconocido de suscriptores al mismo tiempo. Luego, todos los oyentes que escucharan este tema se activarían y comenzarían a procesar. Luego, el editor esperaría para acumular todas las respuestas de algunos o todos los suscriptores.

Scatter Gather Pattern

Grupos de cola

A veces, en un clúster distribuido, tiene que equilibrar la carga de varias aplicaciones o varias instancias de la misma aplicación. Este patrón sería una solución perfecta para equilibrar la carga de los mensajes entre varios suscriptores que se han suscrito al mismo tema.

La mejor parte de esta solución es que, a diferencia de otros sistemas de mensajería, no requiere ninguna configuración en el servidor NATS. Los grupos de cola están definidos por la aplicación y sus suscriptores de cola y se administran entre ellos.

Para crear una suscripción de cola, todos los suscriptores registran un nombre de cola. A medida que se publican mensajes sobre el tema registrado, se elige aleatoriamente un miembro del grupo para recibir el mensaje. Aunque los grupos de colas tienen varios suscriptores, cada mensaje es consumido por uno solo.

Queue Groups

Todos estos patrones necesitan configuración cero en el servidor NATS.

Está totalmente controlado por la aplicación o las bibliotecas del cliente. Así que echemos un vistazo a la biblioteca del cliente Java jnats para ver cómo podemos definir algunos de estos patrones y realizar mensajes asíncronos.

Servidor NATS básico, transmisión NATS y NATS JetStream

El primer ecosistema de mensajería nativo de la nube de NATS se introdujo con el servidor de NATS basado en el modelo de entrega 'Como máximo una vez': los mensajes se entregan una vez como máximo . Solía ​​reenviar los mensajes publicados a los consumidores a velocidades increíbles, estableciendo el nuevo umbral de rendimiento para la industria. Para algunas aplicaciones, el rendimiento básico ofrecido por NATS superó las pérdidas potenciales de mensajes perdidos.

Pero con el modelo de entrega 'Como máximo una vez', si alguno de los suscriptores está inactivo, los mensajes enviados nunca llegarán y, por lo tanto, no hay garantía de entrega de los datos.

Esto era análogo al protocolo UDP ultrarrápido utilizado para la mayoría de los servicios de transmisión, donde la velocidad de los datos era más importante que la integridad de los datos. Prefieres perder algunos píxeles en un video o tener una resolución más baja que tener que esperar un período prolongado para escuchar la voz de alguien.

Pero esto no es algo que quieras que suceda en una transacción financiera. Perder un poco aquí y allá podría cambiar la factura de alguien o la dirección del destinatario.

Como respuesta a esto, se introdujo NATS Streaming, que cambió parte del rendimiento por la persistencia del mensaje. No se sacrificó mucho el rendimiento y NATS Streaming era una plataforma liviana y de alto rendimiento que usaba NATS básico debajo del capó. Fue construido con el modelo de entrega 'Al menos una vez' con la capacidad de enviar mensajes ‘ACK’ para editores y suscriptores.

Esto es análogo a TCP, que garantiza la integridad de los datos y reenvía los paquetes si no se recibe un mensaje ACK, lo que indica que es posible que el cliente no haya recibido el paquete.

Cuando se publican los mensajes, se conservan durante un tiempo (personalizable) para que los consumidores puedan reproducirlos si no los han recibido. Aunque este componente era extremadamente liviano y de alto rendimiento, no es tan poderoso como los sistemas de transmisión distribuida como Kafka en términos de capacidad y madurez.

Los desarrolladores plantearon requisitos como seguridad distribuida, administración descentralizada, tenencia múltiple, escalamiento global con superclusters y uso compartido seguro de datos que dieron lugar a la próxima generación de NATS Streaming en la era de NATS 2.0, conocida como **NATS JetStream **.

Para los sistemas de transmisión modernos con clústeres distribuidos, se recomienda utilizar la última oferta de Corriente de chorro de NATS. JetStream se creó para resolver los problemas identificados con la tecnología de transmisión en la actualidad: complejidad, fragilidad y falta de escalabilidad. Vamos a jugar con JetStream más adelante en este artículo.

Mensajería Pub/Sub asíncrona en Java con NATS JetStream

Configuración del proyecto

Ejecutar o instalar un servidor NATS JetStream es bastante fácil. Ya sea que desee alojar este clúster en una máquina con Windows, Mac o Linux, Motor acoplable hace que la configuración sea realmente sencilla.

Usaremos un contenedor Docker para alojar un servidor JetStream. Para ejecutar la imagen de Docker, simplemente podemos ejecutar:

1
$ docker run -ti -p 4222:4222 --name jetstream synadia/jsm:latest server

Una vez que ejecute eso, será recibido con algo como:

JetStream Logs

NATS tiene una amplia lista de bibliotecas de clientes en varios idiomas con una comunidad activa de más de 1000 colaboradores. Se unió a CNCF (Cloud Native Computing Foundation) como un proyecto en incubación en

Usaremos el cliente NATS Java conocido como jnats. Para conectarnos a NATS JetStream, solo necesitamos definir una dependencia en pom.xml:

1
2
3
4
5
<dependency>
    <groupId>io.nats</groupId>
    <artifactId>jnats</artifactId>
    <version>${version}</version>
</dependency>

¡Eso es todo! Estamos listos para irnos. Ahora veamos algunos de nuestros casos de uso. Como siempre, si se queda atascado, puede encontrar el código fuente completo en GitHub.

Streaming de editor/suscriptor

Intentemos definir un modelo tradicional de Editor/Suscriptor creando un nuevo Stream y un asunto. Streams en NATS JetStream representa cualquier flujo de datos entre dos puntos finales y es el bloque de construcción central de la API.

Crearemos una sola clase para publicar primero algunos mensajes y luego suscribirse para leer esos mensajes y enviar un reconocimiento:

1
2
3
public class PubSubAsync {
// Proceeding code goes here
}

Avancemos y definamos algunas configuraciones estáticas globales, como el nombre de la transmisión, el asunto, el mensaje predeterminado y el servidor:

1
2
3
4
5
private static final String defaultStream = "pubsubasync-stream";
private static final String defaultSubject = "pubsubasync-subject";
private static final String defaultMessage = "Hello User";
private static final int defaultMessageCount = 2;
private static final String defaultServer = "nats://localhost:4222";

Los usaremos más adelante mientras configuramos las secuencias de forma programática, para evitar codificar variables en ellas.

Comencemos configurando una Conexión al servidor NATS JetStream, instanciando una instancia de JetStreamManagement, que se usa para agregar instancias de Stream, y una instancia de StreamConnfiguration - construida a través del Patrón de diseño de constructor para permitir flexibilidad al definir la configuración.

La conexión realizada con el servidor NATS puede fallar, por lo que querrá envolver *todo el código anterior en un bloque try-catch. Usaremos un bloque prueba-con-recursos ya que esta es una conexión que se puede cerrar, por lo que no tenemos que cerrarla manualmente:

1
2
3
4
5
try (Connection nc = Nats.connect(defaultServer)) {
    // Creating streams, managers, sending messages, subscribing, etc.
} catch (Exception e) {
    e.printStackTrace();
}

Dentro del bloque try, comenzaremos creando una instancia JetStreamManagement junto con un contexto StreamConfiguration y JetStream.

La clase JetStream es la API central del marco. JetStream indirectamente publica los mensajes a suscriptores enviando el mensaje a un asunto que los suscriptores están escuchando. También suscribe suscriptores a los temas.

Los sujetos se definen cuando se construye StreamConfiguration, y la instancia de JetStreamManagement nos permite agregar Streams con esa configuración a nuestra canalización. Cubriremos JetStreamManagement con más detalle en una sección posterior. Vamos a crear un flujo único para publicar mensajes a un asunto y crear el contexto JetStream para administrar la publicación y suscripción de los mensajes enviados a ese asunto:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
JetStreamManagement jsm = nc.jetStreamManagement();
// Create a stream, here will use an in-memory storage type, and one subject
StreamConfiguration sc = StreamConfiguration.builder()
        .name(defaultStream)
        .storageType(StorageType.Memory)
        .subjects(defaultSubject)
        .build();
            
// Add a stream via the `JetStreamManagement` instance and capture its info in a `StreamInfo` object
StreamInfo streamInfo = jsm.addStream(sc);
JsonUtils.printFormatted(streamInfo);

// Create a JetStream context. This hangs off the original connection
// allowing us to produce data to publish into streams and consume data from
// JetStream consumers.
JetStream js = nc.jetStream();         

Ahora, podemos continuar y crear una lista de ‘Futuros’ para guardar los resultados de nuestros mensajes, ya que estamos tratando con mensajes asincrónicos y no sabemos cuándo volverán. Al publicar un mensaje a través del método publishAsync() de la instancia JetStream, se devuelve un PublishAck, que indica el reconocimiento futuro de la recepción por parte de un cliente.

If you'd like to read more about the Future interface, read our Guía de la interfaz del futuro en Java.

Además, para cada mensaje, crearemos una instancia Mensaje, que acepta un asunto y datos. A quién estamos enviando un mensaje y cuál es el mensaje. Usando el método NatsMessage.builder(), podemos crear fácilmente un mensaje que nos gustaría enviar y omitir ciertos argumentos para los que no tenemos ningún uso.

Una vez que se crea un Message, podemos publicarlo de forma asíncrona a través del método publishAsync() de JetStream:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Create a future for asynchronous message processing
List<CompletableFuture<PublishAck>> futures = new ArrayList<>();
int stop = defaultMessageCount + 1;
for (int x = 1; x < stop; x++) {
    String data = defaultMessage + "-" + x;

    // Create a typical NATS message
    Message msg = NatsMessage.builder()
            .subject(defaultSubject)
            .data(data, StandardCharsets.UTF_8)
            .build();
    System.out.printf("Publishing message %s on subject %s.\n", data, defaultSubject);

    // Publish a message and add the result to our `CompletableFuture` list
    futures.add(js.publishAsync(msg));
}

Una vez que enviemos los mensajes, es probable que queramos saber qué les sucedió y si surgieron problemas. Al iterar a través de nuestra lista de futuros, podemos verificar si las instancias CompletableFuture están terminadas, imprimiendo su contenido si lo están, y volviendo a ponerlas en cola si no lo están para volver a verificarlas más tarde:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// Get Acknowledgement for the messages
while (futures.size() > 0) {
    CompletableFuture<PublishAck> f = futures.remove(0);
    if (f.isDone()) {
        try {
            PublishAck pa = f.get();
            System.out.printf("Publish Succeeded on subject %s, stream %s, seqno %d.\n",
                    defaultSubject, pa.getStream(), pa.getSeqno());
        }
        catch (ExecutionException ee) {
            System.out.println("Publish Failed " + ee);
        }
    }
    else {
        // Re-queue it and try again
        futures.add(f);
    }
} 

Para que un editor publique (sensatamente), necesitaremos un suscriptor, no sea que los mensajes cuelguen sin mucho significado. Un suscriptor se crea como una instancia de JetStreamSubscription, devuelta por el método subscribe() del contexto JetStream:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Subscribe to the messages that have been published to the subject
JetStreamSubscription sub = js.subscribe(defaultSubject);
List<Message> messages = new ArrayList<>();
// Retrieve the next message and kick off an iteration of all the messages
Message msg = sub.nextMessage(Duration.ofSeconds(1));
boolean first = true;
while (msg != null) {
    if (first) {
        first = false;
        System.out.print("Read/Ack ->");
   }
   messages.add(msg);
   if (msg.isJetStream()) {
        msg.ack();
        System.out.print(" " + new String(msg.getData()) + "\n");                    
    }
    else if (msg.isStatusMessage()) {
            System.out.print(" !" + msg.getStatus().getCode() + "!");
    }
    JsonUtils.printFormatted(msg.metaData());
    msg = sub.nextMessage(Duration.ofSeconds(1));
}

// Make sure the message goes through before we close
// if you're not using the try-with-resources statement
nc.flush(Duration.ZERO);
nc.close();

Al unir todo esto, cuando ejecutamos el código, deberíamos ver mensajes como estos:

PubSubAsync

¡Hemos creado con éxito un Stream de datos, que lleva mensajes a un asunto y nuestros suscriptores los observan a medida que llegan de forma asíncrona! Sin embargo, a veces, los nombres de nuestros sujetos no se conocen antes de que queramos suscribirnos a ellos. Por ejemplo, puede generar nombres de materias y desea suscribirse a las nuevas materias a medida que se crean. O bien, hay una lista completa de temas con un prefijo común a los que desea suscribirse.

En ambos casos, en lugar de bucles intrincados y lógica de suscripción de generación, puede usar comodines para apuntar a más de un solo tema.

Transmisión comodín de publicador/suscriptor

NATS admite tokenización jerárquica para admitir la suscripción comodín. Como repaso del comienzo de la guía:

Un Asunto en NATS es simplemente una cadena que representa un interés en los datos. Está tokenizado jerárquicamente para admitir suscripciones comodín:

  • foo.* coincide con foo.bar y foo.baz
  • foo.*.bar coincide con foo.a.bar y foo.b.bar
  • foo.> coincide con cualquiera de los anteriores
  • > coincide con todo en NATS

Estos comodines se pueden configurar tanto en el editor como en el suscriptor o en ambos. Veremos un ejemplo típico de esto en un momento. La lógica detrás del enfoque que usaremos ahora es muy similar a la que hemos visto antes:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public class PubWildcardSubWildcard {

    private static final String defaultStream = "pubsubwildcardasync-stream";
    private static final String defaultSubjectWildcard = "audit.us.*";
    private static final String defaultSubjectSpecific = "audit.us.east";
    private static final String defaultMessage = "Audit User";
    private static final int defaultMessageCount = 2;
    private static final String defaultServer = "nats://localhost:4222";
    
    public static void main( String[] args ) {
        System.out.printf("\nPublishing to %s. Server is %s\n\n", defaultSubjectWildcard, defaultServer);
        
          try (Connection nc = Nats.connect(defaultServer)) {      
          JetStreamManagement jsm = nc.jetStreamManagement();
            
         StreamConfiguration sc = StreamConfiguration.builder()
                 .name(defaultStream)
                 .storageType(StorageType.Memory)
                 .subjects(defaultSubjectWildcard)
                 .build();

         StreamInfo streamInfo = jsm.addStream(sc);
         JsonUtils.printFormatted(streamInfo);
      
         JetStream js = nc.jetStream();            
      
         List<CompletableFuture<PublishAck>> futures = new ArrayList<>();
         int stop = defaultMessageCount + 1;
         for (int x = 1; x < stop; x++) {
             String data = defaultMessage + "-" + x;

             Message msg = NatsMessage.builder()
                     .subject(defaultSubjectSpecific)
                     .data(data, StandardCharsets.UTF_8)
                     .build();
             System.out.printf("Publishing message %s on subject %s.\n", data, defaultSubjectSpecific);
 
             futures.add(js.publishAsync(msg));
         }

         while (futures.size() > 0) {
             CompletableFuture<PublishAck> f = futures.remove(0);
             if (f.isDone()) {
                 try {
                     PublishAck pa = f.get();
                     System.out.printf("Publish Succeeded on subject %s, stream %s, seqno %d.\n",
                            defaultSubjectSpecific, pa.getStream(), pa.getSeqno());
                 }
                 catch (ExecutionException ee) {
                     System.out.println("Publish Failed " + ee);
                 }
             }
             else {
                 futures.add(f);
             }
        }
            
         JetStreamSubscription sub = js.subscribe(defaultSubjectWildcard);
         List<Message> messages = new ArrayList<>();
         Message msg = sub.nextMessage(Duration.ofSeconds(1));
         boolean first = true;
         while (msg != null) {
             if (first) {
                 first = false;
                 System.out.print("Read/Ack ->");
             }
             messages.add(msg);
             if (msg.isJetStream()) {
                 msg.ack();
                 System.out.print(" " + new String(msg.getData()) + "\n");            
             }
             else if (msg.isStatusMessage()) {
                     System.out.print(" !" + msg.getStatus().getCode() + "!");
             }
             JsonUtils.printFormatted(msg.metaData());
             msg = sub.nextMessage(Duration.ofSeconds(1));
         }
         // Make sure the message goes through before we close
         // if you're not using the try-with-resources statement
          nc.flush(Duration.ZERO)
          nc.close();
     }
     catch (Exception e) {
         e.printStackTrace();
     }
}
}

Cuando ejecutemos este código, seremos recibidos con:

PubSubWildcardLog

Como alternativas al modelo Pub/Sub, usando msg.getReplyTo(), podemos comenzar a crear una implementación de patrón Solicitud-Respuesta, y al crear grupos de cola y canales para suscribirse y darse de baja, podemos crear un *Grupo de cola * implementación de patrones.

Esto es posible porque no hemos realizado ninguna configuración específica de patrón para NATS - los patrones específicos que le gustaría usar dependen únicamente de cómo use la biblioteca.

Gestión de JetStream {#gestión de JetStream}

En cierto punto, es probable que desee observar o administrar sus flujos. Para hacer eso, vamos a analizar el ciclo de vida de la transmisión en NATS JetStream:

  • Crear o agregar una secuencia con un tema
  • Actualice una secuencia agregando un asunto
  • Obtener información sobre corrientes
  • Purgar un flujo de sus mensajes
  • Eliminar una secuencia

Para demostrar esto, vamos a crear una clase con algunos campos estáticos y solo un método main(). Dentro de él, probaremos algunas de estas operaciones, pero en función de su arquitectura y activadores para estas operaciones, querrá adjuntar los segmentos de código correspondientes en consecuencia:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class NatsJsManageStreams {

    private static final String STREAM1 = "manage-stream1";
    private static final String STREAM2 = "manage-stream2";
    private static final String SUBJECT1 = "manage-subject1";
    private static final String SUBJECT2 = "manage-subject2";
    private static final String SUBJECT3 = "manage-subject3";
    private static final String SUBJECT4 = "manage-subject4";
    private static final String defaultServer = "nats://localhost:4222";

    public static void main(String[] args) {
        try (Connection nc = Nats.connect(defaultServer)) {
            JetStreamManagement jsm = nc.jetStreamManagement();
            // Management code
            // ...
          
          // Make sure the message goes through before we close
          // if you're not using the try-with-resources statement
            nc.flush(Duration.ZERO);
            nc.close();
        } catch (Exception exp) {
            exp.printStackTrace();
        }
    }
}

Usaremos la misma instancia de JetStreamManagement para el resto de las muestras, ya que las estamos usando todas en una sola clase. Sin embargo, tenga en cuenta que en un escenario del mundo real, nunca/rara vez crearía una configuración de transmisión múltiple. En su lugar, normalmente agregaría temas a un flujo existente para reutilizar los recursos.

Nota: A lo largo de los ejemplos, usaremos una Clase de utilidad personalizada para manejar la creación o actualización de un Stream, publicar de forma asíncrona sin esperar, o para leer un mensaje con o sin Reconocimiento - NatsJsUtils . Esta clase de utilidad se puede encontrar en GitHub.

Crear o agregar una transmisión con un tema {#crear o agregar una transmisión con un tema}

La primera vez que creamos un Stream, solo establecimos su nombre, asunto y política de almacenamiento. Hay varias otras configuraciones que podemos modificar a través de los métodos de construcción:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 1. Create (add) a stream with a subject
System.out.println("\n----------\n1. Configure And Add Stream 1");
StreamConfiguration streamConfig = StreamConfiguration.builder()
        .name(STREAM1)
        .subjects(SUBJECT1)
        // .retentionPolicy()
        // .maxConsumers(...)
        // .maxBytes(...)
        // .maxAge(...)
        // .maxMsgSize(...)
         .storageType(StorageType.Memory)
        // .replicas(...)
        // .noAck(...)
        // .template(...)
        // .discardPolicy(...)
        .build();
StreamInfo streamInfo = jsm.addStream(streamConfig);
NatsJsUtils.printStreamInfo(streamInfo);

La RetentionPolicy establece cuándo se eliminan los mensajes: cuándo ya no hay interés en ellos (ningún consumidor los consumirá), cuándo se consumen, etc. Puede limitar la cantidad de consumidores, cuánto tiempo puede durar el mensaje. estar en bytes, cuánto tiempo puede persistir, si se requiere o no una respuesta ‘ACK’, etc.

En la forma más simple: proporciona un nombre, asunto y tipo de almacenamiento, y compila(). Podemos obtener la información de un ‘Stream’ como un tipo de retorno del método ‘addStream()’ de la instancia ‘JetStreamManagement’, que está bastante impreso a través de la clase ‘NatsJsUtils’:

Actualización de una secuencia con un asunto

Puede actualizar los flujos existentes mediante el método updateStream() de la instancia JetStreamManagement. Reutilizaremos la variable de referencia streamConfig y build() una nueva configuración para el flujo que nos gustaría actualizar, en función de la configuración extraída de la instancia StreamInfo existente:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 2. Update stream, in this case, adding a new subject
// -  StreamConfiguration is immutable once created
// -  but the builder can help with that.
System.out.println("----------\n2. Update Stream 1");
streamConfig = StreamConfiguration.builder(streamInfo.getConfiguration())
        .addSubjects(SUBJECT2).build();
streamInfo = jsm.updateStream(streamConfig);
NatsJsUtils.printStreamInfo(streamInfo);

// 3. Create (add) another stream with 2 subjects
System.out.println("----------\n3. Configure And Add Stream 2");
streamConfig = StreamConfiguration.builder()
        .name(STREAM2)
        .storageType(StorageType.Memory)
        .subjects(SUBJECT3, SUBJECT4)
        .build();
streamInfo = jsm.addStream(streamConfig);
NatsJsUtils.printStreamInfo(streamInfo);

Esto resulta en:

Obtener información sobre flujos

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 4. Get information on streams
// 4.0 publish some message for more interesting stream state information
// -   SUBJECT1 is associated with STREAM1
// 4.1 getStreamInfo on a specific stream
// 4.2 get a list of all streams
// 4.3 get a list of StreamInfo's for all streams
System.out.println("----------\n4.1 getStreamInfo");
NatsJsUtils.publish(nc, SUBJECT1, 5);
streamInfo = jsm.getStreamInfo(STREAM1);
NatsJsUtils.printStreamInfo(streamInfo);

System.out.println("----------\n4.2 getStreamNames");
List<String> streamNames = jsm.getStreamNames();
NatsJsUtils.printObject(streamNames);

System.out.println("----------\n4.2 getStreamNames");
List<StreamInfo> streamInfos = jsm.getStreams();
NatsJsUtils.printStreamInfoList(streamInfos);

Purgar un flujo

Puede purgar fácilmente una transmisión de todos sus mensajes, vaciándola por completo:

1
2
3
4
// 5. Purge a stream of it's messages
System.out.println("----------\n5. Purge stream");
PurgeResponse purgeResponse = jsm.purgeStream(STREAM1);
NatsJsUtils.printObject(purgeResponse);

Eliminación de una secuencia

O, si definitivamente ha terminado con una transmisión, puede eliminarla fácilmente:

1
2
3
4
// 6. Delete a stream
System.out.println("----------\n6. Delete stream");
jsm.deleteStream(STREAM2);
System.out.println("----------\n");

Gestión de la seguridad

NATS JetStream admite el cifrado de conexiones con TLS. TLS se puede utilizar para cifrar/descifrar el tráfico entre la conexión cliente/servidor y comprobar la identidad del servidor. Cuando está habilitado en modo TLS, NATS requerirá que todos los clientes se conecten con TLS.

Puede definir un SSLContext cargando todos los Keystores y Truststores y luego sobrecargar SSLContext como una opción mientras se conecta a NATS. Definamos una clase SSLUtils que podamos usar para cargar un almacén de claves, crear administradores de claves y un contexto SSL:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class SSLUtils {
    public static String KEYSTORE_PATH = "keystore.jks";
    public static String TRUSTSTORE_PATH = "truststore.jks";
    public static String STORE_PASSWORD = "password";
    public static String KEY_PASSWORD = "password";
    public static String ALGORITHM = "SunX509";

    public static KeyStore loadKeystore(String path) throws Exception {
        KeyStore store = KeyStore.getInstance("JKS");
        BufferedInputStream in = new BufferedInputStream(new FileInputStream(path));

        try {
            store.load(in, STORE_PASSWORD.toCharArray());
        } finally {
            if (in != null) {
                in.close();
            }
        }

        return store;
    }

    public static KeyManager[] createTestKeyManagers() throws Exception {
        KeyStore store = loadKeystore(KEYSTORE_PATH);
        KeyManagerFactory factory = KeyManagerFactory.getInstance(ALGORITHM);
        factory.init(store, KEY_PASSWORD.toCharArray());
        return factory.getKeyManagers();
    }

    public static TrustManager[] createTestTrustManagers() throws Exception {
        KeyStore store = loadKeystore(TRUSTSTORE_PATH);
        TrustManagerFactory factory = TrustManagerFactory.getInstance(ALGORITHM);
        factory.init(store);
        return factory.getTrustManagers();
    }

    public static SSLContext createSSLContext() throws Exception {
        SSLContext ctx = SSLContext.getInstance(Options.DEFAULT_SSL_PROTOCOL);
        ctx.init(createTestKeyManagers(), createTestTrustManagers(), new SecureRandom());
        return ctx;
    }
}

Luego, con nuestra clase de utilidad preparada, podemos proporcionar el SSLContext creado por ella al método constructor sslContext() al crear una conexión NATS:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public class NatsConnectTLS {
    public static void main(String[] args) {
        try {
            SSLContext ctx = SSLUtils.createSSLContext();
            Options options = new Options.Builder()
                                .server("nats://localhost:4222")
                                .sslContext(ctx) // Set the SSL context
                                .build();
            Connection nc = Nats.connect(options);

            // Do something with the connection

            nc.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

También podemos definir un mecanismo de autenticación para restringir el acceso al sistema NATS. El cliente no tiene control sobre los controles de acceso, pero los clientes proporcionan las configuraciones necesarias para autenticarse con el sistema, vincularse a una cuenta y requerir TLS.

Se puede establecer una configuración simple para conectarse con un nombre de usuario y una contraseña a través del método userInfo() cuando se configuran las Opciones:

1
2
3
4
5
Options options = new Options.Builder().
                            .server("nats://localhost:4222")
                            .userInfo("myname","password") // Set a user and plain text password
                            .build();
Connection nc = Nats.connect(options);

Luego, al crear una conexión, podemos conectarnos al servidor NATS proporcionando el nombre de usuario y la contraseña en la URL:

1
Connection nc = Nats.connect("nats://myname:[correo electrónico protegido]:4222");

Del mismo modo, también podemos pasar tokens de autenticación, como JWT, o secretos como parte de la siguiente configuración:

1
2
3
4
5
Options options = new Options.Builder()
                            .server("nats://localhost:4222")
                            .token("mytoken") // Set a token
                            .build();
Connection nc = Nats.connect(options);

Ahora podemos conectarnos a NATS Url como se muestra a continuación:

1
Connection nc = Nats.connect("nats://[correo electrónico protegido]:4222"); // Token in URL

Conclusión

Cuando piense en usar un sistema de transmisión distribuida como el sistema nervioso para construir clústeres basados ​​en microservicios distribuidos, sistemas basados ​​en IoT, sistemas Edge de próxima generación, puede considerar usar NATS JetStream como una opción liviana en comparación con otros marcos populares y poderosos como como Apache Kafka. Tratar con un volumen masivo de flujos de eventos y mensajes es cada vez más común en un mundo basado en datos. NATS JetStream proporciona las capacidades de seguridad distribuida, tenencia múltiple y escalado horizontal.

Como siempre, puede encontrar el código fuente completo en GitHub.