Cajitas que piensan

Request Pipeline

Basado en los apuntes de Unmesh Joshi

La comunicación entre servidores de un cluster usando Single Socket Channel puede generar problemas de performance si los mensajes entrantes tienen que esperar a que la respuesta de mensajes previos llegue.

Para poder tener rendimiento y tiempo de respuesta, la cola de mensajes tiene que estar suficientemente llena para asegurar que toda la capacidad del servidor esta siendo utilizada. Por ejemplo, cuando se utiliza Singular Update Queue en un servidor, puede aceptar solicitudes hasta que la cola se llene; si solo un mensaje es enviado a la vez, entonces la mayor parte de la capacidad del servidor se desperdicia.

Para solucionar esto, utilizaremos el concepto de Request Pipeline, donde cada nodo envía mensajes a los otros sin esperar su respuesta. Esto se logra creando 2 threads separados, uno para enviar mensajes y otro pare recibir las respuestas.

Request Pipeline

El emisor envía el mensaje a través del socket channel, sin esperar la respuesta.

class SingleSocketChannel{

  public void sendOneWay(RequestOrResponse request) throws IOException {
      var dataStream = new DataOutputStream(socketOutputStream);
      byte[] messageBytes = serialize(request);
      dataStream.writeInt(messageBytes.length);
      dataStream.write(messageBytes);
  }
}

Mientras un hilo independiente se inicia para leer las respuestas.

class ResponseThread{

  class ResponseThread extends Thread implements Logging {
      private volatile boolean isRunning = false;
      private SingleSocketChannel socketChannel;

      public ResponseThread(SingleSocketChannel socketChannel) {
          this.socketChannel = socketChannel;
      }

      @Override
      public void run() {
          try {
              isRunning = true;
              logger.info("Starting responder thread = " + isRunning);
              while (isRunning) {
                  doWork();
              }

          } catch (IOException e) {
              getLogger().error(e); //thread exits if stopped or there is IO error
          }
      }

      public void doWork() throws IOException {
          RequestOrResponse response = socketChannel.read();
          logger.info("Read Response = " + response);
          processResponse(response);
      }
}

El procesador de respuestas puede trabajar inmediatamente o enviar a una Singular Update Queue.

Consideraciones de usar un request pipeline:

  • Si constantemente estamos enviando sin esperar respuesta, el nodo receptor podría verse saturado. Por esta razón, hay un limite superior de cuantos request pueden estar en “el aire” a la vez, una vez que se alcanza no se aceptan mas mensajes y el emisor se bloquea. Una manera sencilla de conseguir esto es utilizando una Blocking Queue, la queue se inicializa con el tamaño máximo de mensajes en paralelo que podemos tener esperando respuesta, cada vez que uno responde, se quita un elemento de la queue para permitir que otro mensaje entre.
class RequestLimitingPipelinedConnection {

	private final Map<InetAddressAndPiniciativaBlockingQueue<RequestOrResponse>> inflightRequests = new ConcurrentHashMap<>();
  	private int maxInflightRequests = 5;

  	public void send(InetAddressAndPort to, RequestOrResponse request) throws InterruptedException {
		// Consigo la queue de ese nodo
      	ArrayBlockingQueue<RequestOrResponse> requestsForAddress = inflightRequests.get(to);
      	if (requestsForAddress == null) {
			// Soy el primer request por lo tanto iniciativo la queue
          		requestsForAddress = new ArrayBlockingQueue<>(maxInflightRequests);
          		inflightRequests.put(to, requestsForAddress);
      	}
		// Me agrego a la lista de requests en proceso
      	requestsForAddress.put(request);
	}

	private void consume(SocketRequestOrResponse response) {
      	Integer correlationId = response.getRequest().getCorrelationId();
      	Queue<RequestOrResponse> requestsForAddress = inflightRequests.get(response.getAddress());
      	RequestOrResponse first = requestsForAddress.peek();
      	if (correlationId != first.getCorrelationId()) {
        		throw new RuntimeException("First response should be for the first request");
      	}
      	requestsForAddress.remove(first);
      	responseConsumer.accept(response.getRequest());
  }
}
  • Soportar fallas y mantener el orden se vuelve difícil de implementar. Digamos que tenemos 2 mensajes esperándo respuesta, el primero falla y es reintentado, el servidor podría haber procesado el segundo antes que el reintento del primero llegue al servidor. Los servidores necesitan un mecanismo para rechazar mensajes fuera de orden sino siempre habría riesgo de que se desordenen los mensajes en escenarios de reintento y fallas.

    • Raft envía en cada mensaje el indice del mensaje anterior que debería haber llegado antes, si esto no ocurre, se rechaza el mensaje.
    • Kafka si permite tener mas de un mensaje en espera por conexión con una implementación de un productor idempotente, el cual asigna un identificador único a cada tanda de mensajes que es enviada al broker. El broker puede chequear el orden de los mensajes con esto.
  • En Kafka se incentiva el uso de request pipeline puesto que esto permite un mayor rendimiento.

  • En Zookpeer tambien se termina implementando un pipeline de requests.