Concurrencia en Java: El Marco Ejecutor

Con el aumento en la cantidad de núcleos disponibles en los procesadores en la actualidad, junto con la necesidad cada vez mayor de lograr un mayor rendimiento, multi-thre ...

Introducción

Con el aumento en la cantidad de núcleos disponibles en los procesadores en la actualidad, junto con la necesidad cada vez mayor de lograr un mayor rendimiento, las API de subprocesos múltiples se están volviendo bastante populares. Java proporciona su propio marco de subprocesos múltiples llamado [Marco ejecutor] (https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html).

¿Qué es el Marco Ejecutor?

Executor Framework contiene un montón de componentes que se utilizan para administrar de manera eficiente los subprocesos de trabajo. La API Executor desacopla la ejecución de la tarea de la tarea real para ser ejecutada a través de Executors. Este diseño es una de las implementaciones del patrón Productor-Consumidor.

Los java.util.concurrent.Executors proporcionan métodos de fábrica que se utilizan para crear ThreadPools de subprocesos de trabajo.

Para usar Executor Framework, necesitamos crear uno de esos grupos de subprocesos y enviarle la tarea para su ejecución. Es el trabajo de Executor Framework programar y ejecutar las tareas enviadas y devolver los resultados del grupo de subprocesos.

Una pregunta básica que viene a la mente es ¿por qué necesitamos tales grupos de subprocesos cuando podemos crear objetos de java.lang.Thread o implementar interfaces Runnable/Callable para lograr el paralelismo?

La respuesta se reduce a dos hechos básicos:

  1. La creación de un nuevo hilo para una nueva tarea genera una sobrecarga de creación y eliminación de hilos. La gestión de este ciclo de vida del subproceso aumenta significativamente el tiempo de ejecución.
  2. La adición de un nuevo subproceso para cada proceso sin ninguna limitación conduce a la creación de una gran cantidad de subprocesos. Estos subprocesos ocupan memoria y provocan el desperdicio de recursos. La CPU comienza a pasar demasiado tiempo cambiando de contexto cuando se intercambia cada subproceso y entra otro subproceso para su ejecución.

Todos estos factores reducen el rendimiento del sistema. Los grupos de subprocesos solucionan este problema manteniendo los subprocesos activos y reutilizándolos. Cualquier exceso de tareas que entren y que los subprocesos del grupo puedan manejar se mantienen en una ‘Cola’. Una vez que cualquiera de los subprocesos se libera, seleccionan la siguiente tarea de esta cola. Esta cola de tareas es esencialmente ilimitada para los ejecutores listos para usar proporcionados por el JDK.

Tipos de ejecutores

Ahora que tenemos una buena idea de lo que es un ejecutor, echemos un vistazo a los diferentes tipos de ejecutores.

Ejecutor de subproceso único

Este ejecutor de grupo de subprocesos tiene un único subproceso. Se utiliza para ejecutar tareas de forma secuencial. Si el subproceso muere debido a una excepción mientras se ejecuta una tarea, se crea un nuevo subproceso para reemplazar el subproceso anterior y las tareas posteriores se ejecutan en el nuevo.

1
ExecutorService executorService = Executors.newSingleThreadExecutor()

Grupo de subprocesos fijos (n)

Como su nombre lo indica, es un grupo de subprocesos de un número fijo de subprocesos. Las tareas enviadas al ejecutor son ejecutadas por los subprocesos n y, si hay más tareas, se almacenan en LinkedBlockingQueue. Este número suele ser el número total de subprocesos admitidos por el procesador subyacente.

1
ExecutorService executorService = Executors.newFixedThreadPool(4);

Grupo de subprocesos en caché

Este grupo de subprocesos se usa principalmente donde hay muchas tareas paralelas de corta duración para ejecutar. A diferencia del grupo de subprocesos fijos, el número de subprocesos de este grupo de ejecutores no está limitado. Si todos los subprocesos están ocupados ejecutando algunas tareas y llega una nueva tarea, el grupo creará y agregará un nuevo subproceso al ejecutor. Tan pronto como uno de los hilos quede libre, se encargará de la ejecución de las nuevas tareas. Si un subproceso permanece inactivo durante sesenta segundos, se finaliza y se elimina de la memoria caché.

Sin embargo, si no se administra correctamente, o si las tareas no son de corta duración, el grupo de subprocesos tendrá muchos subprocesos activos. Esto puede conducir a una paliza de recursos y, por lo tanto, a una caída del rendimiento.

1
ExecutorService executorService = Executors.newCachedThreadPool();

Ejecutor programado

Este ejecutor se utiliza cuando tenemos una tarea que debe ejecutarse a intervalos regulares o si deseamos retrasar una determinada tarea.

1
ScheduledExecutorService scheduledExecService = Executors.newScheduledThreadPool(1);

Las tareas se pueden programar en ScheduledExecutor usando cualquiera de los dos métodos scheduleAtFixedRate o scheduleWithFixedDelay.

1
scheduledExecService.scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
1
scheduledExecService.scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit)

La principal diferencia entre los dos métodos es su interpretación del retraso entre ejecuciones consecutivas de un trabajo programado.

scheduleAtFixedRate ejecuta la tarea con un intervalo fijo, independientemente de cuándo finalizó la tarea anterior.

scheduleWithFixedDelay iniciará la cuenta regresiva del retraso solo después de que se complete la tarea actual.

Comprender el objeto futuro

Se puede acceder al resultado de la tarea enviada para su ejecución a un ejecutor mediante el objeto java.util.concurrent.Future devuelto por el ejecutor. Se puede pensar en el futuro como una promesa que el ejecutor le hace a la persona que llama.

1
Future<String> result = executorService.submit(callableTask);

Una tarea enviada al ejecutor, como la anterior, es asíncrona, es decir, la ejecución del programa no espera a que finalice la ejecución de la tarea para pasar al siguiente paso. En cambio, cada vez que se completa la ejecución de la tarea, el ejecutor la establece en este objeto ‘Futuro’.

La persona que llama puede continuar ejecutando el programa principal y cuando se necesita el resultado de la tarea enviada, puede llamar a .get() en este objeto Futuro. Si la tarea se completa, el resultado se devuelve inmediatamente a la persona que llama o, de lo contrario, la persona que llama se bloquea hasta que el ejecutor complete la ejecución y se calcule el resultado.

Si la persona que llama no puede permitirse el lujo de esperar indefinidamente antes de recuperar el resultado, esta espera también se puede programar. Esto se logra mediante el método Future.get(long timeout, TimeUnit unit) que arroja una TimeoutException si el resultado no se devuelve en el plazo estipulado. La persona que llama puede manejar esta excepción y continuar con la ejecución posterior del programa.

Si hay una excepción al ejecutar la tarea, la llamada al método get arrojará una ExecutionException.

Una cosa importante con respecto al resultado que devuelve el método Future.get() es que solo se devuelve si la tarea enviada implementa java.util.concurrent.Callable. Si la tarea implementa la interfaz Runnable, la llamada a .get() devolverá null una vez que se complete la tarea.

Otro método importante es el método Future.cancel(boolean mayInterruptIfRunning). Este método se utiliza para cancelar la ejecución de una tarea enviada. Si la tarea ya se está ejecutando, el ejecutor intentará interrumpir la ejecución de la tarea si el indicador mayInterruptIfRunning se pasa como true.

Ejemplo: crear y ejecutar un ejecutor simple

Ahora crearemos una tarea e intentaremos ejecutarla en un ejecutor de grupo fijo:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public class Task implements Callable<String> {

    private String message;

    public Task(String message) {
        this.message = message;
    }

    @Override
    public String call() throws Exception {
        return "Hello " + message + "!";
    }
}

La clase Task implementa Callable y está parametrizada al tipo String. También se declara para lanzar Exception. Esta capacidad de lanzar una excepción al ejecutor y que el ejecutor devuelva esta excepción a la persona que llama es de gran importancia porque ayuda a la persona que llama a conocer el estado de ejecución de la tarea.

Ahora vamos a ejecutar esta tarea:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public class ExecutorExample {
    public static void main(String[] args) {

        Task task = new Task("World");

        ExecutorService executorService = Executors.newFixedThreadPool(4);
        Future<String> result = executorService.submit(task);

        try {
            System.out.println(result.get());
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("Error occured while executing the submitted task");
            e.printStackTrace();
        }

        executorService.shutdown();
    }
}

Aquí hemos creado un ejecutor FixedThreadPool con un conteo de 4 hilos ya que esta demostración está desarrollada en un procesador de cuatro núcleos. El recuento de subprocesos puede ser mayor que los núcleos del procesador si las tareas que se ejecutan realizan operaciones de E/S considerables o pasan tiempo esperando recursos externos.

Hemos instanciado la clase ‘Tarea’ y la estamos pasando al ejecutor para su ejecución. El resultado es devuelto por el objeto Futuro, que luego imprimimos en la pantalla.

Ejecutemos ExecutorExample y verifiquemos su salida:

1
Hello World!

Como era de esperar, la tarea agrega el saludo "Hola" y devuelve el resultado a través del objeto Futuro.

Por último, llamamos al apagado en el objeto executorService para terminar todos los subprocesos y devolver los recursos al sistema operativo.

El método .shutdown() espera la finalización de las tareas actualmente enviadas al ejecutor. Sin embargo, si el requisito es apagar inmediatamente el ejecutor sin esperar, entonces podemos usar el método .shutdownNow() en su lugar.

Todas las tareas pendientes de ejecución se devolverán en un objeto java.util.List.

También podemos crear esta misma tarea implementando la interfaz Runnable:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public class Task implements Runnable{

    private String message;

    public Task(String message) {
        this.message = message;
    }

    public void run() {
        System.out.println("Hello " + message + "!");
    }
}

Hay un par de cambios importantes aquí cuando implementamos ejecutable.

  1. El resultado de la ejecución de la tarea no se puede devolver desde el método run(). Por lo tanto, estamos imprimiendo directamente desde aquí.
  2. El método run() no está configurado para generar ninguna excepción comprobada.

Conclusión

Los subprocesos múltiples se están generalizando cada vez más, ya que la velocidad del reloj del procesador es difícil de aumentar. Sin embargo, manejar el ciclo de vida de cada subproceso es muy difícil debido a la complejidad involucrada.

En este artículo, demostramos un marco de subprocesos múltiples eficiente pero simple, el Marco Ejecutor, y explicamos sus diferentes componentes. También echamos un vistazo a diferentes ejemplos de creación de tareas de envío y ejecución en un ejecutor.

Como siempre, el código de este ejemplo se puede encontrar en GitHub.