Cajitas que piensan

Leader and Followers

Basado en los apuntes de Unmesh Joshi

Para lograr la tolerancia a fallos en sistemas distribuidos, hablamos de tener el WAL, pero eso no nos brindaba alta disponibilidad puesto que requería un tiempo ante la caída de un nodo para volver a estar disponible el servicio.

Este problema lo solucionamos teniendo la data replicada en N nodos, pero al hacer esto surge la necesidad de saber cuando el resultado de una operacion (un insert o un update en una base por ejemplo), esta disponible para los clientes.

Hablamos del Quorum, el cual nos permitía dar por exitosa una operacion cuando era aprobada por al menos una cantidad mayoritaria de los nodos del cluster. Esto no nos brinda consistencia a la hora de leer la data desde un cliente, puesto que un nodo en el Quorum devolvería algo mientras que otro nodo podría llegar a devolver otro dato. Esto sucede puesto que los nodos no conocen por si mismos el estado de la data repartida en el cluster, tal vez de los 3 nodos que hicieron el quorum, solo 1 la flusheo a disco, los otros aun no, por lo cual podrían hacer lecturas inconsistentes.

## ¿Por qué la lectura/escritura en Quorum no es suficiente para garantizar una consistencia fuerte?

Pareceria que leer y escribir con Quorum, por ejemplo en Cassandra, podría ser suficiente p ara tener una consistencia fuerte ante caídas o fallas de servidores. Veamos con un ejemplo porque esto no alcanza:

  1. Tenemos un cluster de 3 nodos, con un factor de replicación de 3, guardamos la variable X=1, y por lo tanto hay una copia de ella en los 3 nodos.
  2. Bob escribe x=2 con un factor de replicación de 3. La escritura es enviada a los 3 nodos, pero solo es exitosa en el nodo 1 (puede ser un problema de red o que Bob luego de escribir en el nodo 1 tuvo un garbage collection muy largo).
  3. Alice lee el valor de x de los nodos 1 y 2. Consigue el valor x=2 porque el nodo 1 tiene el valor más actualizado.
  4. Alice vuelve a leer el valor de x, pero en ese momento se cae el nodo 1. Por lo tanto Alice tendrá una lectura del nodo 2 o 3, los cuales tienen valores viejos para X.

En este escenario Alice realizo 2 lecturas consecutivas tienen valores diferentes y desactualizados. Cuando el nodo 1 vuelva, ya volveremos a tener el valor mas actual. Algunos procesos como readRepair o antiEntropy podrán corregir estos problemas en el cluster, pero lo harán eventualmente. Por lo tanto no hay manera de que el cluster solo leyendo con quorum pueda asegurarse que una vez que un valor fue leído del cluster, todas las lecturas siguientes seguirán devolviendo ese valor aunque falle un nodo.

Para poder tener consistencia, es necesario leer el valor de cada nodo y quedarse con el mas reciente, por ejemplo.

Con el objetivo de dar alta disponibilidad al servicio y consistencia, una opción es utilizar un sistema con Lider y seguidores

El lider sera el encargado de tomar decisiones por todos los miembros del cluster y de propagar esas decisiones a todos los nodos (por ejemplo cual es el ultimo dato disponible para devolver a los clientes).


Pasos

Inicio

Uno de los miembros del cluster debe ser elegido como lider, para que cada vez que un nodo inicie la aplicación, este podría buscar a un lider y en caso de no encontrarlo, llamar a votación. Hasta tanto no tener un lider elegido, los procesos rechazaran cualquier solicitud. Una vez que el lider este disponible, podría recibir consultas de los clientes, y los seguidores podrán redirigir cualquier consulta este nodo.

Elección de un lider

Cada vez que se inicia la aplicación, se intenta disparar una elección de lider y hasta no tener uno no se aceptan solicitudes. Por lo cual podríamos decir que un nodo en esta configuración puede estar en 3 estados:

  1. Lider
  2. Seguidor
  3. Buscando Lider

Usaremos el HeartBeat para conocer si el lider ha caído y por lo tanto debemos llamar a elección. El llamado a elección se realiza pidiendo al resto de los nodos que nos den su voto:

private void startLeaderElection() {
      replicationState.setGeneration(replicationState.getGeneration() + 1);
      registerSelfVote();
      requestVoteFrom(followers);
  }

Cluster Chico - Elección por consenso

  • Queremos elegir al nodo que tenga la version más actualizada de los datos. Para esto tenemos que fijarnos en:
    • El que tenga el Generation Clock más reciente.
    • El que tenga el WAL con la entrada mas reciente.
  • Si todos tienen la data igual de actualizada entonces en base a las particularidades del algoritmo:
    • En el caso de Zab se elige al servidor mejor rankeado o con mayor id.
    • Asegurarse que solo 1 nodo a la vez pida que lo voten (timeout randomizados para evitar iniciar todos a la vez), por lo cual el que inicia la votación es elegido es lo que se hace en Raft

Mientras no haya un cambio de Generación, cada nodo seguirá votando para siempre al nodo por el cual voto la primera vez, por lo que una vez que ya se eligió a un nodo, si otro nodo viene y pide elección, no habra modificaciones en el lider.


VoteResponse handleVoteRequest(VoteRequest voteRequest) {
      VoteTracker voteTracker = replicationState.getVoteTracker();
      Long requestGeneration = voteRequest.getGeneration();
      if (replicationState.getGeneration() > requestGeneration) {
          return rejectVote(); // Rechazamos porque es una generación vieja.

      } else if (replicationState.getGeneration() < requestGeneration) {
		// Si me llega un pedido de voto para una generación que no conozco
		followTheLeader(requestGeneration, voteRequest)
      }
	// En caso de que ya haya votado en esta generación
      return handleVoteRequestForSameGeneration(voteRequest);
  }

private VoteResponse followTheLeader(Long requestGeneration, VoteRequest voteRequest){
	becomeFollower(requestGeneration); // Me hago seguidor
     voteTracker.registerVote(voteRequest.getServerId()); // Voto por el nodo que me pidió su voto
    return grantVote();
}

  private VoteResponse handleVoteRequestForSameGeneration(VoteRequest voteRequest) {
      Long requestGeneration = voteRequest.getGeneration();
      VoteTracker voteTracker = replicationState.getVoteTracker();
	// Si ya vote en esta generación
      if (voteTracker.alreadyVoted()) {
		if (voteTracker.grantedVoteForSameServer(voteRequest.getServerId())){
			// Si había votado por el, le vuelvo a dar mi voto
			return grantVote();
		} else {
			// Si no había votado por el, rechazo la solicitud
			return rejectVote();
		}
      }
	// No he votado en esta elección por lo cual comparo su WAL contra el mío
      if (voteRequest.getLogIndex() >= (Long) wal.getLastLogEntryId()) {
          followTheLeader(requestGeneration, voteRequest)
      }

      return rejectVote();
  }

  private void becomeFollower(Long generation) {
      replicationState.setGeneration(generation);
      transitionTo(ServerRole.FOLLOWING);
  }

  private VoteResponse grantVote() {
      return VoteResponse.granted(serverId(),
              replicationState.getGeneration(),
              wal.getLastLogEntryId());
  }

  private VoteResponse rejectVote() {
      return VoteResponse.rejected(serverId(),
              replicationState.getGeneration(),
              wal.getLastLogEntryId());
  }

El nodo que reciba votos de la mayoría de los nodos del cluster, pasara a estado Lider. Para la mayoría se vuelven a utilizar los conceptos vistos en Quorum, y una vez que el nodo se convierte en Lider comienza a enviar HeartBeats a sus seguidores, si estos no recibieran durante cierto tiempo estos mensajes, volverían a llamar a elección.

Proceso de elección en Raft:

Eleccion

Y si lo viéramos internamente en los estados que toma un nodo:

Node internal machine

Cluster Grande - Elección con storage externo

Al igual que vimos en Consenso, cuando tenemos clusters de varios miles de nodos, es más sencillo utilizar un sistema externo como Zookeeper o etcd. Este tipo de sistemas grandes tienen un nodo que hace de Maestro o Controlador y toma las decisiones para todo el cluster.

Funciones necesarias:

  • Una instrucción de Compare and Swap para seleccionar una clave atómica.
  • Una implementación de HearBeat que expire la clave si no se recibe un heartbeat por parte del lider y por lo tanto hay que llamar a elección.
  • Un mecanismo de notificación para informar a todos los nodos si una clave expira.

Cada nodo utilizara la instrucción CompareAndSwap para tratar de crear una clave con su valor en el storage, dado que solo uno puede tener éxito en la creación, el que lo logre sera elegido como lider.

En algunas implementaciones esta clave se crea con un TTL bajo, y el lider debe ir actualizando constantemente su valor para mantenerla viva. A su vez los followers utiliza el mecánicos de notificación para enterarse si la clave expira.

  • En etcd existe la operacion CompareAndSwap por ejemplo.
  • En Zookeeper no existe tal operacion, pero puede ser implementada su funcionalidad intentando crear un nodo y controlando la falla si este nodo ya existe. Para dar la funcionalidad de TTL se puede utilizar un nodo efímero, el cual existe solo mientras haya una sesión activa entre el que creo el nodo efímero y el servidor de zookeper, cuando esta sesión se termine, el nodo es borrado y todos los que tengan un listener sobre el cluster serán notificados.
class Server {

public void startup() {
      zookeeperClient.subscribeLeaderChangeListener(this);
      elect();
  }

  public void elect() {
      var leaderId = serverId;
      try {
          zookeeperClient.tryCreatingLeaderPath(leaderId);
          this.currentLeader = serverId;
          onBecomingLeader();
      } catch (ZkNodeExistsException e) {
          // No pude crear el nodo por lo tanto voy a ver quien si pudo
          this.currentLeader = zookeeperClient.getLeaderId();
      }
  }
}

Luego que uno de los nodos del cluster fue elegido, cada nodo del cluster quedara revisando la salud del lider. Cuando se detecta una caída, se inicia una nueva elección. El storage externo que utilizamos para la elección es normalmente quien nos brinda las funciones de miembros del grupo y detección de fallas. Por ejemplo con zookeeper si quisiéramos tener detección de cambios en el lider:

class ZookeeperClient {
   public void subscribeLeaderChangeListener(IZkDataListener listener) {
      zkClient.subscribeDataChanges(LeaderPath, listener);
  }

Se utilizo para el ejemplo Zookeeper puesto que era el mas complejo al no tener la operacion directa de compareAndSwap pero también se puede usar etcd o Consul

Diagrama