Guía de recopiladores de Java 8: groupingByConcurrent()

En esta guía, aprenda a usar el recopilador Collectors.groupingByConcurrent() en Java 8 para recopilar y agrupar elementos de flujos simultáneamente, con recopiladores y proveedores posteriores, a través de ejemplos.

Introducción

Una secuencia representa una secuencia de elementos y admite diferentes tipos de operaciones que conducen al resultado deseado. La fuente de un flujo suele ser una Colección o un Array, desde el cual se transmiten los datos.

Los flujos se diferencian de las colecciones en varios aspectos; sobre todo porque los flujos no son una estructura de datos que almacena elementos. Son de naturaleza funcional, y vale la pena señalar que las operaciones en un flujo producen un resultado y, por lo general, devuelven otro flujo, pero no modifican su origen.

Para "solidificar" los cambios, reúne los elementos de un flujo en una Colección.

¡En esta guía, veremos cómo agrupar datos Stream en Java con Collectors.groupingBy()!

Colectores y paralelismo

Los recopiladores representan implementaciones de la interfaz Collector, que implementa varias operaciones de reducción útiles, como acumular elementos en colecciones, resumir elementos en función de un parámetro específico, etc.

Todas las implementaciones predefinidas se pueden encontrar dentro de la clase Collectors.

Sin embargo, también puede implementar muy fácilmente su propio recopilador y usarlo en lugar de los predefinidos; puede llegar bastante lejos con los recopiladores integrados, ya que cubren la gran mayoría de los casos en los que es posible que desee usarlos.

Para poder usar la clase en nuestro código necesitamos importarla:

1
import static java.util.stream.Collectors.*;

Stream.collect() realiza una operación de reducción mutable en los elementos de la secuencia.

{.icon aria-hidden=“true”}

Una operación de reducción mutable recopila elementos de entrada en un contenedor mutable, como una Colección, mientras procesa los elementos de la secuencia.

La computación paralela (paralelismo) se refiere al proceso de dividir un problema en dos o más subproblemas, resolviendo esos problemas simultáneamente, en paralelo, con cada subproblema siendo computado en un subproceso separado, y luego combinando todas las soluciones a los subproblemas en un único problema. resultado.

Uno de los mayores desafíos de implementar el paralelismo en programas que usan colecciones es que las colecciones no son seguras para subprocesos, lo que significa que varios subprocesos no pueden manipular una colección sin introducir interferencias de subprocesos o errores de coherencia de memoria . Lo que también debemos tener en cuenta es que el paralelismo no es necesariamente más rápido que la ejecución en serie, aunque esto depende en gran medida de la cantidad de datos y la cantidad de núcleos de la CPU.

Volviendo al contexto, las transmisiones se pueden ejecutar en serie o en paralelo. Cuando una secuencia se ejecuta en paralelo, el tiempo de ejecución de Java divide la secuencia en varias subsecuencias. Las operaciones se ejecutan en subflujos independientes en paralelo y luego se combinan en un resultado final.

Al crear una transmisión, siempre es una transmisión en serie a menos que se indique lo contrario, que es específicamente paralela. Para crear un flujo paralelo, invocamos Collection.parallelStream(), que es un método que se encuentra dentro de la interfaz Collection.

{.icon aria-hidden=“true”}

Nota: Si bien el uso de este método le permite implementar el paralelismo más fácilmente, sigue siendo su responsabilidad determinar si su aplicación es adecuada para el paralelismo, en función de su conocimiento del hardware en el que está ejecutando su código. .

Collectors.groupingByConcurrent()

Collectors.groupingByConcurrent() usa una arquitectura multinúcleo y es muy similar a Collectors.groupingBy(), ya que también se comporta como la instrucción "GROUP BY" en SQL.

Agrupa objetos por una propiedad específica dada y almacena el resultado final en un ConcurrentMap.

Si desea leer más sobre groupingBy(), lea nuestra Guía de recopiladores de Java 8: groupingBy()!

Definamos una clase simple para usar a lo largo de los ejemplos. Será una representación de un libro, con algunos campos:

1
2
3
4
5
6
7
public class Book {
    private String title;
    private String author;
    private int releaseYear;
    
    // Constructor, getters, setters, toString()
}

Con el modelo en su lugar, instanciamos una lista de algunos libros con los que trabajaremos:

1
2
3
4
5
6
7
8
List<Book> books = Arrays.asList(
    new Book("The Lord of the Rings", "J.R.R. Tolkien", 1954),
    new Book("The Hobbit", "J.R.R. Tolkien", 1937),
    new Book("Animal Farm", "George Orwell", 1945),
    new Book("Nineteen Eighty-Four", "George Orwell", 1949),
    new Book("The Road to Wigan Pier", "George Orwell", 1937),
    new Book("Lord of the Flies", "William Golding", 1954)
);

groupingByConcurrent() tiene tres sobrecargas dentro de la clase Collectors. Repasaremos cada uno de ellos y explicaremos las diferencias en la implementación a través de ejemplos a lo largo del camino.

Comencemos con el más simple de ellos.

Collectors.groupingByConcurrent() con una función clasificadora

La primera sobrecarga de este método solo toma un argumento: la función clasificadora:

1
2
public static <T,K> Collector<T,?,ConcurrentMap<K,List<T>>> 
    groupingByConcurrent(Function<? super T,? extends K> classifier)

Este método devuelve un Collector que agrupa los elementos de entrada de tipo T según la función de clasificación. La función de clasificación asigna elementos a una clave de tipo K. El recopilador en sí produce un ConcurrentMap<K, List<T>> cuyas claves representan los valores que obtenemos al aplicar la función de clasificación en la entrada, y cuyos valores correspondientes son Listas que contienen los elementos de entrada que se asignan a la clave asociada .

Este Collector es tanto concurrente como desordenado. Al estar desordenada, la operación de recopilación no conserva el orden de la entrada por su encuentro. Al ser concurrente, el contenedor de resultados admite funciones que se llaman simultáneamente con el mismo contenedor de resultados desde varios subprocesos.

{.icon aria-hidden=“true”}

Esta propiedad no es exclusiva de esta sobrecarga específica del método groupingByConcurrent(), pero también se aplica a las otras dos sobrecargas.

Avancemos y agrupemos los libros por autor:

1
2
ConcurrentMap<String, List<Book>> booksByAuthor = books.parallelStream()
             .collect(Collectors.groupingByConcurrent(Book::getAuthor));

Los elementos recopilados estarán desordenados, pero agrupados. Ejecutar el mismo código dará como resultado diferentes tipos de elementos dentro de los grupos, pero el tipo de los grupos en sí será consistente:

1
2
3
4
5
{
J.R.R. Tolkien=[Book{author='J.R.R. Tolkien', title='The Hobbit', releaseYear=1937}, Book{author='J.R.R. Tolkien', title='The Lord of the Rings', releaseYear=1954}], 
William Golding=[Book{author='William Golding', title='Lord of the Flies', releaseYear=1954}], 
George Orwell=[Book{author='George Orwell', title='Animal Farm', releaseYear=1945}, Book{author='George Orwell', title='The Road to Wigan Pier', releaseYear=1937}, Book{author='George Orwell', title='Nineteen Eighty-Four', releaseYear=1949}]
}

Dependiendo de cómo se desempeñen los subprocesos en la CPU y de cuáles terminen su cálculo primero, el Hobbit podría aparecer después del Señor de los Anillos y viceversa.

Evaluación comparativa de recopiladores secuenciales y concurrentes.agrupación por () {#evaluación comparativa de recopiladores secuenciales y concurrentesagrupación por}

Si bien la diferencia entre groupingBy() normal y groupingByConcurrent() puede no ser obvia desde lejos, el principio subyacente de la agrupación es significativamente diferente.

Cuando se trata de grandes cantidades de libros, con un procesador decente, este enfoque puede mejorar significativamente el rendimiento.

Generemos un montón de libros e intentemos agruparlos secuencialmente y en paralelo ...

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
List<Book> books = new ArrayList<>();
List<String> authorList = Arrays.asList(
            "George Orwell",
            "Nick Bostrom",
);

for (int i = 0; i < 100000; i++) {
    books.add(new Book(
            String.valueOf(i),
            authorList.get(new Random().ints(1, 1, authorList.size()).findFirst().getAsInt()),
            1900));
}

long startTimeSequential = System.currentTimeMillis();
Map<String, List<Book>> booksByAuthorSequential = books.stream()
                .collect(Collectors.groupingBy(Book::getAuthor));

long endTimeSequential = System.currentTimeMillis();
System.out.printf("Total time for sequential process: %sms\n",  (endTimeSequential-startTimeSequential));

long startTimeParallel = System.currentTimeMillis();
 ConcurrentMap<String, List<Book>> booksByAuthorParallel = books.parallelStream()
                .collect(Collectors.groupingByConcurrent(Book::getAuthor));
long endTimeParallel = System.currentTimeMillis();
System.out.printf("Total time for parallel process: %sms\n",  (endTimeParallel-startTimeParallel));

Dependiendo de su sistema y CPU, el proceso secuencial puede tomar más o menos tiempo que la contraparte paralela. Esto también depende en gran medida del número de grupos. Si tiene algunos grupos (menos autores), el proceso de dividirlos y agregar los resultados podría compensar el enfoque paralelo lo suficiente como para hacerlo más lento que el enfoque secuencial.

{.icon aria-hidden=“true”}

Nota: Cuantos menos grupos esté tratando, más probable es que el enfoque secuencial supere al paralelo, pero esto también depende en gran medida de la CPU de la máquina en la que está ejecutando el código. .

Con solo dos autores, ejecutar este fragmento de código da como resultado:

1
2
Total time for sequential process: 12ms
Total time for parallel process: 26ms

Si bien ambos procesos tardaron muy poco tiempo en ejecutarse, teniendo en cuenta la creación y agrupación de 100 000 objetos, el proceso paralelo llevó mucho más tiempo.

Si tuviéramos que ampliar nuestra lista con algunos autores más:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
List <String> authorList = Arrays.asList(
                "George Orwell",
                "Nick Bostrom",
                "Ray Kurzweil",
                "J.R.R. Tolkien",
                "Eliezer Yudkowsky",
                "Stuart Russel",
                "Max Tegmark",
                "Anil Seth",
                "Thomas Metzinger",
                "Aurélien Geron",
                "Max Lapan",
                "Brian Greene",
                "Frank Wilczek"
        );

Los resultados serían bastante similares:

1
2
Total time for sequential process: 13ms
Total time for parallel process: 19ms

Sin embargo, si lo expandimos significativamente:

1
2
3
for (int i = 0; i < 10000; i++) {
    authorList.add("Author" + i);
}

¿Adivinas qué pasa ahora, con 10 mil autores? En realidad, lo mismo:

1
2
Total time for sequential process: 19ms
Total time for parallel process: 33ms

Pero, si ejecuta este código en otra máquina que pueda utilizar subprocesos de manera más eficiente, será recibido con:

1
2
Total time for sequential process: 97ms
Total time for parallel process: 52ms

{.icon aria-hidden=“true”}

Nota: La simultaneidad no es una panacea que siempre simplemente funciona y hace que el código se ejecute más rápido.

Collectors.groupingByConcurrent() con función de clasificación y recopilador descendente

La segunda variación del método toma dos argumentos: una función de clasificación y un colector adicional aguas abajo:

1
2
3
public static <T,K,A,D> Collector<T,?,ConcurrentMap<K,D>>
    groupingByConcurrent(Function<? super T,? extends K> classifier,
                         Collector<? super T,A,D> downstream)

Este método devuelve un Collector que agrupa los elementos de entrada de tipo T de acuerdo con la función de clasificación, luego aplica una operación de reducción en los valores asociados con una clave dada usando el Collector aguas abajo especificado.

La operación de reducción "reduce" los datos que hemos recopilado aplicando una operación que es útil en una situación específica.

If you'd like to read more about reduction in Java in great detail - read our Java 8 Streams: Guía para reducir()!

Veamos un ejemplo de esta variante del método. Como downstream aquí, usaremos mapping(), que toma 2 parámetros:

  • Un mapeador - una función que se aplicará a los elementos de entrada y
  • Un colector descendente – un colector que aceptará valores mapeados

Collectors.mapping() en sí mismo hace un trabajo bastante sencillo. Adapta un colector que acepta elementos de un tipo para aceptar un tipo diferente aplicando una función de mapeo a cada elemento de entrada antes de la acumulación. En nuestro caso, asignaremos cada Estudiante a su nombre y devolveremos esos nombres como una lista.

Aquí volveremos a agrupar nuestros libros por autor, pero en lugar de usar ConcurrentMap<String, List<Book> usaremos ConcurrentMap<String, List<String> y reduciremos nuestros libros a un simple cuerda:

1
2
ConcurrentMap<String, List<String>> booksByAuthor = books.parallelStream()
    .collect(Collectors.groupingByConcurrent(Book::getAuthor, Collectors.mapping(Book::getTitle, Collectors.toList())));

Estas son reducciones de libros, donde los hemos reducido a un título, aunque también podría sustituir esto con cualquier otra operación de reducción:

1
2
3
4
5
{
J.R.R. Tolkien=[The Lord of the Rings, The Hobbit], 
William Golding=[Lord of the Flies], 
George Orwell=[Nineteen Eighty-Four, The Road to Wigan Pier, Animal Farm]
}

Otra aplicación muy útil de esta sobrecarga es que nuestra función descendente puede ser, bueno, otro Collectors.groupingByConcurrent(). Por lo tanto, puede encadenar cualquier número de grupos, creando grupos anidados.

Agrupemos los libros por su año de lanzamiento, pero dentro de esos grupos, agruparemos los libros por autores:

1
2
3
ConcurrentMap<Integer, ConcurrentMap<String, List<String>>> booksByAuthor = books.parallelStream()
                .collect(Collectors.groupingByConcurrent(Book::getReleaseYear,
                        Collectors.groupingByConcurrent(Book::getAuthor, Collectors.mapping(Book::getTitle, Collectors.toList()))));

Y obtener el siguiente resultado:

1
2
3
4
5
6
{
1937={J.R.R. Tolkien=[The Hobbit], George Orwell=[The Road to Wigan Pier]}, 
1954={J.R.R. Tolkien=[The Lord of the Rings], William Golding=[Lord of the Flies]}, 
1945={George Orwell=[Animal Farm]}, 
1949={George Orwell=[Nineteen Eighty-Four]}
}

Collectors.groupingBy() with Classifier Function, Downstream Collector and Provider

La tercera y última sobrecarga de este método toma tres argumentos. El primero y el tercero son los mismos que en la sobrecarga anterior, pero el segundo argumento es un método proveedor.

El método proveedor proporciona la implementación específica de ConcurrentMap que queremos usar para contener nuestro resultado final. Tenemos dos clases conocidas que implementan esta interfaz: ConcurrentHashMap y ConcurrentSkipListMap:

1
2
3
4
public static <T,K,A,D,M extends ConcurrentMap<K,D>> Collector<T,?,M> 
    groupingByConcurrent(Function<? super T,? extends K> classifier,
                         Supplier<M> mapFactory,
                         Collector<? super T,A,D> downstream)

El valor de retorno de este método es el mismo que el de la sobrecarga anterior. La única diferencia con este es que el ConcurrentMap producido por el colector se crea con la función de fábrica suministrada.

Haremos solo un ejemplo simple para esta sobrecarga, ya que todo es más o menos igual que el anterior con la excepción de la implementación ConcurrentMap especificada:

1
2
3
4
ConcurrentMap<String, List<String>> booksByAuthor = books.parallelStream()
    .collect(Collectors.groupingByConcurrent(Book::getAuthor,
                                             ConcurrentHashMap::new,
                                             Collectors.mapping(Book::getTitle, Collectors.toList())));

Conclusión

La clase Collectors es poderosa y nos permite recopilar flujos en colecciones de varias maneras.

Puede definir sus propios recopiladores, pero los recopiladores integrados pueden llevarlo muy lejos ya que son genéricos y se pueden generalizar a la gran mayoría de las tareas que se le ocurran.

En esta guía, hemos visto algunos ejemplos del uso del método Collectors.groupingByConcurrent(), que agrupa elementos dados parámetros específicos y devuelve un ConcurrentMap.

Al utilizar este método en lugar de Collectors.groupingBy() no concurrente, podemos utilizar completamente la arquitectura multinúcleo, si el hardware subyacente nos lo permite. Sin embargo, si bien el uso de este método le permite implementar el paralelismo más fácilmente, sigue siendo su responsabilidad determinar si su aplicación es adecuada para el paralelismo.

Ha aprendido a utilizar el formulario básico, así como formularios con recopiladores y proveedores posteriores para simplificar el código y ejecutar operaciones funcionales potentes pero sencillas en los flujos.