Java 8 Streams: Guía definitiva para la transmisión en paralelo con Parallel()

En esta guía, aprenda cómo hacer que los flujos de Java 8 se ejecuten en paralelo con el método parallel(), así como las mejores prácticas y los entresijos de la paralelización de flujos con ejemplos prácticos de código.

Flujos paralelos en Java 8

Hoy en día, Java Streams API se usa ampliamente, lo que hace que Java sea más funcional que nunca. Por lo tanto, han surgido modelos como Mapa reducido para facilitar el manejo de flujos.

Aunque estos modelos hicieron que el uso de flujos fuera sencillo, también han introducido problemas de eficiencia. La operación integrada parallel() es bastante simple de implementar y le permite aprovechar el paralelismo.

Por ejemplo, digamos que necesita sumar todos los números entre 1 y 1,000,000. El fragmento de código a continuación calcula el resultado al procesar cada número en el rango en un orden en serie:

1
2
3
int sum = Stream.iterate(1, a -> a +1)
    .limit(1_000_000)
    .reduce(0, (a, b) -> a + b );

Podríamos reducir su tiempo de ejecución con una simple adición de parallel(). El cálculo ahora se lleva a cabo en varios subprocesos paralelos:

1
2
3
4
int sum = Stream.iterate(1, a -> a +1)
    .limit(1_000_000)
    .parallel()
    .reduce(0, (a, b) -> a + b );

Eso es todo en términos del uso del método: simplemente haces una secuencia parallel() antes de otras operaciones. Sin embargo, las apariencias pueden ser engañosas. Por un lado, las malas elecciones de diseño de código hacen que el paralelismo funcione más lento que secuencialmente.

En este ejemplo, llamar a Stream.iterate() es caro. Se produce mucho boxeo y desboxeo debido a eso. Por lo tanto, una secuencia con muchos elementos sufrirá un impacto en el rendimiento debido a esto.

Además, las lambdas que producen efectos secundarios hacen que los flujos paralelos sean peligrosos para la seguridad de los subprocesos. Hemos reflexionado extensamente sobre estas consideraciones en otras guías dedicadas a la API funcional en Java, en función de las operaciones y sus implementaciones específicas.

  • En Guía para reducir() - Encontramos que una simple llamada parallel() podría reducir el tiempo de ejecución de los cálculos. Esto se debe a que reduce() aplica el patrón divide y vencerás. Y, el paralelismo es una excelente ayuda para ello.

  • En Guía para findFirst() y findAny() - Cortocircuitar un flujo usando findAny() demostró ser eficiente cuando corría en paralelo.

Aún así, todas estas exploraciones no exploraron qué implicaba realmente llamar a parallel(). Por ejemplo, al usar parallel() nunca nos preguntamos:

  • Con el paralelismo, ¿creamos también concurrencia?
  • ¿Funciona el subprocesamiento múltiple, que crea parallel(), para bloquear rutinas? ¿Podrían estas llamadas hacer que las operaciones de IO sean más rápidas, por ejemplo?

Esta guía pretende dar respuesta a todas estas preguntas.

Paralelización en Java heredado

Digamos que quieres encontrar la suma de todos los números entre 1 y n, donde n = 1000. Usando el ciclo for clásico, harías algo como esto:

1
2
3
4
5
6
7
8
private long result = 0;

public long sumUsingClassicForLoop(long n) {
    for(long i = 1L; i <= n; i++) {
        result += i;
    }
    return result;
}

Y al ejecutar este fragmento de código, obtendría el resultado:

1
500500

Concedido, el resultado es el esperado. Sin embargo, ¿qué sucede si su valor n es mucho mayor? Diga, n = 1000000. ¿No le gustaría que su código se ejecutara de manera eficiente y aprovechara al máximo los muchos núcleos del procesador de su computadora?

Un enfoque natural sería utilizar muchos subprocesos para calcular el resultado. Sin embargo, antes de Java 8, la única opción era crear subprocesos manualmente o usar Executor Framework con un grupo de subprocesos:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
private long result = 0;

public long sumUsingThreading(long n) {
    Runtime r = Runtime.getRuntime();
    int procs = r.getAvailableProcessors();
    ExecutorService es = newFixedThreadPool(procs);
    
    try {
        for(long i = 1L; i <= n; i++) {
            // Notice how we do not use variable i in the lambda directly
            // That is because i would need to be effectively final to be used
            // inside a lambda
            long toAdd = i;
            es.execute(() -> result += toAdd);
        }
    } catch (Exception e) {
        System.out.println("An error occured");
    } finally {
        es.shutdown();
    }    
    return result;
}

{.icon aria-hidden=“true”}

Nota: Al usar ExecutorService, en realidad hemos simplificado el uso de subprocesos. Al menos, el código del cliente no declara sus propias variables de hilo. De lo contrario, hay una gestión de subprocesos involucrada, que podría necesitar bifurcaciones y uniones de subprocesos; en resumen, agrupación de subprocesos. Si hubiéramos elegido ir por ese camino, el método sumUsingThreading() se habría vuelto complejo. Habría incluido mecanismos de bloqueo de hilos, por ejemplo.

Con esto, obtendrías un resultado como el del enfoque del bucle for. Pero, para un cálculo tan simple, las líneas de código son demasiadas, lo que perjudica la legibilidad. Había una necesidad obvia de hacer este proceso más simple y menos detallado.

Por el contrario, al usar parallel(), no es necesario agregar ningún tipo de subprocesamiento en el código del cliente. La API de Streams hace todo eso por usted en segundo plano. Mira, obtendrías los mismos resultados que en el ejemplo anterior al escribir esto:

1
2
3
4
5
6
public long sumUsingParallel(long n) {
    return Stream.iterate(1L, a -> a + 1)
        .limit(n)
        .parallel()
        .reduce(0L, Long::sum);        
}

¡Observe cómo con el enfoque parallel() hemos reducido las líneas de código del 23 del ejemplo anterior a 6! Además, la mejora en la legibilidad del código es drástica.

Definiciones

Firma del método:

1
S parallel()

Este es el método que es más probable que utilice cuando haya creado una secuencia usted mismo. Y por “usted mismo”, significa que ha iniciado una transmisión, como:

1
2
Stream<Integer> myStream = Stream.of(1, 2, 3);
myStream.parallel().close();

En otros casos de uso, podría tener una colección secuencial que desee paralelizar. Y para eso, la API de colecciones también proporciona una forma de crear flujos paralelos.

Ofrece el método parallelStream():

1
Stream<E> parallelStream()

Que usarías como:

1
2
Collection<Integer> numbers = Arrays.asList(1, 2, 3);
numbers.parallelStream().close();

Sin embargo, tenga en cuenta que parallelStream() es solo un atajo para:

1
numbers.stream().parallel().close();

La interfaz BaseStream define un método parallel() como uno que:

"Devuelve una secuencia equivalente que es paralela. Puede devolverse a sí misma, ya sea porque la secuencia ya era paralela o porque el estado de la secuencia subyacente se modificó para que sea paralelo."

[Documentación oficial de BaseStream]{.small}

Además, la API de Streams proporciona una [forma de interrogar si un flujo se reduce en paralelo](https://docs.oracle.com/javase/8/docs/api/java/util/stream/BaseStream.html#isParallel –). El método isParallel() devuelve un valor booleano, que le indica si un flujo se ejecutaría en paralelo si se ejecuta una operación de terminal.

Poner isParallel() en acción: comprobar si la transmisión está en paralelo

Algunas operaciones de terminal, como forEach(), no se preocupan por el orden de encuentro de un flujo. Además, forEach() es explícitamente no determinista.

Al ser explícitamente no determinista, forEach() no promete respetar el orden de encuentro de un flujo. Porque al hacerlo, no aprovecharía el paralelismo.

Pero, ¿qué sucede si tiene un caso de uso en el que un orden de encuentro en serie es crucial? ¿Dónde las operaciones paralelas estropearían la salida deseada, por ejemplo?

Digamos que desea imprimir una secuencia de registros en la consola, por ejemplo. Y su código presenta el método printToConsole():

1
2
3
public void printToConsole(Stream<String> logs) {
    logs.forEach(System.out::println);
}

Aquí, el uso de forEach() puede hacer que su código imprima registros en el orden incorrecto, ya que no serían consistentes. Y, dado que es posible que usted no sea el autor del flujo de logs, es posible que no sepa si es paralelo o no.

La operación isParallel() es muy útil en este escenario. Le informará sobre el orden de encuentro de la corriente. Si devuelve true, significaría que se trata de un flujo paralelizado. Y, falso si es secuencial.

Estos informes lo ayudarán a modificar su código. Por lo tanto, según nuestro caso, nos aseguraríamos de imprimir los registros en el orden correcto cambiando printToConsole() a:

1
2
3
4
5
6
7
public void printToConsole(Stream<String> logs) {
    if(logs.isParallel()) {
        logs.forEachOrdered(System.out::println);
    } else {
        logs.forEach(System.out::println);
    }    
}

Cuando tenga una operación de terminal que requiera un pedido en serie, use isParallel(). Le ayudará a determinar qué variante de forEach() usar.

En resumen, elige:

  • forEachOrdered(): para encontrar elementos de cualquier flujo en un orden en serie.
  • forEach(): para encontrar elementos de:
    • A serial stream when you care about the order
    • A parallel stream when you are not concerned about the order

Cómo funcionan los flujos paralelos - Inmersión más profunda

"El paralelismo consiste en hacer muchas cosas a la vez"

-- Robar Pike

En Java, el paralelismo consta de varias fases:

  • Una rutina dada descompone una tarea en sus tareas constituyentes
  • Cada tarea se adjunta a un hilo distinto
  • Otra rutina calcula los resultados de cada subtarea
  • Luego, otra rutina recopila los resultados de cada tarea en un resultado agregado

Sin embargo, fue posible ejecutar esta secuencia de actividades incluso en versiones heredadas de Java.

A partir de Java 5, por ejemplo, las nuevas implementaciones de ExecutorService simplificaron el paralelismo.

Luego, en Java 7, la introducción de ForkJoinPool simplificó aún más el paralelismo. La clase es una implementación concreta de ExecutorService. Y amplió la interfaz al agregar el aspecto de trabajo robando, configurando así el paralelismo para una mayor eficiencia. Con ForkJoinPool, las tareas inactivas tienen como objetivo aliviar las tareas ocupadas de parte de su carga.

A partir de Java 8, el aspecto de los flujos también ha hecho que el paralelismo sea idiomático.

Streams' parallel() llama a ForkJoinPool. Y lo hacen de una manera funcional también. Con Java funcional, sus partes internas ejecutan el cómo del paralelismo. Mientras dejan el código del cliente para declarar qué desea paralelizar.

Como ilustración, comparemos cómo funcionan dos fragmentos de código.

El primero usa el método sumUsingSequential(). Éste calcula la suma de todos los números entre 1 y 10. Lo hace en un orden de serie. Un número más el siguiente. Luego, el resultado más el siguiente número — el clásico patrón reduce() de plegable.

If you'd like to read more about reducing in Java - read our Java 8 Streams: Guía para reducir()!

Hemos mantenido el rango pequeño para que podamos examinar cada paso de la rutina de ese método.

Luego, el segundo método sumUsingParallel() también calcula la suma de esos números. Pero, lo hace usando subprocesos que se ejecutan en paralelo:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
public long sumUsingSequential() {
    return LongStream.rangeClosed(1L, 10L)
        .peek(this::printThreadName)
        .reduce(0L, this::printSum);
}

public void printThreadName(long l) {
    String tName = currentThread().getName();
    System.out.println(tName + " offers:" + l);
}

public long printSum(long i, long j) {
    long sum = i + j;
    String tName = currentThread().getName();
    System.out.printf(
        "%s has: %d; plus: %d; result: %d\n", 
        tName, i, j, sum
    );
    
    return sum;
}

Estos dos métodos llaman a los métodos printThreadName() y printSum(). En sumUsingSequential() podemos representar los pasos usando este diagrama de actividad:

secuencias paralelas en ilustración de operaciones java

Observe cómo el flujo de control usa solo un hilo. El hilo principal. Y cuando ejecuta el fragmento, obtiene estos resultados:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
main offers:1
main has: 0; plus: 1; result: 1
main offers:2
main has: 1; plus: 2; result: 3
main offers:3
main has: 3; plus: 3; result: 6
main offers:4
main has: 6; plus: 4; result: 10
main offers:5
main has: 10; plus: 5; result: 15
main offers:6
main has: 15; plus: 6; result: 21
main offers:7
main has: 21; plus: 7; result: 28
main offers:8
main has: 28; plus: 8; result: 36
main offers:9
main has: 36; plus: 9; result: 45
main offers:10
main has: 45; plus: 10; result: 55
Sum parallel: 55

El flujo de cálculo es como cabría esperar de un patrón imperativo. Por ejemplo, una implementación de bucle for. Sin embargo, se vuelve más interesante cuando ejecutamos sumUsingParallel():

1
2
3
4
5
6
public long sumUsingParallel() {
    return LongStream.rangeClosed(1L, 10L)
        .parallel()
        .peek(this::printThreadName)
        .reduce(0L, this::printSum);
}

La simple inclusión de parallel() ha provocado que la transmisión use todos los núcleos de CPU disponibles. Y en este caso, ejecutamos el código en una computadora con CPU de cuatro núcleos. Y como puede ver en este diagrama de actividad, el enfoque de unión de bifurcación está en uso:

parallel streams in java illustration

La llamada parallel() activa el mecanismo de combinación de bifurcación en el flujo de números. Divide el flujo para que se ejecute en cuatro subprocesos. Una vez que cada subproceso tiene un flujo, el mecanismo llama a reduce() en cada uno para ejecutarse en concurrencia.

Como:

1
stream > (stream1, stream2)

Dónde:

1
2
stream1 > (stream1.1, stream1.2)
stream2 > (stream2.1, stream2.2)

Luego, los resultados de cada reduce() se agregan a resultados intermedios: r5 y r6:

1
2
r5 = r1 + r2
r6 = r3 + r4

Donde r1, r2, r3 y r4 son los resultados de cada operación serial reduce(). El resultado final, r7, es una suma de los resultados intermedios; r5 y r6. Este resumen de resultados intermedios ocurre en la fase unión de la bifurcación-unión.

Estas operaciones también son evidentes en la salida de la consola del método:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
worker-1 offers:3
main offers:7
worker-1 has: 0; plus: 3; result: 3
worker-2 offers:9
worker-1 offers:5
worker-1 has: 0; plus: 5; result: 5
worker-3 offers:2
worker-1 offers:4
worker-2 has: 0; plus: 9; result: 9
worker-2 offers:10
worker-2 has: 0; plus: 10; result: 10
main has: 0; plus: 7; result: 7
worker-2 has: 9; plus: 10; result: 19
worker-1 has: 0; plus: 4; result: 4
worker-3 has: 0; plus: 2; result: 2
worker-1 has: 4; plus: 5; result: 9
worker-2 offers:8
worker-2 has: 0; plus: 8; result: 8
main offers:6
worker-2 has: 8; plus: 19; result: 27
worker-1 has: 3; plus: 9; result: 12
worker-3 offers:1
worker-3 has: 0; plus: 1; result: 1
main has: 0; plus: 6; result: 6
main has: 6; plus: 7; result: 13
main has: 13; plus: 27; result: 40
worker-3 has: 1; plus: 2; result: 3
worker-3 has: 3; plus: 12; result: 15
worker-3 has: 15; plus: 40; result: 55
Sum parallel: 55

Es vital tener en cuenta que los hilos hicieron sus cálculos sin un orden discernible. Y, como veremos más adelante, esta característica es un punto de preocupación donde los resultados de reduce() no tienen asociatividad.

¿Los flujos paralelos también son concurrentes? {#areparallelstreamsconcurrent también}

"La concurrencia se trata de manejar muchas cosas a la vez"

--Rob Pike

En resumen, sí. Los subprocesos que se ejecutan en paralelo se ejecutan en un orden concurrente. Y es por eso que antes dijimos que:

Una vez que cada subproceso tiene un flujo, el mecanismo llama a reduce() en cada uno para ejecutarse en concurrencia.

Pero la distinción importante es - no es obligatorio que los subprocesos que se ejecutan concurrentemente se ejecuten en paralelo. Como ilustración, tenemos el método startThreads():

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public void startThreads() {
    StringBuffer sb = new StringBuffer("world");

    Thread t1 = new Thread(() -> {
        String tName = currentThread().getName();
        System.out.printf(
            "before running %s: %s\n", 
            tName, sb
        );

        if (sb.length() > 0) {
            int idx = sb.length() - 1;
            char c = sb.charAt(idx);
            sb.deleteCharAt(idx);

            System.out.printf(
                "on running: %s; remove %s\n", 
                tName, c
            );
        }
        System.out.printf(
            "after running %s: %s\n", 
            tName, sb
        );
    }, "thread-1");
    
    Thread t2 = new Thread(() -> {
        String tName = currentThread().getName();
        System.out.printf(
            "before running %s: %s\n", 
            tName, sb
        );

        if (sb.length() > 0) {
            int idx = sb.length() - 1;
            char c = sb.charAt(idx);
            sb.deleteCharAt(idx);

            System.out.printf(
                "on running: %s; remove %s\n", 
                tName, c
            );
        }
        System.out.printf(
            "after running %s: %s\n", 
            tName, sb
        );
    }, "thread-2");

    t1.start();
    t2.start();

    try {
        Thread.sleep(10000);
    } catch (InterruptedException ex) {
        // Handle exception
    }

    System.out.printf("after all runs: %s\n", sb);
}

El método crea dos subprocesos: t1 y t2. Ambos intentan eliminar los caracteres al final de un StringBuffer - sb. Luego, el método inicia los dos.

Al ejecutar el código obtienes el resultado:

1
2
3
4
5
6
7
before running thread-1: hello world
before running thread-2: hello worl
on running: thread-2; remove l
after running thread-2: hello wor
on running: thread-1; remove d
after running thread-1: hello wor
after all runs: hello wor

Sin embargo, cuando lo ejecuta por segunda vez, puede obtener:

1
2
3
4
5
6
7
before running thread-1: hello world
on running: thread-1; remove d
after running thread-1: hello worl
before running thread-2: hello worl
on running: thread-2; remove l
after running thread-2: hello wor
after all runs: hello wor

Estos resultados muestran que los hilos están cambiando el contenido de sb de forma sincronizada. Sin embargo, no puedes predecir sus ejecuciones simultáneas. Esto depende del SDK asignación del planificador.

Prácticas recomendadas con flujos paralelos {#mejores prácticas con flujos paralelos}

Dicho esto, resumamos las mejores prácticas:

  • Asociatividad: espera que los resultados lleguen sin seguir ningún orden
  • Las expresiones lambda deben ser sin estado
  • Evitar la modificación de los elementos de los streams
  • Las expresiones lambda no deberían emitir efectos secundarios.
  • Utilice el paralelismo únicamente cuando el número de elementos sea muy grande. Por ejemplo, con un flujo de elementos int que son menos de 10,000, prefiera la ejecución en serie a la ejecución en paralelo.

Conclusión

La función parallel() de Streams API ha simplificado la forma en que podemos hacer que el código maneje muchas tareas al mismo tiempo. Al dividir las tareas en subtareas, nos ayuda a ejecutar ejecuciones más rápido que antes.

Sin embargo, la operación parallel() requiere primero una mirada cuidadosa al diseño del código. En la mayoría de los casos de uso, los flujos no contienen tantos elementos como para justificar el paralelismo. Incluso cuando lo hacen, las operaciones finales de agregación deben respetar la asociatividad.

El orden en que ocurren las ejecuciones no debe tener un efecto en el resultado final. Si es así, entonces el paralelismo habría sido una elección de diseño incorrecta para su código.

Por otra parte, con un diseño cuidadoso, parallel() mejorará el rendimiento del código. Y lo hará sin sacrificar la legibilidad de su código.

Puede encontrar el código completo que esta guía usó en este repositorio GitHub.

Siéntase libre de clonarlo y cambiarlo para obtener una visión más profunda de cómo funciona parallel().

Licensed under CC BY-NC-SA 4.0