De hilos en Java

Aunque me gustan muchos lenguajes de programación, he usado Java como lenguaje principal desde que llegué a Japón. En realidad no hay una razón específica para ello, simplemente sentía que necesitaba practicar más en él, puesto que había estado usando mayormente C, y sentía que Java se me estaba olvidando… y aquí me tienen 7 años y medio después usándolo todavía. De vez en cuando también sale algo en Python, y Haskell es primordialmente para XMonad, pero de C lo último que hice fue un minijuego de PSP hace 4 años.

Después de usar un lenguaje de programación por un rato, se vuelve fácil sentarse a desarrollar algo sin la preocupación de estar leyendo cómo implementar algo en él… Sin embargo, siempre se aprende algo nuevo, y cada día lo reafirmo con cada aplicación que desarrollo.

He programado con hilos en Java desde hace mucho, pero nunca me había puesto a leer con detalle las clases del paquete java.util.concurrent. El control de hilos (el principal esperando mientras que otros realizan tareas de forma concurrente) lo implementaba con wait(), notifyAll() y boolean. Primitivo, pero funcionaba. Cuando necesito esperar un sólo hilo uso join(). Pero estaba seguro que debería haber algo ya en las clases de Java que hiciera lo que yo hacía, algo así como cuando aprendes lo que son las listas ligadas, implementas una pila con todos sus métodos y después  te dicen que ya hay una clase Stack que hace todo eso, y muchas veces de forma más eficiente (no siempre, aclaro). Fue entonces cuando descubrí las clases CountDownLatch y ThreadPoolExecutor.

CountDownLatch es una clase que simplemente inicializa un contador a un valor igual al número de hilos que se van a ejecutar de forma concurrente; cada hilo deberá invocar el método countDown() de CountDownLatch al terminar su ejecución. El hilo principal, una vez que manda ejecutar todos los hilos, espera a que estos terminen; para lograrlo, se invoca el método await(). Cuando el contador llega a cero, el hilo principal sale del await() y continúa su ejecución de forma normal:


public final class Master {

  public static void main(String a[]) {
      int THREAD_NUM = 10;
      CountDownLatch counter = new CountDownLatch(THREAD_NUM);

      for (int i = 0; i < THREAD_NUM; i++) {
         new Thread(new Worker(new EvilTask(), counter)).start();
      }

     // await() puede ser interrumpido, por lo que hay que ponerlo entre try/catch

     try {
        counter.await();
     } catch (InterruptedException e) {
        System.out.println("Counter was interrupted!");
    }
 }

}

class Worker implements Runnable {
  CountDownLatch counter = null;
  EvilTask task = null;

   Worker(EvilTask task, CountDownLatch counter) {
      this.task = task;
      this.counter = counter;
   }

   public void run() {
      try {
       tryToTakeOverTheWorld(task);

        // Pase lo que pase, cada hilo debe hacer countDown(), si no, el programa entra en deadlock.
     } finally {
       counter.countDown();
     }

   }
}

ThreadPoolExecutor es, como su nombre lo indica, un pool de hilos, tal y como los pool de conexiones a bases de datos: en vez de estar creando un hilo para cada nueva tarea (y con ello generar problemas de memoria cuando son muchos), se crean de antemano cierto número de hilos que serán ejecutados al mismo tiempo, y cuando alguno termina su ejecución se asigna a otro proceso que esperaba su turno. Algo así como un despachador.

Sé que ThreadPoolExecutor no es el único Executor que Java tiene (el ScheduledThreadPoolExecutor también es muy útil), pero para el ejemplo servirá.

Un ThreadPoolExecutor tiene un número determinado de hilos que se ejecutarán al mismo tiempo, un número máximo de hilos, un timer para poder eliminar un hilo que ha estado idle por determinado tiempo y una cola de espera para los hilos que tengan que esperar su turno.

La cola de espera puede ser fija, dinámica o de transferencia, y de esto depende el comportamiento que tendrá el Executor:

  • Fija: si se están ejecutando tantos hilos como el número determinado, la tarea se pone en la cola de espera: Si la cola está llena y llega un nuevo hilo a ella, revisa el número máximo de hilos. Si el número de hilos no ha rebasado el máximo, la ejecuta en uno nuevo. En caso contrario, la tarea es rechazada (y la forma de rechazarla también puede ser controlada, pero eso lo pueden ver directamente en la API).
  • Dinámica: si se están ejecutando tantos hilos como el número determinado, la tarea siempre se pone en la cola de espera, por lo que en este caso el número máximo de hilos no importa porque nunca se revisa. SIN EMBARGO, al momento de crear el ThreadPoolExecutor el número máximo de hilos TIENE QUE SER MAYOR O IGUAL al número determinado de hilos que se ejecutarán al mismo tiempo, o de lo contrario el programa lanza una excepción.
  • De transferencia: Crea un hilo para cada tarea sin siquiera intentar ponerlo en la cola. La tarea es rechazada si no se puede ejecutar inmediatamente.

La cola de espera puede ser cualquier clase que implemente la interface BlockingQueue.

Hay que tener mucho cuidado del número de hilos que se crean, independientemente de sin son ejecutados en el acto o se ponen en la cola de espera: una gran cantidad de hilos lleva a problemas de memoria y errores raros aunque los métodos estén perfectamente sincronizados. Si de antemano se sabe el número máximo de tareas que serán ejecutadas, lo recomendable es usar una cola de espera fija.

Para calcular el número óptimo de hilos que se ejecutarán concurrentemente, lo recomendable es analizar primero qué tan pesadas son las tareas que serán ejecutadas concurrentemente, y si son CPU-bound o I/O-bound.

Usando el ejemplo anterior:


public final class Master {

  public static void main(String a[]) {
      int THREAD_NUM = 10;
      CountDownLatch counter = new CountDownLatch(THREAD_NUM);
      ArrayBlockingQueue<Runnable> waitqueue = new ArrayBlockingQueue<Runnable>(THREAD_NUM);

      // 2 tareas al mismo tiempo, 2 tareas máximas, 1 segundo de timer, y la cola de espera
      ThreadPoolExecutor executor = new ThreadPoolExecutor(2,2,1,TimeUnit.SECONDS,waitqueue);

      // Ejecuta hasta 2 tareas al mismo tiempo, y pone a las demás en la cola de espera.
      for (int i = 0; i < THREAD_NUM; i++) {
        executor.execute(new Worker(new EvilTask(), counter));
      }

     // await() puede ser interrumpido, por lo que hay que ponerlo entre try/catch

     try {
        counter.await();
     } catch (InterruptedException e) {
        System.out.println("Counter was interrupted!");
    }

     // Si no se invoca, el programa se queda esperando por siempre aunque todas las tareas hayan terminado.
    // Si hay tareas pendientes, las termina de ejecutar antes de cerrarlo.
     executor.shutDown();
 }

}

class Worker implements Runnable {
  CountDownLatch counter = null;
  EvilTask task = null;

   Worker(EvilTask task, CountDownLatch counter) {
      this.task = task;
      this.counter = counter;
   }

   public void run() {
      try {
       tryToTakeOverTheWorld(task);

        // Pase lo que pase, cada hilo debe hacer countDown(), si no, el programa entra en deadlock.
     } finally {
       counter.countDown();
     }

   }
}

Mucho cuidado con olvidar ejecutar shutDown() en el ThreadPoolExecutor. Un programa se me quedaba trabado después de terminar su ejecución normalmente y todo era debido a eso. shutDown() termina de ejecutar las tareas pendientes, pero a shutDownNow() le vale y cierra todo esté como esté.

Está de más mencionar que un ThreadPoolExecutor tiene más funcionalidades, pero eso se puede revisar ya en la API.

Un problema que tuve en un programa reciente:  aunque hice todo con hilos desde el principio, una de las tareas a ejecutar concurrentemente era I/O-bound y tardaba mucho en ejecutarse; como usaba una LinkedBlockingQueue como cola de espera (dinámica), las tareas se seguían acumulando pues llegabas más rápido que lo que tardaban en ejecutarse. Esto me comenzó a crear conflictos, primero de memoria, luego errores de “Too many open files” (que luego desaparecieron cuando cambié los límites en el servidor) y luego errores raros de “Resource not available”. Confieso que sentí nostalgia por los errores de C cuando te metías a áreas de memoria que no debías, pero tardé en entender que, aunque la lógica estaba bien, la forma de ejecutar las cosas no era la óptima.

Lo que hice fue limitar el número de tareas que pasaban al ThreadPoolExecutor cada vez; la cola de espera se convirtió en una fija pues tenía el control de lo que pasaba en cada ciclo; y reduje el número de hilos que dependían de operaciones de I/O. El resultado: de tardar horas en una prueba que usaba además cerca de 8 GB de memoria, reduje el tiempo a hora y media y el uso de memoria a 1.2 GB, con los mismos sets de pruebas.

Programar con hilos es casi casi básico. Java facilita, en teoría, este proceso, pero para sacarle más provecho, sí vale la pena leer lo que java.util.concurrent ofrece.

7 Replies to “De hilos en Java”

  1. No entiendo todavía el por que separarlo del blog principal, en todo caso, se que no es el caso sobre todo ahora que lo explicas a profundidad, pero no es un problema donde es más fácil salir y ¿comprar un server dell de quad core + 16 o 32GB de RAM? digo tengo ganas de jugar con cosas dobles de cuatro cores, por supuesto tengo nula experiencia programando en paralelo, pero entiendo que es uno de los problemas actuales… en todo caso la maquina que te compraste de “emergencia” veo aún esta muy superior a lo que se esta ofreciendo para el usuario “casero”

    Saludos.

    1. Puedes comprar una súper máquina con todos los GB de RAM posibles, pero si no los sabes aprovechar, terminas simplemente con un programa secuencial que corre más rápido. El chiste de la programación concurrente o paralela es precisamente sacarle jugo a los núcleos o CPU, y para ello el programa tiene que mandar instrucciones especiales para que ciertas tareas se ejecuten en paralelo 🙂

  2. Hola..que tal..cóm opuedo eliminar/destruir un hilo del pool si éste se demora un determinado tiempo? como controlo esto?

      1. Gracias por tu respuesta…pero la verdad soy un poco nuevo en java =) y aún me quedan unas dudas,,,estoy implementando el código propuesto más arriba…
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2,2,1,TimeUnit.SECONDS,waitqueue); – See more at: http://manuel.midoriparadise.com/2010/09/de-hilos-en-java/comment-page-1/#comment-57401

        ese TimeUnit.SECONDS de 1 segundo qué significa??

        El hilo no lo estoy creando con el Executor, si no de esta manera:
        Tengo la clase Worker.java que implementa Runnable, por lo que cada tarea se crea:

        executor.execute(new Worker(“name”, 5000, counter); // nombre, sleep, counter

        Yo lo que necesito es manejar un Pool de Hilos…y controlar si alguno de estos hilos se “engloba” o demora por alguna razón (por ejemplo, cada hijo consumirá un web service, por lo que puede quedar mucho tiempo esperando respuesta, y en ese caso debo quitarlo del pool y dar paso a otro hilo)

        Estoy usando el mismo código propuesto por ti pero con algunas modificaciones para pruebas..

        Te doy mi skype por si es posible ubicarte ahi: r.sanzo29

        Te agradezco desde ya.

      2. Ese timeunit es el keep-alive time, y creo que hace más o menos lo que quieres hacer.

        Del API de ThreadPoolExecutor:

        Keep-alive times
        If the pool currently has more than corePoolSize threads, excess threads will be terminated if they have been idle for more than the keepAliveTime (see getKeepAliveTime(java.util.concurrent.TimeUnit)). This provides a means of reducing resource consumption when the pool is not being actively used. If the pool becomes more active later, new threads will be constructed. This parameter can also be changed dynamically using method setKeepAliveTime(long, java.util.concurrent.TimeUnit). Using a value of Long.MAX_VALUE TimeUnit.NANOSECONDS effectively disables idle threads from ever terminating prior to shut down. By default, the keep-alive policy applies only when there are more than corePoolSizeThreads. But method allowCoreThreadTimeOut(boolean) can be used to apply this time-out policy to core threads as well, so long as the keepAliveTime value is non-zero.

        Pero si no, con el método explicado en la respuesta de StackOverflow creo que puedes lograr lo que deseas, ya que especificas el timeout del hilo.

        Va, te agrego a Skype.

        ¡Saludos!

  3. Hola nuevamente..

    lo que estoy intentando es lo siguiente: (es bien similar a tu código)

    public class MasterWorker {

    public static void main(String args[])
    {
    int THREAD_NUM = 7;
    CountDownLatch counter = new CountDownLatch(THREAD_NUM);

    ArrayBlockingQueue waitqueue = new ArrayBlockingQueue(THREAD_NUM);

    //4 tareas al mismo tiempo, 4 tareas máximas, 2 segundo de timer, y la cola de espera
    ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 2, TimeUnit.SECONDS, waitqueue);

    //Ejecuta hasta 4 tareas al mismo tiempo, y pone a las demás en la cola de espera.
    for(int i=1; i<=THREAD_NUM; i++)
    {
    Worker t = new Worker("hilo"+i, 4000, counter);
    if(i0)
    System.out.println(“En cola: hilo”+i);
    }
    try
    {
    //await() puede ser interrumpido, por lo que hay que ponerlo entre try/catch
    counter.await();
    }
    catch(InterruptedException e)
    {
    System.out.printf(“counter was interrupted”);
    }
    // Si no se invoca, el programa se queda esperando por siempre aunque todas las tareas hayan terminado. // Si hay tareas pendientes, las termina de ejecutar antes de cerrarlo.
    executor.shutdown();
    }

    //la clase WORKER

    public class Worker implements Runnable{

    CountDownLatch counter = null;
    String name = null;
    int sleepTime;
    private long tiempoTrabajo;

    public Worker(String name, int sleep, CountDownLatch counter)
    {
    this.name = name;
    this.sleepTime = sleep;
    this.counter = counter;
    this.tiempoTrabajo = System.currentTimeMillis();
    }

    public void run()
    {
    try
    {
    System.out.printf(“EJECUTANDO”+this.name+ “.\n”);
    Thread.sleep(sleepTime);
    }
    catch(InterruptedException ex)
    {
    ex.printStackTrace();
    }
    finally
    {
    counter.countDown();
    }
    System.out.println(“TERMINÓ “+this.name+”.”);
    System.out.println(“DURACION “+this.name+”: “+ (System.currentTimeMillis() – this.tiempoTrabajo));
    }

    public String getName() {
    return name;
    }

    public void setName(String name) {
    this.name = name;
    }

    public long getTiempoTrabajo() {
    return tiempoTrabajo;
    }
    public void setTiempoTrabajo(long tiempoTrabajo) {
    this.tiempoTrabajo = tiempoTrabajo;
    }
    }

    Los primeros 4 hilos quedan con un sleep de 4 segundos, los 3 siguientes quedan encolados, luego cuando vayan terminando los primeros van entrando los últimos. Así es como entiendo que funciona este código.

    Lo que intento es

    Una salida de este programa es:

    En cola: hilo5
    En cola: hilo6
    En cola: hilo7
    EJECUTANDO hilo2.
    EJECUTANDO hilo3.
    EJECUTANDO hilo1.
    EJECUTANDO hilo4.
    TERMINÓ hilo3.
    DURACION hilo3: 4013
    EJECUTANDO hilo5.
    TERMINÓ hilo2.
    DURACION hilo2: 4013
    EJECUTANDO hilo6.
    TERMINÓ hilo1.
    DURACION hilo1: 4026
    EJECUTANDO hilo7.
    TERMINÓ hilo4.
    DURACION hilo4: 4027
    TERMINÓ hilo5.
    TERMINÓ hilo6.
    DURACION hilo6: 9013
    DURACION hilo5: 9013
    TERMINÓ hilo7.
    DURACION hilo7: 9026

    De esto, aún no sé cómo funciona el “2, TimeUnit.SECONDS”, ni cómo quitar de la cola un hilo en particular. Supongamos que el hilo tuviera un sleep de 6 segundos, quiero lograr matar el hilo y dar paso al siguiente. Es posible controlar esto?

    Muchas gracias desde ya.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.