Cajitas que piensan

Heart Beat

Basado en los apuntes de Unmesh Joshi

Si dividimos la data entre varios servidores (un cluster), es importante que los miembros del cluster tengan una manera de detectar a tiempo fallas en los nodos que forman el cluster para poder tomar medidas correctivas para asegurar la consistencia y salud de la información.

Ping

Si mando un mensaje, y el servidor responde, entonces aun esta vivo, si repito esto en intervalos constantes, el riesgo de ejecutar operaciones sin saber si todos los servidores están disponibles queda acotado a una ventana de tiempo fija (la distancia entre mensajes). Si pasado X cantidad de tiempo, no hemos obtenido respuesta, se registra al servidor en cuestión como “No Disponible”

Entonces tenemos varias variables a configurar:

  • Intervalo de solicitud: Tiempo entre 2 mensajes para saber si un nodo aun esta vivo.
  • Intervalo de espera: Tiempo que se espera sin un mensaje ok por parte del nodo que estamos consultando, para luego clasificarlo como no disponible.

Es importante tener en cuenta el tiempo que demora un mensaje en llegar del servidor A al B en nuestra red, puesto que ambas variables deben ser acordes al mismo. Regla general:

Intervalo de espera > Intervalo de solicitud > Tiempo promedio de la red

Por lo cual podríamos tener una red donde enviar un mensaje y que vuelva lleve 100ms, enviamos un mensaje cada 500ms preguntando si el nodo esta disponible y tal vez esperamos 5 segundos sin mensajes exitosos para marcarlo como no disponible. Esto genera una ventana de 10 mensajes (5 segundos) en la cual el nodo A puede no saber que el nodo B esta o no disponible.

Tanto el emisor como el receptor, implementaran un scheduler como el siguiente


class HeartBeatScheduler {

  public class HeartBeatScheduler implements Logging {
      private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
  
      private Runnable action;
      private Long heartBeatInterval;
	private ScheduledFuture<?> scheduledTask;

      public HeartBeatScheduler(Runnable action, Long heartBeatIntervalMs) {
          this.action = action;
          this.heartBeatInterval = heartBeatIntervalMs;
      }

      public void start() {
          scheduledTask = executor.scheduleWithFixedDelay(new HeartBeatTask(action), heartBeatInterval, heartBeatInterval, TimeUnit.MILLISECONDS);
      }
}

Esa action podría ser algo como enviar un mensaje con el id del servidor que esta enviando el mensaje:

private void sendHeartbeat() throws IOException {
      socketChannel.blockingSend(newHeartbeatRequest(serverId));
  }

En cambio en el nodo que recibe el mensaje el scheduler tiene que ejecutar un chequeo de si recibió o no un heartbeat:


class AbstractFailureDetector {
  private HeartBeatScheduler heartbeatScheduler = new HeartBeatScheduler(this::heartBeatCheck, 100l);

  abstract void heartBeatCheck(); // Para chequear cuales están caidos
  abstract void heartBeatReceived(T serverId); // Para marcar los que recibo
}

Entonces luego tendremos un método que recibe request y detecta si tiene que usarlos para el sistema de fallos del cluster o no:


class ReceivingServer{
  private void handleRequest(Message<RequestOrResponse> request) {
      RequestOrResponse clientRequest = request.getRequest();
      if (isHeartbeatRequest(clientRequest)) {
          HeartbeatRequest heartbeatRequest = JsonSerDes.deserialize(clientRequest.getMessageBodyJson(), HeartbeatRequest.class);
          failureDetector.heartBeatReceived(heartbeatRequest.getServerId());
          sendResponse(request);
      } else {
          //procesamos otro tipo de requests
      }
  }

Para decidir si un nodo esta o no disponible hay muchas heurísticas y criterios. Si tenemos intervalos cortos, es probable que tengamos falsos positivos, puesto que apenas falla un heartbeat ya marcamos al nodo como no disponible (lo cual puede generar movimiento de datos o reajustes en el cluster). Hay 2 grande categorizas de criterios:

Clusters pequeños - Consensus

El heartbeat se envía del líder a los seguidores, y se guarda el timestamp de la respuesta. Si no se reciben mensajes en una ventana fija de tiempo, el líder se da por muerto y se elige uno nuevo.

Como siempre, podríamos ser que no este muerto el proceso y que simplemente sea un tema de lentitud de red (o un Stop the world de java), por lo cual es necesario utilizar un Generation Clock para detectar mensajes de lideres perdidos. Esto permite detectar en pequeños periodos de tiempo cualquier caída, es recomendable para clusters de 3 a 5 nodos (Zookeper o ElasticSearch por ejemplo).

class TimeoutBasedFailureDetector{

  @Override
  void heartBeatReceived(T serverId) {
      // Guardamos la hora en la que recibimos el ultimo mensaje
      Long currentTime = System.nanoTime();
      heartbeatReceivedTimes.put(serverId, currentTime);
      // Marcamos al nodo como disponible
      markUp(serverId);
  }

  @Override
  void heartBeatCheck() {
      Long now = System.nanoestae();
      Set<T> serverIds = heartbeatReceivedTimes.keySet();

      for (T serverId : serverIds) {
          Long lastHeartbeatReceivedTime = heartbeatReceivedTimes.get(serverId);
          Long timeSinceLastHeartbeat = now - lastHeartbeatReceivedTime;
          if (timeSinceLastHeartbeat >= timeoutNanos) {
              markdown(serverId);
          }
      }
  }

Consideraciones

  • Es recomendable que los hearbeats se envíen en un thread separado y de manera asincrónica (Akka por ejemplo hace esto para el envío).
  • Podría pasar que la aplicación este pausada durante algunos segundos (Garbage collector por ejemplo), en ese tiempo no procesa ninguno de los heartbeats, por lo cual al volver de su pausa, podría entender que todo el cluster esta muerto salvo el mismo. Para salvar estos casos, el heartBeatCheck debería saber que esta corriendo luego de una larga pausa y tenerlo en cuenta para no marcar ningún nodo como down hasta la siguiente ejecución, como se hace en Cassandra.

Clusters grandes - Gossip

Como en muchos problemas, la escala importa, y la técnica anterior presenta algunos cuando lo llevamos a escenarios de cientos o miles de nodos en un cluster:

  • Cantidad de mensajes generados por nodos (N-1 * N)
  • El ancho de banda consumido por cada mensaje, no queremos utilizar la red en su totalidad solo para enviar mensajes secundarios a la tarea principal de la aplicación.

Por esto es que el envío de heartbeats todos contra todos no se usan y se eligen protocolos de Gossip donde la información de la topología del cluster sera comunicada de nodo a nodo, de manera descentralizada y sin necesidad de que todos los nodos interactúen entre ellos. Estos protocolos brindan funcionalidades tales como:

  • Quienes son los miembros del cluster
  • Que miembros del cluster están disponibles
  • Broadcast de eventos

Gossip

En caso de falla estos clusters toman acciones como mover la data de un nodo a otro, entonces se prioriza la correcta detección por sobre las demoras (con cierto limite). Para lograrlo se normalmente a cada nodo se le asigna un valor de suspicacia, el cual va incrementándosela a medida que no se detecta a ese nodo en el gossip en un intervalo de tiempo. Esto se va calculando en base a estadísticas previas y cuando el valor de suspicacia supera cierto umbral, entonces se lo marca como caído.

Hay 2 implementaciones principales:

  1. Phi Accrual Failure Detector: Utilizado en Akka y Cassandra, donde se toma en cuenta la performance de la red, la carga y las condiciones históricas, además de ciertos parámetros de sensibilidad (un datacenter privado vs la nube publica).
  2. SWIM: Utilizado en Hashicorp consul y memberlist, donde se le hizo una mejora al protocolo para soportar los casos de consumo de CPU o memoria que generaran falsos positivos

Estas implementaciones soportan miles de maquinas, en el caso de Akka se ha probado con hasta 2400 servidores mientras que consul tiene clusters de mas de 10 mil nodos.

### Ejemplo

En el caso de Akka cada nodo esta siendo monitoreado por una cantidad de nodos (por defecto 5), si alguno lo detecta caído, entonces lo comunica a todo el cluster.

Si todos los nodos monitores (en este caso 5) detectan que esta el nodo disponible, entonces se lo comunican al cluster para que este de vuelta disponible.