Cajitas que piensan

Singular Update Queue

Cuando hablábamos del WAL, se necesitaba escribir cada operacion que va a mutar el estado en un archivo. Necesitamos que esas escrituras sean procesadas de a una para mantener el orden del log y su consistencia, pero podemos tener N clientes enviando operaciones concurrentes.

Generalmente para proteger al sistema de modificaciones concurrentes se utiliza un lock sobre el recurso (supongamos lockeamos el archivo del WAL). Pero si la tarea que tenemos que realizar es lenta, como puede ser escribir en un archivo en disco, bloquear todos los otros hilos que necesitan escribir hasta que la tarea termine generaría una baja performance y un alto tiempo de respuesta solo por espera activa.

Para poder garantizar que solo se ejecutara una tarea a la vez pero siendo eficientes en la utilización de los recursos es que utilizamos la Singular Update Queue.

Implementaremos una cola de trabajo con un solo hilo sacando tareas de la cola. Los clientes concurrentes podrán agregar tareas en la cola para que se hagan cambios, pero solo un hilo va a aplicar los cambios secuencialmente. Esto se puede implementar con goroutines y canales en algunos lenguajes como Golang.

Single Thread Worker

Una representación de como implementaríamos esto en Java

diagram

La Singular Update Queue contiene una cola y una función a aplicar a cada elemento de la cola, a su vez extiende de java.lang.Thread, para asegurarse que ejecuta en su propio hilo.

public class SingularUpdateQueue<Req, Res> extends Thread implements Logging {
	// La cola de hasta 100 elementos
    private ArrayBlockingQueue<RequestWrapper<Req, Res>> workQueue
            = new ArrayBlockingQueue<RequestWrapper<Req, Res>>(100);
	// La función a aplicar que toma un request y genera un response
    private Function<Req, Res> handler;
	// Una flag de que esta corriendo y que es visible en otros threads
    private volatile boolean isRunning = false;
}

Cada cliente envia tareas a la cola con sus propios hilos, la cola encapsula cada tarea en un contenedor para combinarlas con un futuro, devolviéndole al cliente el futuro así este puede realizar una acción cuando la tarea este completa.


class SingularUpdateQueue {

  public CompletableFuture<Res> submit(Req request) {
      try {
          var requestWrapper = new RequestWrapper<Req, Res>(request);
          workQueue.put(requestWrapper);
          return requestWrapper.getFuture();
      }
      catch (InterruptedException e) {
          throw new RuntimeException(e);
      }
  }

class RequestWrapper<Req, Res> {
    private final CompletableFuture<Res> future;
    private final Req request;

    public RequestWrapper(Req request) {
        this.request = request;
        this.future = new CompletableFuture<Res>();
    }

    public CompletableFuture<Res> getFuture() { return future; }
    public Req getRequest()                   { return request; }
}

Los elementos de la cola son procesados por un solo hilo dedicado que la Singular Update Queue hereda de la clase Thread. La cola permite que multiples productores generen tareas concurrentemente, por lo cual debe ser thread safe.

class SingularUpdateQueue{

  @Override
  public void run() {
       isRunning = true;
       while(isRunning) {
           Optional<RequestWrapper<Req, Res>> item = take();
           item.ifPresent(requestWrapper -> {
               try {
                   Res response = handler.apply(requestWrapper.getRequest());
			  // notifico al cliente con el futuro 
                   requestWrapper.complete(response);

               } catch (Exception e) {
                   requestWrapper.completeExceptionally(e);
               }
           });
      }
  }
}

class RequestWrapper {

  public void complete(Res response) {
      future.complete(response);
  }

  public void completeExceptionally(Exception e) {
      getFuture().completeExceptionally(e);
  }
}

Es importante ver que podemos ponerle un timeout a la lectura de la cola para no bloquearla indefinidamente. Esto permite que el hilo se libere y marque el isRunning en falso.

class SingularUpdateQueue {

  private Optional<RequestWrapper<Req, Res>> take() {
      try {
          return Optional.ofNullable(workQueue.poll(300, TimeUnit.MILLISECONDS));

      } catch (InterruptedException e) {
          return Optional.empty();
      }
  }

  public void shutdown() {
      this.isRunning = false;
  }
}

Un ejemplo de un sistema que procesa solicitudes de multiples clientes y va escribiendo en su WAL podría ser la siguiente:

WAL with singular queue

Un cliente que implemente un patron de Singular Update Queue debería setear sus tipos de datos y la función de procesamiento. En este caso nuestro sistema consumirá solicitudes de escribir en el WAL, un solo hilo controlara el acceso a esa estructura de log.

El consumidor necesita escribir cada linea en el log y devolver una respuesta, esta respuesta solo puede ser enviada después de que el mensaje fue exitosamente guardado en el log. Para asegurar el orden de estos pasos utilizamos la Singular Update Queue.


public class WalRequestConsumer implements Consumer<Message<RequestOrResponse>> {

    private final SingularUpdateQueue<Message<RequestOrResponse>, Message<RequestOrResponse>> walWriterQueue;
    private final WriteAheadLog wal;

    public WalRequestConsumer(Config config) {
        this.wal = WriteAheadLog.openWAL(config);
	   // Creation de la queue con su función de handling
        walWriterQueue = new SingularUpdateQueue<>((message) -> {
            wal.writeEntry(serialize(message));
            return responseMessage(message);
        });
        startHandling();
    }

    private void startHandling() { this.walWriterQueue.start(); }

}

En el generador del mensaje:


@Override
public void accept(Message message) {
    CompletableFuture<Message<RequestOrResponse>> future = walWriterQueue.submit(message);
    // Cuando termine
    future.whenComplete((responseMessage, error) -> {
        sendResponse(responseMessage);
    });
}

Tipos de Colas

La eleccion del tipo de estructura de cola es importante, la JVM presenta varias opciones:

  • ArrayBlockingQueue (la usa Kafka para solicitudes): Es una cola compuesta por un array bloqueante. Es usada cuando se necesita tener un tope máximo al momento de creación de la cola. Cuando se llega al máximo la estructura bloquea al productor, es util cuando tenemos escenarios con consumidores lentos y productores rápidos y necesitamos frenar la producción.

  • ConcurrentLinkedQueue con ForkJoinPool (la usa Akka en la casilla de correo): Se utiliza si no vamos a tener consumidores esperando por los productores (espera activa leyendo la cola buscando tareas), donde hay un coordinador que agenda a los consumidores solo después de que la tarea se encola en la ConcurrentLinkedQueue. El ForkJoinPool se usa para ir generando hilos y volviéndolos al pool.

  • LinkedBlockingDeque (usado por Zookeeper y Kafka para respuestas): Se usa sobretodo cuando se necesita una cola sin limite, sin bloquear al productor. Hay que tener cuidado con esto porque sin técnicas de control podemos gastar toda la memoria.

  • RingBuffer (usado LMAX Disruptor): Si la tarea a procesar es sensible a la latencia, puede ser que no sea aceptable las demoras al copiar tareas al ArrayBlockingQueue. El RingBuffer permite pasar tareas entre etapas para no agregar esta latencia.

## Usar hilos livianos y canales

Para algunos lenguajes es ideal ir por este camino puesto que ya soportan hilos livianos y el concepto de canales (golang y kotlin). Todas las solicitudes son enviadas a un solo canal para ser procesados. Una sola goroutine procesa todos los mensajes para actualizar el estado. La respuesta es entonces escrita a un canal separado, y procesada por una goroutine separada para enviarla a los clientes.

func (s *server) putKv(w http.ResponseWriter, r *http.Request)  {
  kv, err := s.readRequest(r, w)
  if err != nil {
    log.Panic(err)
    return
  }

  request := &requestResponse{
    request:         kv,
    responseChannel: make(chan string),
  }

  s.requestChannel <- request
  response := s.waitForResponse(request)
  w.Write([]byte(response))
}

Las solicitudes son procesadas en una sola goroutine que actualiza el estado

func (s* server) Start() error {
  go s.serveHttp()

  go s.singularUpdateQueue()

  return nil
}

func (s *server) singularUpdateQueue() {
  for {
    select {
    case e := <-s.requestChannel:
      s.updateState(e)
      e.responseChannel <- buildResponse(e);
    }
  }
}

Contrapresión

En caso de que estemos usando una cola para comunicación entre hilos es importante tener en cuenta la contrapresión.

Si el consumidor es lento y el productor rápido, la cola se va a terminar llenando, y salvo que tomemos alguna media nos quedaremos sin memoria. Lo normal es tener un limite y el emisor bloquea si no hay mas espacio.

Por ejemplo, java.util.concurrent.ArrayBlockingQueue tiene 2 métodos para agregar elementos. * Put bloquea al productor si el array esta lleno (aumentando la presión en el) * Add tira IllegalStateException si el array esta lleno pero no bloquea al productor

Invocando servicios externos

A veces el procesamiento de una tarea de la SingularUpdateQueue incluye la llamada a un servicio externo y el estado de la cola se actualiza con la respuesta del servicio. Es importante en este escenario que las llamadas de red sean no bloqueares o sino tendremos bloqueado el hilo principal que procesa las tareas. Las llamadas deben ser hechas asincrónicamente.

A su vez, es importante que no actualicemos el estado de la cola al procesar el futuro de la invocación externa, porque podríamos llegar a estar usando otro hilo, perdiendo así el objetivo de que todo cambio de estado sea aplicado por un solo hilo. El resultado de la llamada por lo tanto debe ser agregado a la cola de tareas como otros eventos o solicitudes.

  • La implementación de Zookeeper de su Request Pipeline es hecha con procesadores de solicitudes de un solo hilo.
  • El controlador en Apache Kafka, utiliza este patron para actualizar su estado en base a multiples eventos concurrentes que recibe de Zookeeper.
  • Cassandra usa este patron para actualizar el estado que recibe por Gossip.
  • LMAX-Diruptor sigue el Single Written Principle para evitar exclusion mutua mientras actualiza su estado local.