Franco Battaglia

Programming Reactive Systems (course notes)

Reactive manifesto

  • Event-driven => react to events
  • Scalable => react to load
  • Resilient => react to failures
  • Responsive => react to users

Being event-driven enables being scalable and resilient

Event-driven

Traditionally, systems are composed of multiple threads, which communicate and have shared state (sometimes synchronized)

Now, systems are composed from loosely coupled event handlers

Events can be handled asynchronously, without blocking => resource efficient

Scalable
  • Scale up: parallelism in multi-core systems
  • Scale out: multiple server nodes

Goals: minimize shared mutable state, location transparency, resilience

Resilient

Failures can be:

  • Software failures
  • Hardware failures
  • Connection failures

Goals: loose coupling, encapsulation of state, supervisor hierarchies

Responsive

Real-time interaction with users. Being responsive is the result of applying the other three properties

Goals: backpressure, algorithms, system design

Traditional event handling

Observer and callbacks

class Counter extends ActionListener {
  private var count = 0
  button.addActionListener(this) // register with the event source; each click invokes the actionPerformed callback

  def actionPerformed(e: ActionEvent): Unit = {
    count += 1
  }
}

Problems:

  • actionPerformed has a side effect => shared mutable state
  • Cannot be composed
  • Callback hell

Solution: first class events, events as messages, event composition

Async execution (CPS: continuation passing style)

Compute on another computing unit, without explicitly waiting for its termination

A sync type signature can be turned async by:

  1. Returning Unit
  2. Taking a continuation parameter (callback) defining what to do after the computation

def syncProgram(a: A): B
def asyncProgram(a: A, k: B => Unit): Unit // k is a callback

To model failure, we can model the callback as k: Try[B] => Unit
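A minimal sketch of this transformation, using a hypothetical parser as the computation (parseSync/parseAsync are illustrative names, not from any library):

```scala
import scala.util.{Failure, Success, Try}

// Synchronous version: the result comes back as the return value
def parseSync(s: String): Int = s.toInt

// Async (CPS) version: returns Unit and takes a continuation over Try[Int],
// so both success and failure flow through the callback
def parseAsync(s: String, k: Try[Int] => Unit): Unit =
  k(Try(s.toInt))

parseAsync("42", {
  case Success(n)  => println(s"parsed: $n")
  case Failure(ex) => println(s"failed: ${ex.getMessage}")
})
```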

Futures

def asyncProgram(a: A, k: B => Unit): Unit

// let's turn this into something cooler...
// first, let's curry k
def asyncProgram(a: A): (B => Unit) => Unit

// now, let's propose Future:
type Future[+T] = (T => Unit) => Unit
def futureProgram(a: A): Future[B]

// now, let's add failure handling
type Future[+T] = (Try[T] => Unit) => Unit

// now reify Future into a trait
trait Future[+T] extends ((Try[T] => Unit) => Unit) {
  def onComplete(k: Try[T] => Unit): Unit // onComplete is renamed from apply, because applying a Future is passing the callback
}

Notice: the only difference between using a raw callback and a Future is that we can operate on the Future's eventual value using onComplete, map, etc

In this encoding, a Future is a curried function over its callback: applying a Future means registering a continuation

Operations on Future

Future suffers from the same composability issues as callbacks, but it can be transformed and combined using map, flatMap, zip, recover, etc

  • map transforms a Future[A] into a Future[B]
  • flatMap transforms a Future[A] into a Future[B] by applying a function A => Future[B] after the first Future has completed (sequential)
  • zip joins two Futures by zipping their results into a Future[(A, B)]
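These combinators can be tried directly with the standard library's scala.concurrent.Future (the values fa and fb are made up for the example):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val fa: Future[Int]    = Future { 21 }
val fb: Future[String] = Future { "coffee" }

val doubled: Future[Int]        = fa.map(_ * 2)                    // Future[A] => Future[B]
val chained: Future[String]     = fa.flatMap(a => Future(a.toString)) // sequential
val both: Future[(Int, String)] = fa.zip(fb)                       // concurrent join

// Await is used here only to inspect the results; avoid blocking in real code
println(Await.result(doubled, 1.second)) // 42
println(Await.result(both, 1.second))    // (21,coffee)
```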

Order of operations

zip does not impose sequentiality: when zipping two Futures, both are fired concurrently and joined when both finish

But if we flatMap two futures, we have to wait sequentially unless we fire the Futures first:

val eventuallyCoffee1 = makeCoffee()
val eventuallyCoffee2 = makeCoffee()
// both futures fired
eventuallyCoffee1.flatMap(c1 => eventuallyCoffee2.map(c2 => (c1, c2)))
// ^^ concurrent because Future creation implies its execution

The creation of a Future implies its execution (much like an effectful Promise)

Syntax sugar: for comprehensions
for {
  work1 <- work
  _ <- takeABreak()
  work2 <- work()
} yield work1 + work2

This for comprehension is equivalent to flatMapping the first result, then the break, then mapping the second result and summing the two, all sequentially
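The desugaring can be checked by hand; work() and takeABreak() below are hypothetical stand-ins so the snippet runs on its own:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

def work(): Future[Int]        = Future { 10 }
def takeABreak(): Future[Unit] = Future { () }

val sugared: Future[Int] = for {
  work1 <- work()
  _     <- takeABreak()
  work2 <- work()
} yield work1 + work2

// The compiler rewrites the comprehension into nested flatMap/map calls:
val desugared: Future[Int] =
  work().flatMap(work1 =>
    takeABreak().flatMap(_ =>
      work().map(work2 => work1 + work2)))

println(Await.result(sugared, 1.second)) // 20
```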

Handling failures in Future
  • recover takes a partial function over Throwable
  • recoverWith is an async recover
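A small sketch of both combinators on a deliberately failing Future (the fallback values are arbitrary):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val failing: Future[Int] = Future { throw new RuntimeException("boom") }

// recover maps a matching Throwable to a plain fallback value
val recovered: Future[Int] =
  failing.recover { case _: RuntimeException => -1 }

// recoverWith maps a matching Throwable to another Future (an async fallback)
val recoveredWith: Future[Int] =
  failing.recoverWith { case _: RuntimeException => Future { 0 } }

println(Await.result(recovered, 1.second)) // -1
```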

Execution context

When executing a Future or invoking a callback, some thread must perform the task

  • If we use only one thread, there is no parallelism
  • We can use thread pools
  • Future takes an implicit ExecutionContext for each task
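For example, a fixed-size thread pool can be wrapped as an ExecutionContext and passed implicitly to Future (pool size 2 is arbitrary):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

val pool = Executors.newFixedThreadPool(2)

// Wrap the pool; Future picks this up as its implicit ExecutionContext
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

val f = Future { 1 + 1 } // scheduled on one of the pool's two threads

println(Await.result(f, 1.second)) // 2
pool.shutdown() // release the pool's (non-daemon) threads
```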

Introduction to the Actor model

Motivation:

  • CPUs with more cores, same frequency
  • Virtual cores sharing physical execution core
  • Multi-tasking: running multiple programs in parallel
  • Multi-threading: running parts of 1 program in parallel

Traditional solution: synchronize access to shared state (mutex). Possibility of deadlock, blocking


Actors represent non-blocking objects and their interactions. Formally, an actor is an object with identity, with behavior and async message passing

type Receive = PartialFunction[Any, Unit]

trait Actor {
  implicit val self: ActorRef
  def sender: ActorRef
  def receive: Receive // the actor's behavior
}

abstract class ActorRef {
  def !(msg: Any)(implicit sender: ActorRef): Unit
}

Actors in Akka are referenced through ActorRefs. To execute, an actor needs an ActorContext, which can create and stop actors and change the current actor's behavior with become/unbecome. Actors are created by other actors

  • State change is explicit
  • State is scoped to current behavior

Important concepts on Actors

  • Actors can only send messages to known actors (including self)
  • Actors cannot access other actors' behavior - they can only send messages
  • No shared mutable state, no global synchronization
  • Messages are fire-and-forget, meaning communication is one-way. To communicate completion status (e.g. ACK), send the sender another message
  • Messages are received sequentially
  • Processing a message is the atomic unit of execution, which is very convenient for transaction-like logic and threading: no locks are needed
  • No blocking, but an Actor's mailbox is like a queue
  • Messages are like letters: delivery is unreliable, but the order of messages between a given sender and recipient is preserved
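The properties above can be modeled in a few lines of plain Scala. This is a toy, single-threaded sketch (not Akka): one mailbox, one current behavior, messages processed strictly one at a time, and state scoped to the behavior via become:

```scala
import scala.collection.mutable

type Receive = PartialFunction[Any, Unit]

class ToyActor {
  private val mailbox = mutable.Queue.empty[Any]
  private var behavior: Receive = PartialFunction.empty
  private var processing = false

  def become(b: Receive): Unit = behavior = b // explicit state change

  def !(msg: Any): Unit = {
    mailbox.enqueue(msg) // fire-and-forget send
    if (!processing) {   // guard against re-entrant sends
      processing = true
      while (mailbox.nonEmpty) {
        val m = mailbox.dequeue()
        if (behavior.isDefinedAt(m)) behavior(m) // one message = atomic unit
      }
      processing = false
    }
  }
}

var lastReading = -1
val counter = new ToyActor

// The count lives only in the behavior's parameter: no shared mutable state
def counting(n: Int): Receive = {
  case "incr" => counter.become(counting(n + 1))
  case "get"  => lastReading = n
}
counter.become(counting(0))

counter ! "incr"
counter ! "incr"
counter ! "get"
println(lastReading) // 2
```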

Delivery of messages

Delivery of a message requires eventual availability of channel and recipient. Efforts:

  • At-most-once (send once). Keeps no state
  • At-least-once (send until ACK). Resends
  • Exactly-once. Receiver keeps track of which messages have been processed

Regardless of effort, how can we make the communication more reliable?

  • Log activities to persistent storage
  • Each activity has unique ID
  • Store IDs of completed activities within actors
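A sketch of the receiver side under at-least-once delivery, with hypothetical Activity and Deduplicator names: duplicated resends are filtered out by remembering processed IDs, giving effectively exactly-once processing:

```scala
case class Activity(id: Long, payload: String)

class Deduplicator {
  private var processedIds = Set.empty[Long]
  private var log = List.empty[String] // stands in for persistent storage

  def receive(a: Activity): Unit =
    if (!processedIds.contains(a.id)) { // ignore duplicates by unique ID
      processedIds += a.id
      log ::= a.payload
    }

  def processed: List[String] = log.reverse
}

val d = new Deduplicator
d.receive(Activity(1, "create"))
d.receive(Activity(1, "create")) // duplicate resend: ignored
d.receive(Activity(2, "update"))
println(d.processed) // List(create, update)
```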

Writing Actor systems

Example: recursive downloader. Given a URL, download the content, extract the links and follow them, bounded by a maximum depth

Plan of action: write a Receptionist (maps request to a Controller), a Controller (spawns Getters for all links), and Getters (actors who process the body using an HTTP client)

Failure in Actor systems

Where do failures go?

  • Reify failure as a message
  • Send to a known error address

Solution: individual failure is handled by a leader (supervisor actor)

  • Failed actor is terminated or restarted
  • Supervised actors form a tree structure
  • Supervisors create their subordinates, and take decisions

class Manager extends Actor {
  override val supervisorStrategy = OneForOneStrategy() {
    case _: DBException => Restart
    case _: ActorKilledException => Stop
    case _: ServiceDownException => Escalate
  }
}

Supervising strategies:

  • One for one (parent supervises each actor individually)
  • All for one (the parent supervises a group of child actors: one child's failure applies the decision to all of them)
  • Stopping: for any failure of a child, parent will stop it

Lifecycle

  • Hooks: preStart, preRestart, postRestart, postStop
  • DeathWatch: watchers register as listeners of other actors. When those actors die (stop), registered actors receive a Terminated message. Unhandled Terminated messages are treated as a DeathPactException
  • Actors know their parent using context.parent and their children using context.children

The error kernel

  • Restarts are recursive
  • Restarts are more frequent near leaves
  • Avoid restarting stateful important actors. Important state => higher up the hierarchy, riskier operation => lower down the hierarchy

Event stream

Actors can direct messages ONLY to known addresses.

EventStream allows broadcasting (publication of messages to unknown audience)

Any actor can subscribe to EventStream, which can emit messages of certain topics

Actors know the stream using context.system.eventStream

Persisting actor state

Persisting current state

Snapshot, latest state in constant time. No previous history

Persisting changes

History can be replayed, audited, restored. Retroactive handling of errors. Better IO bandwidth: changes are immutable and written append-only

Command sourcing: persist commands before processing, persist ACK upon processing. During recovery, all commands are replayed with effects scoped locally

Event sourcing: generate change requests (events). During a replay, the actor receives the events

sealed trait Event
case class PostCreated(text: String) extends Event
case object QuotaReached extends Event

case class State(posts: Vector[String], disabled: Boolean) {
  def updated(e: Event): State = e match {
    case PostCreated(text) => copy(posts = posts :+ text)
    case QuotaReached => copy(disabled = true)
  }
}
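With this State in place, recovery is just a fold over the persisted events: replaying the journal rebuilds the state deterministically (the journal contents below are invented for the example):

```scala
sealed trait Event
case class PostCreated(text: String) extends Event
case object QuotaReached extends Event

case class State(posts: Vector[String], disabled: Boolean) {
  def updated(e: Event): State = e match {
    case PostCreated(text) => copy(posts = posts :+ text)
    case QuotaReached      => copy(disabled = true)
  }
}

// A replayed journal: each event is applied in order to the initial state
val journal: List[Event] =
  List(PostCreated("hello"), PostCreated("world"), QuotaReached)

val recovered = journal.foldLeft(State(Vector.empty, disabled = false))(_ updated _)

println(recovered.posts)    // Vector(hello, world)
println(recovered.disabled) // true
```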

Distributed actors

Network, inter-process communication

  • Data sharing by value (no reference)
  • Lower bandwidth
  • Higher latency
  • Partial failure (network not reliable)
  • Data corruption

Actor communication is async and one-way. Actors are location transparent, hidden behind an ActorRef

Actor paths

Actors live under the akka://ActorSystemName protocol; the akka:// scheme denotes a local context

For remote actors, a path like akka.tcp://ActorSystemName@host:6565/user/someActor addresses an actor over a TCP connection. Each actor has at least one URL

Actor paths cannot be used to watch an actor's lifecycle: we need the ActorRef (which identifies one specific incarnation of the actor) for that

Actor paths can be absolute or relative. Broadcasting to actors is supported using wildcards like *

Akka cluster

Information in the cluster is spread using a gossip protocol. A node is part of a cluster once all members know about the new node

This will start a single-node cluster:

class ClusterMain extends Actor {
  val cluster = Cluster(context.system)
  cluster.subscribe(self, classOf[ClusterEvent.MemberUp])
  cluster.join(cluster.selfAddress)

  def receive = {
    case ClusterEvent.MemberUp(member) =>
      if (member.address != cluster.selfAddress) { /* someone joined */ }
  }
}

Worker nodes:

class ClusterWorker extends Actor {
  val cluster = Cluster(context.system)
  cluster.subscribe(self, classOf[ClusterEvent.MemberRemoved])
  val main = cluster.selfAddress.copy(port = Some(2552)) // assume the main node listens on 2552, Akka's default remoting port
  cluster.join(main)

  def receive = {
    case ClusterEvent.MemberRemoved(m, _) => if (m.address == main) context.stop(self)
  }
}

https://doc.akka.io/docs/akka/current/typed/cluster.html