Tutorial del reactor de resorte

En este artículo, nos presentarán el proyecto Spring Reactor y su importancia. La idea es aprovechar la especificación de flujos reactivos para...

Visión general

En este artículo, nos presentarán el proyecto Reactor de resort y su importancia. La idea es aprovechar la Especificación de flujos reactivos para construir aplicaciones reactivas sin bloqueo en la JVM.

Usando este conocimiento, construiremos una aplicación reactiva simple y la compararemos con una aplicación de bloqueo tradicional.

Las aplicaciones reactivas son la "nueva novedad", lo que hace que muchas aplicaciones cambien a este modelo. Puedes leer más sobre esto en El Manifiesto Reactivo.

Motivación

Las API convencionales están bloqueando

Las aplicaciones modernas manejan una gran cantidad de usuarios y datos simultáneos. ley de moore ya no se sostiene como antes. Las capacidades del hardware, aunque van en aumento, no están a la altura de las aplicaciones modernas en las que el rendimiento es muy importante.

Los desarrolladores de Java por defecto escriben código de bloqueo. Así es como se configuró la API. Otro ejemplo sería el enfoque tradicional de servlet (Gato). Cada solicitud garantiza un nuevo hilo que espera a que finalice todo el proceso en segundo plano para enviar la respuesta.

Esto significa que la lógica de nuestra capa de datos está bloqueando la aplicación de forma predeterminada, ya que los Subprocesos esperan ociosamente una respuesta. Es un desperdicio no reutilizar estos Subprocesos para un propósito diferente, mientras esperamos la respuesta.

Motivación del reactor{.img-responsive}[Crédito: [http://proyectoreactor.io/aprender](http://projectreactor. io/aprender)]{.pequeño}

Nota: Esto podría ser un problema si tenemos recursos limitados o si un proceso tarda demasiado en ejecutarse.

Bloques fijos asíncronos

En Java, puede escribir código de forma asíncrona usando devoluciones de llamada y Futuros. Luego puede obtener y unir subprocesos en algún momento posterior y procesar el resultado. Java 8 nos presentó una nueva clase: CompletableFuturo, que hace que sea mucho más fácil coordinar estos cosas.

Funciona de manera simple: cuando termina un solo proceso, comienza otro. Después de que finaliza el segundo, los resultados se combinan en un tercer proceso.

Esto hace que sea mucho más fácil coordinar su aplicación, pero en última instancia sigue bloqueando, ya que crea subprocesos y espera a que se llame a un método .join().

Motivación del reactor{.img-responsive}[Crédito: [http://proyectoreactor.io/aprender](http://projectreactor.io/ aprender)]{.pequeño}

Programación reactiva {#programación reactiva}

Lo que queremos es asincrónico y sin bloqueo. Un grupo de desarrolladores de empresas como Netflix, Pivotal, RedHat, etc. se juntaron y convergieron en algo llamado La reflexión de flujos reactivos.

Project Reactor es la implementación de Spring de The Reactive Specification y está específicamente favorecido por el [Flujo web de primavera](https://docs.spring.io/spring/docs/current/spring-framework-reference/ web-reactive.html), aunque puedes usarlo con otros módulos como RxJava.

La idea es operar Asíncronamente con Retropresión usando Editores y Suscriptores.

¡Aquí, estamos siendo introducidos a varios conceptos nuevos! Vamos a explicarlos uno por uno:

  • Editor - Un Editor es un proveedor de un número potencialmente ilimitado de elementos.
  • Suscriptor - Un Suscriptor escucha a ese Publicador, solicitando nuevos datos. A veces, también se le conoce como Consumidor.
  • Contrapresión: la capacidad del Suscriptor para informar al Editor cuántas solicitudes puede manejar en ese momento. Por lo tanto, es el Suscriptor el responsable del flujo de datos, no el Editor, ya que solo proporciona los datos.

The Reactor Project ofrece 2 tipos de editores. Estos se consideran los componentes principales de Spring Webflux:

  • Flux - es un editor que produce valores de ‘0’ a ‘N’. Podría ser ilimitado. Las operaciones que devuelven varios elementos utilizan este tipo.
  • Mono - es un editor que produce valores de ‘0’ a ‘1’. Las operaciones que devuelven un único elemento utilizan este tipo.

Desarrollo de aplicaciones reactivas

Con todo lo anterior en mente, ¡pasemos a crear una aplicación web simple y aprovechemos este nuevo paradigma reactivo!

La forma más sencilla de comenzar con un proyecto básico de Spring Boot, como siempre, es usar Spring Initializr. Seleccione su versión preferida de Spring Boot y agregue la dependencia "Reactive Web". Después de esto, genere como un proyecto Maven y ¡ya está todo listo!

{.img-responsive}

Definamos un POJO simple - Saludo:

1
2
3
4
public class Greeting {
    private String msg;
    // Constructors, getters and setters
}

Definición de un editor

Junto a él, definamos un controlador REST simple con un mapeo adecuado:

1
2
3
4
5
6
7
8
@RestController
public class GreetReactiveController {
    @GetMapping("/greetings")
    public Publisher<Greeting> greetingPublisher() {
        Flux<Greeting> greetingFlux = Flux.<Greeting>generate(sink -> sink.next(new Greeting("Hello"))).take(50);
        return greetingFlux;
    }
}

Llamar a flujo.generar() creará un nunca secuencia final del objeto ‘Saludo’.

El método tomar(), como sugiere el nombre, solo tomará los primeros 50 valores de la corriente.

Es importante tener en cuenta que el tipo de retorno del método es el tipo asíncrono Publisher<Greeting>.

Para probar este punto final, navegue su navegador a http://localhost:8080/saludos o use el rizo cliente en su línea de comando - curl localhost:8080/saludos

Se le solicitará una respuesta similar a:

{.img-responsive}

Esto no parece tan importante y podríamos haber devuelto simplemente una Lista<Saludo> para lograr el mismo resultado visual.

Pero de nuevo, observe que estamos devolviendo un Flux<Greeting>, que es un tipo asincrónico ya que cambia todo.

Supongamos que tuviéramos una editorial que devolviera más de mil registros, o incluso más. Piense en lo que tiene que hacer el marco. Se le da un objeto de tipo Saludo, que tiene que convertir a JSON para el usuario final.

Si hubiéramos utilizado el enfoque tradicional con Spring MVC, estos objetos seguirían acumulándose en su RAM y una vez que recopila todo, lo devolvería al cliente. Esto podría exceder nuestra capacidad de RAM y también bloquear cualquier otra operación para que no se procese mientras tanto.

Cuando usamos Spring Webflux, toda la dinámica interna cambia. El marco comienza a suscribirse a estos registros del editor y serializa cada elemento y lo envía de vuelta al cliente en fragmentos.

Hacemos las cosas de forma asíncrona sin crear demasiados hilos y reutilizando los hilos que están esperando algo. La mejor parte es que no tienes que hacer nada extra para esto. En Spring MVC tradicional, podríamos lograr lo mismo devolviendo AsyncResult, DefferedResult, etc. para obtener cierta asincronía, pero internamente Spring MVC tuvo que crear un nuevo subproceso, que se bloquea porque tiene que esperar.

Eventos enviados por el servidor

Otro editor que se ha utilizado desde su llegada es Eventos enviados por el servidor.

Estos eventos permiten que una página web obtenga actualizaciones de un servidor en tiempo real.

Definamos un servidor reactivo simple:

1
2
3
4
5
6
7
@GetMapping(value = "/greetings/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<Greeting> sseGreetings() {
    Flux<Greeting> delayElements = Flux
            .<Greeting>generate(sink -> sink.next(new Greeting("Hello @" + Instant.now().toString())))
            .delayElements(Duration.ofSeconds(1));
    return delayElements;
}

Alternativamente, podríamos haber definido esto:

1
2
3
4
5
6
@GetMapping(value = "/greetings/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Greeting> events() {
    Flux<Greeting> greetingFlux = Flux.fromStream(Stream.generate(() -> new Greeting("Hello @" + Instant.now().toString())));
    Flux<Long> durationFlux = Flux.interval(Duration.ofSeconds(1));
    return Flux.zip(greetingFlux, durationFlux).map(Tuple2::getT1);
}

Estos métodos producen un TEXT_EVENT_STREAM_VALUE que esencialmente significa que los datos se envían en forma de eventos enviados por el servidor.

Tenga en cuenta que en el primer ejemplo, estamos usando un Publisher y en el segundo ejemplo estamos usando un Flux. Una pregunta válida sería:

"¿Qué tipo de devolución debo usar entonces?"

Se recomienda usar Flux y Mono sobre Publisher. Ambas clases son implementaciones de la interfaz Publisher que se originan en Reactive Streams. Si bien puede usarlos indistintamente, es más expresivo y descriptivo usar las implementaciones.

Estos dos ejemplos destacan dos formas de crear eventos enviados por el servidor retrasados:

  • .delayElements()- Este método retrasa cada elemento del flujo por la duración dada
  • .zip() - Estamos definiendo un flujo para generar eventos y un flujo para generar valores cada segundo. Al unirlos, obtenemos un flujo que genera eventos cada segundo.

Vaya a http://localhost:8080/saludos/sse o use un cliente rizo en su línea de comando y verás una respuesta que se parece a:

{.img-responsive}

Definición de un consumidor

Ahora veamos el lado del consumidor. Vale la pena señalar que no necesita tener un editor reactivo para usar la programación reactiva en el lado consumidor:

1
2
3
4
5
public class Person {
    private int id;
    private String name;
    // Constructor with getters and setters
}

Y luego tenemos un ‘RestController’ tradicional con un solo mapeo:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
@RestController
public class PersonController {
    private static List<Person> personList = new ArrayList<>();
    static {
        personList.add(new Person(1, "John"));
        personList.add(new Person(2, "Jane"));
        personList.add(new Person(3, "Max"));
        personList.add(new Person(4, "Alex"));
        personList.add(new Person(5, "Aloy"));
        personList.add(new Person(6, "Sarah"));
    }

    @GetMapping("/person/{id}")
    public Person getPerson(@PathVariable int id, @RequestParam(defaultValue = "2") int delay)
            throws InterruptedException {
        Thread.sleep(delay * 1000);
        return personList.stream().filter((person) -> person.getId() == id).findFirst().get();
    }
}

Inicializamos una lista de tipo ‘Persona’ y, según el ‘id’ pasado a nuestro mapeo, filtramos a esa persona usando una secuencia.

Es posible que se alarme por el uso de Thread.sleep() aquí, aunque solo se usa para simular un retraso de red de 2 segundos.

Si está interesado en leer más sobre Flujos de Java, ¡lo tenemos cubierto!

Avancemos y creemos nuestro consumidor. Al igual que el editor, podemos hacer esto fácilmente usando Spring Initializr:

{.img-responsive}

Nuestra aplicación de productor se ejecuta en el puerto 8080. Ahora digamos que queremos llamar al punto final /person/{id} 5 veces. Sabemos que, de forma predeterminada, cada respuesta tiene un retraso de 2 segundos debido al "retraso de la red".

Primero hagamos esto usando el enfoque tradicional RestTemplate:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CallPersonUsingRestTemplate {

    private static final Logger logger = LoggerFactory.getLogger(CallPersonUsingRestTemplate.class);
    private static RestTemplate restTemplate = new RestTemplate();

    static {
        String baseUrl = "http://localhost:8080";
        restTemplate.setUriTemplateHandler(new DefaultUriBuilderFactory(baseUrl));
    }

    public static void main(String[] args) {
        Instant start = Instant.now();

        for (int i = 1; i <= 5; i++) {
            restTemplate.getForObject("/person/{id}", Person.class, i);
        }

        logTime(start);
    }

    private static void logTime(Instant start) {
        logger.debug("Elapsed time: " + Duration.between(start, Instant.now()).toMillis() + "ms");
    }
}

Vamos a ejecutarlo:

{.img-responsive}

Como era de esperar, tomó un poco más de 10 segundos y así es como Spring MVC funciona de manera predeterminada.

Hoy en día, esperar un poco más de 10 segundos para obtener un resultado en una página es inaceptable. Esta es la diferencia entre conservar un cliente/cliente y perderlo por esperar demasiado.

Spring Reactor presentó un nuevo cliente web para realizar solicitudes web llamado cliente web. Comparado con RestTemplate, este cliente tiene una sensación más funcional y es completamente reactivo. Está incluido en la dependencia spring-boot-starter-weblux y está diseñado para reemplazar RestTemplate sin bloqueos.

Reescribamos el mismo controlador, esta vez, usando WebClient:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CallPersonUsingWebClient_Step1 {

    private static final Logger logger = LoggerFactory.getLogger(CallPersonUsingWebClient_Step1.class);
    private static String baseUrl = "http://localhost:8080";
    private static WebClient client = WebClient.create(baseUrl);

    public static void main(String[] args) {

        Instant start = Instant.now();

        for (int i = 1; i <= 5; i++) {
            client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class);
        }

        logTime(start);
    }

    private static void logTime(Instant start) {
        logger.debug("Elapsed time: " + Duration.between(start, Instant.now()).toMillis() + "ms");
    }

}

Aquí, creamos un WebClient pasando baseUrl. Luego, en el método principal, simplemente llamamos al punto final.

get() indica que estamos haciendo una solicitud GET. Sabemos que la respuesta será un solo objeto, por lo que estamos usando un Mono como se explicó antes.

En última instancia, le pedimos a Spring que asignara la respuesta a una clase Persona:

{.img-responsive}

Y no pasó nada, como era de esperar.

Esto se debe a que no nos estamos suscribiendo. Todo el asunto está aplazado. Es asíncrono pero tampoco se inicia hasta que llamamos al método .subscribe(). Este es un problema común con las personas que son nuevas en Spring Reactor, así que esté atento a esto.

Cambiemos nuestro método principal y agreguemos suscribirse:

1
2
3
for (int i = 1; i <= 5; i++) {
    client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class).subscribe();
}

Agregar el método nos solicita el resultado deseado:

{.img-responsive}

La solicitud se envía pero el método .subscribe() no se sienta y espera la respuesta. Como no bloquea, terminó antes de recibir la respuesta.

¿Podríamos contrarrestar esto encadenando .block() al final de las llamadas al método?

1
2
3
for (int i = 1; i <= 5; i++) {
    client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class).block();
}

Resultado:

{.img-responsive}

Obtuvimos la respuesta esta vez para cada persona, aunque tomó más de 10 segundos. Esto anula el propósito de que la aplicación sea reactiva.

La forma de solucionar todos estos problemas es simple: hacemos una lista de tipo Mono y esperamos a que se completen todos, en lugar de esperar a cada uno:

1
2
3
4
5
List<Mono<Person>> list = Stream.of(1, 2, 3, 4, 5)
    .map(i -> client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class))
    .collect(Collectors.toList());

Mono.when(list).block();

Resultado:

{.img-responsive}

Esto es lo que estamos buscando. Esta vez, tomó poco más de dos segundos, incluso con un retraso masivo en la red. Esto aumenta drásticamente la eficiencia de nuestra aplicación y realmente cambia las reglas del juego.

Si observa detenidamente los hilos, Reactor los está reutilizando en lugar de crear otros nuevos. Esto es realmente importante si su aplicación maneja muchas solicitudes en un corto período de tiempo.

Conclusión

En este artículo, discutimos la necesidad de la programación reactiva y la implementación de Spring: Spring Reactor.

Luego, discutimos el módulo Spring Webflux, que usa Reactor internamente, así como conceptos cubiertos como Publisher y Subscriber. A partir de esto, creamos una aplicación que publica datos como un flujo reactivo y los consume en otra aplicación.

El código fuente de este tutorial se puede encontrar en Github.