Cajitas que piensan

Single Socket Channel

Basado en los apuntes de Unmesh Joshi

Cuando estamos en un contexto de Líder y Seguidores necesitamos asegurar que los mensajes entre ellos se mantendrán en orden, con un mecanismo de reintento en caso de perdida de mensajes. Esto tiene que hacerse manteniendo un bajo costo de creación de conexiones, para que abrir una nueva conexión no aumente los tiempos de respuesta del sistema.

El protocolo TCP por suerte nos ofrece todas estas características. Podemos conseguir la comunicación que necesitamos si nos aseguramos que toda comunicación entre un seguidor y su líder pase por un Single Socket Channel, luego cada seguidor serializa los updates que recibe del líder usando una Singular Update Queue.

Single Socket Channel

Los nodos nunca cierran la conexión una vez que la abren, y continuan leyendo de ella para mas solicitudes. Los nodos usan un hilo dedicado por cada conexión para leer y escribir, pero esto es solo necesario si se hacen operaciones bloqueares de IO.

Una implementación de un thread simple podría ser:

class SocketHandlerThread{

  @Override
  public void run() {
      try {
          //Continues to read/write to the socket connection till it is closed.
          while (true) {
              handleRequest();
          }
      } catch (Exception e) {
          getLogger().debug(e);
      }
  }

   private void handleRequest() {
      RequestOrResponse request = readRequestFrom(clientSocket);
      RequestId requestId = RequestId.valueOf(request.getRequestId());
      requestConsumer.accept(new Message<>(request, requestId, clientSocket));
    }
}

El nodo lee mensajes y los manda a una Singular Update Queue para su procesamiento. Una vez que el nodo ha procesado el mensaje, escribe la respuesta a través del socket.

Cuando un nodo establece una comunicación abre un Single Socket Connection que es usada por todos los mensajes del otro nodo.

class SingleSocketChannel{

  public class SingleSocketChannel implements Closeable {
      private Socket clientSocket;
      private final OutputStream socketOutputStream;
      private final InputStream inputStream;
  
      public SingleSocketChannel(InetAddressAndPort address, int heartbeatIntervalMs) throws IOException {
          clientSocket = new Socket(address.getAddress(), address.getPort());
          clientSocket.setSoTimeout(heartbeatIntervalMs * 10); //set socket read timeout to be more than heartbeat.
          socketOutputStream = clientSocket.getOutputStream();
          inputStream = clientSocket.getInputStream();
      }
  
      public RequestOrResponse blockingSend(RequestOrResponse request) throws IOException {
          writeRequest(request);
          byte[] responseBytes = readResponse();
          return deserialize(responseBytes);
      }
  
      private void writeRequest(RequestOrResponse request) throws IOException {
          var dataStream = new DataOutputStream(socketOutputStream);
          byte[] messageBytes = serialize(request);
          dataStream.writeInt(messageBytes.length);
          dataStream.write(messageBytes);
      }
}

Es importante mantener un timeout en la conexión para que no se bloquee indefinidamente en caso de errores. Se envían HeartBeats periódicamente a través del socket para mantener viva la conexión.

Se utiliza un timeout que sea múltiplo del intervalo de Heartbeat que se envía, para tener en cuenta el tiempo de ir y volver de la red (sugieren 10 veces el intervalo de Heartbeat).

Enviar mensajes en un solo channel puede crear problemas por Head of Line Blocking, para evitar esto se puede utilizar un Request Pipeline.

  • Zookeeper utiliza un single socket channel y un thread por cada seguidor para mantener la comunicación.
  • Kafka utiliza el mismo mecánismo para mantener replicados los mensajes.
  • Cassandra incluso permite configurar a bajo nivel los socket channel.