Cajitas que piensan

High Water Mark

Basado en los apuntes de Unmesh Joshi

El patron WAL nos permite recuperar el estado del sistema luego de una caída o reinicio. Pero esto no nos asegura suficiente disponibilidad en caso de la caída de un nodo, puesto que los clientes deberán esperar a que el servidor reinicio.

Hablamos de usar el patrón Lider y Seguidores para mantener replicado el WAL en una cantidad Quorum de nodos, para que en caso de caida del lider, se elija un nuevo lider y los clientes no vean falta de disponibilidad del servicio.

Pero aun con todo este esquema y su robustez, tenemos algunos puntos donde puede haber problemas:

  • El lider puede caerse antes de enviar una linea de log a sus seguidores
  • El lider puede caer después de enviar su log a algunos seguidores, pero no a un quorum de ellos.

En estos escenarios, tendríamos seguidores que no tienen todas las entradas del WAL que tenia el lider, y seguidores que tienen mas que otros. Por lo tanto es importante que cada nodo sepa que partes del log ya todos tienen replicadas y por lo tanto son visibles para los clientes.

La High Water Mark es un indice en el WAL que permite indicar cual fue la ultima linea del log que esta confirmada que fue replicada exitosamente en un Quorum de seguidores. En cada mensaje del Lider a sus seguidores, se adjunta esta marca. Esto permite que todos los nodos del cluster puedan solo devolverle al cliente información sobre operaciones que están por debajo de esa marca.

Aqui podemos ver como se veria esto en un sistema de 3 nodos donde uno se atraso

highWater

Si lo viéramos en un diagrama de como van intercambiando mensajes

highWaterDiagram

Por cada entrada en el WAL, el lider adjunta esta marca, y la envía a sus seguidores

class ReplicationModule {
	private Long appendAndReplicate(byte[] data) {
      Long lastLogEntryIndex = appendToLocalLog(data);
      logger.info("Replicating log entries from index " + lastLogEntryIndex);
      replicateOnFollowers(lastLogEntryIndex);
      return lastLogEntryIndex;
  }


  private void replicateOnFollowers(Long entryAtIndex) {
      for (final FollowerHandler follower : followers) {
          replicateOn(follower, entryAtIndex); //send replication requests to followers
      }
  }
}

Los seguidores reciben estos mensajes y agregan esas lineas de log a su log local. Si logran hacerlo, responden al líder con el indice del ultimo log que tienen. Además adjuntan su Generation Clock actual.

class ReplicationModule {

  private ReplicationResponse handleReplicationRequest(ReplicationRequest replicationRequest) {
      List<WALEntry> entries = replicationRequest.getEntries();
      for (WALEntry entry : entries) {
          logger.info("Applying log entry " + entry.getEntryId() + " in " + serverId());
          wal.writeEntry(entry);
      }
      return new ReplicationResponse(SUCCEEDED, serverId(), replicationState.getGeneration(), wal.getLastLogEntryId());
  }
}

Con estas respuestas, el Lider puede mantener un trackeo de que indices fueron replicados en cada nodo.

class ReplicationModule {
	private void handleReplicationResponse(ReplicationResponse response){
		recordReplicationConfirmedFor(response.getServerId(), response.getReplicatedLogIndex());
 		long logIndexAtQuorum = computeHighwaterMark(logIndexesAtAllServers(), config.numberOfServers());
  		if (logIndexAtQuorum > replicationState.getHighWaterMark()) {
      		var previousHighWaterMark = replicationState.getHighWaterMark();
      		applyLogAt(previousHighWaterMark, logIndexAtQuorum);
      		replicationState.setHighWaterMark(logIndexAtQuorum);
  		}
	}
}

Calcular la High Water Mark es tan simple como revisar el indice que devolvió cada seguidor y el log del Lider mismo, y quedarse con el indice mas alto que este en un quorum de nodos.


class ReplicationModule {
	Long computeHighwaterMark(List<Long> serverLogIndexes, int noOfServers) {
      	serverLogIndexes.sort(Long::compareTo);
      	return serverLogIndexes.get(noOfServers / 2);
	}
}

A partir de este momento, el Líder puede propagar la marca a todos los seguidores en los mensajes de HeartBeat o en mensajes independientes.

Todo cliente que consulte el cluster podría ver operaciones por debajo de la High Water Mark, para evitar que si el líder se cae y el nuevo líder es un nodo que no tenia replicados esos datos, no haya inconsistencias en la respuesta.

class ReplicationModule{
	public WALEntry readEntry(long index) {
     		if (index > replicationState.getHighWaterMark()) {
          		throw new IllegalArgumentException("Log entry not available");
      	}
      	return wal.readAt(index);
  	}
}

## Truncado del log

Cuando un servidor se une a un cluster después de una caida o reinicio, existe la posibilidad de que tenga lineas en su log que esten en conflicto. Por lo tanto al unirse, chequea con el líder para saber que entradas del podrían ser potencialmente conflictivas, luego borra del log del final hacia el inicio hasta que las entradas coincidan con las del líder. Es entonces que puede empezar a agregar a su log las siguientes entradas que tiene el líder (proceso clásico de replicación del WAL), asegurando que su log coincide con el del resto del cluster.

Podemos ver este escenario en el siguiente ejemplo, donde un cliente envía 4 inserciones, el líder logra replicar 3 de estas pero se cae el nodo después de guardar la 4ta en su propio log.

leaderFailure

Se elige un nuevo líder el cual sigue recibiendo datos y escribe la inserción 5 (que podría ser un reintento de la 4to o ser otra cosa).

newLeader

Cuando vuelve a la vida el viejo líder, tiene en su log la entrada 4 pero eso entra en conflicto con lo que tiene el líder actual, por lo tanto trunca su log hasta la entrada 3 y ahi puede comenzar a sincronizarse y recibir la entrada 5.

logTruncate

Cualquier servidor que se una al cluster después de una pausa, busca al nuevo líder. Le pide explícitamente que le diga en cuanto esta la High Water Mark, trunca su log hasta esa altura, y pasa a sincronizar todas las entradas que están por arriba de la marca.

Para detectar estos registros conflictivos, en algunas implementaciones como Raft se utiliza una técnica donde se chequea las entradas del log local contra las que vienen en los request desde el Lider. Las entradas que tengan el mismo indice de log pero un Generation Clock menor, son borradas.

class ReplicationModule{

  private void maybeTruncate(ReplicationRequest replicationRequest) throws IOException {
      if (replicationRequest.hasNoEntries() || wal.isEmpty()) {
          return;
      }

      List<WALEntry> entries = replicationRequest.getEntries();
      for (WALEntry entry : entries) {
          if (wal.getLastLogEntryId() >= entry.getEntryId()) {
              if (entry.getGeneration() == wal.readAt(entry.getEntryId()).getGeneration()) {
                  continue;
              }
              wal.truncate(entry.getEntryId());
          }
      }
  }
  • Todos los algoritmos de consenso usan el concepto de High Water Mark para conocer cuando aplicar los cambios.
  • En Raft se le conoce como el CommitIndex
  • En Kafka hay un indice separado llamado “High-Water mark”, los consumidores solo pueden ver entradas hasta ese valor.
  • BookKeeper tiene un concepto de “ultima entrada confirmada”, que simboliza lo mismo.