Programming Reactive Systems (course notes)
Reactive manifesto
- Event-driven => react to events
- Scalable => react to load
- Resilient => react to failures
- Responsive => react to users
Being event-driven enables being scalable and resilient
Event-driven
Traditionally, systems are composed of multiple threads, which communicate and have shared state (sometimes synchronized)
Now, systems are composed from loosely coupled event handlers
Events can be handled asynchronously, without blocking => resource efficient
Scalable
- Scale up: parallelism in multi-core systems
- Scale out: multiple server nodes
Goals: minimize shared mutable state, location transparency, resilience
Resilient
Failures can be:
- Software failures
- Hardware failures
- Connection failures
Goals: loose coupling, encapsulation of state, supervisor hierarchies
Responsive
Real-time interaction with users. Being responsive is the result of applying the other three properties
Goals: backpressure, algorithms, system design
Traditional event handling
Observer and callbacks
class Counter extends ActionListener { // Scala uses extends, not implements
  private var count = 0
  button.addActionListener(this) // register with the event source; on each click, the actionPerformed callback is invoked
  def actionPerformed(e: ActionEvent): Unit = {
    count += 1
  }
}
Problems:
- actionPerformed has a side effect => shared mutable state
- Cannot be composed
- Callback hell
Solution: first class events, events as messages, event composing
Async execution (CPS: continuation passing style)
Compute on another computing unit, without explicitly waiting for its termination
A sync type signature can be turned async by:
- Returning Unit
- Taking a continuation parameter (callback) defining what to do after the computation
def syncProgram(a: A): B
def asyncProgram(a: A, k: B => Unit): Unit // k is a callback
To model failure, we can model the callback as k: Try[B] => Unit
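As a concrete sketch of the CPS transformation (using a hypothetical string parser as the computation; here the continuation happens to run on the calling thread):

```scala
import scala.util.{Try, Success, Failure}

// Synchronous version: returns a value, may throw
def parseSync(s: String): Int = s.toInt

// Async (CPS) version: takes a continuation k instead of returning;
// failure is reified into Try rather than thrown
def parseAsync(s: String, k: Try[Int] => Unit): Unit =
  k(Try(s.toInt))

var result = 0
parseAsync("42", {
  case Success(n) => result = n
  case Failure(e) => result = -1
})
```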
Futures
def asyncProgram(a: A, k: B => Unit): Unit
// let's turn this into something cooler...
// first, let's curry k
def asyncProgram(a: A): (B => Unit) => Unit
// now, let's propose Future:
type Future[+T] = (T => Unit) => Unit
def futureProgram(a: A): Future[B]
// now, let's add failure handling
type Future[+T] = (Try[T] => Unit) => Unit
// now reify Future into a trait
trait Future[+T] extends ((Try[T] => Unit) => Unit) {
def onComplete(k: Try[T] => Unit): Unit // onComplete is renamed from apply, because applying a Future is passing the callback
}
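To make the reification concrete, here is a toy implementation of this idea (named MyFuture to avoid clashing with the standard library), with onComplete and a map built on top of it. This is a sketch of the derivation above, not Akka's or the standard library's actual implementation:

```scala
import scala.util.{Try, Success, Failure}

// A Future is a curried callback-taker: applying it means passing the callback
trait MyFuture[+T] extends ((Try[T] => Unit) => Unit) {
  def onComplete(k: Try[T] => Unit): Unit = apply(k)
  // map composes a transformation onto the eventual result
  def map[U](f: T => U): MyFuture[U] = {
    val outer = this
    new MyFuture[U] {
      def apply(k: Try[U] => Unit): Unit =
        outer.onComplete(t => k(t.map(f)))
    }
  }
}

// An already-completed future, for demonstration
def completed[T](t: Try[T]): MyFuture[T] = new MyFuture[T] {
  def apply(k: Try[T] => Unit): Unit = k(t)
}

var out = 0
completed(Success(20)).map(_ * 2).onComplete {
  case Success(n) => out = n
  case Failure(_) => out = -1
}
```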
Notice: the only difference between a bare callback and a Future is that we can operate on the Future's eventual value using onComplete, map, etc
A Future is a curried callback function: instead of returning a value, it takes the continuation that will receive that value
Operations on Future
Future suffers from the same composability issues as callbacks, but it can be transformed using map, flatMap, zip, recover, etc
- map transforms a Future[A] into a Future[B]
- flatMap transforms Future[A] into Future[B], applying f after Future[A] has completed, sequentially
- zip joins two Futures by zipping their results into a Future[(A, B)]
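These operations can be exercised with the standard scala.concurrent.Future (Await is used here only to observe the results; it blocks and is not something to do in production code):

```scala
import scala.concurrent.{Future, Await}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val fa: Future[Int]        = Future(21)
val fb: Future[Int]        = fa.map(_ * 2)                  // transform the result
val fc: Future[(Int, Int)] = fa.zip(fb)                     // join two futures
val fd: Future[Int]        = fa.flatMap(a => Future(a + 1)) // sequence a dependent step

val pair = Await.result(fc, 2.seconds)
val sum  = Await.result(fd, 2.seconds)
```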
Order of operations
zip does not impose sequentiality: when zipping two Futures, both are fired concurrently and joined when both finish
But if we flatMap two futures, we have to wait sequentially unless we fire the Futures first:
val eventuallyCoffee1 = makeCoffee()
val eventuallyCoffee2 = makeCoffee()
// both futures fired
eventuallyCoffee1.flatMap(c1 => eventuallyCoffee2.map(c2 => (c1, c2)))
// ^^ concurrent because Future creation implies its execution
The creation of a Future implies its execution (much like an effectful Promise)
Syntax sugar: for comprehensions
for {
work1 <- work()
_ <- takeABreak()
work2 <- work()
} yield work1 + work2
This for comprehension desugars to flatMapping the first result, then the break, then mapping over the second, so the steps run sequentially
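The desugaring can be checked directly; work and takeABreak are hypothetical stand-ins for the computations above:

```scala
import scala.concurrent.{Future, Await}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

def work(): Future[Int]       = Future(1)
def takeABreak(): Future[Unit] = Future(())

val sugared = for {
  work1 <- work()
  _     <- takeABreak()
  work2 <- work()
} yield work1 + work2

// The same program written out as nested flatMaps ending in a map,
// which makes the sequential dependency explicit
val desugared =
  work().flatMap(work1 =>
    takeABreak().flatMap(_ =>
      work().map(work2 => work1 + work2)))

val a = Await.result(sugared, 2.seconds)
val b = Await.result(desugared, 2.seconds)
```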
Handling failures in Future
- recover takes a partial function over Throwable
- recoverWith is an async recover (its partial function returns a Future)
Execution context
When applying a Future or using callbacks, we must use some thread to perform the task
- If we only use a thread, there is no parallelism
- We can use thread pools
Future takes an implicit ExecutionContext for each task
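A minimal sketch of supplying a custom ExecutionContext backed by a fixed thread pool (the pool size 4 is arbitrary):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Future, Await, ExecutionContext}
import scala.concurrent.duration._

// Wrap a JDK thread pool as an ExecutionContext; Future picks it up implicitly
val pool = Executors.newFixedThreadPool(4)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

val f = Future(1 + 1) // runs on one of the 4 pool threads
val r = Await.result(f, 2.seconds)

pool.shutdown() // the pool's threads are non-daemon, so shut it down explicitly
```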
Introduction to the Actor model
Motivation:
- CPUs with more cores, same frequency
- Virtual cores sharing physical execution core
- Multi-tasking: running multiple programs in parallel
- Multi-threading: running parts of 1 program in parallel
Traditional solution: synchronize access to shared state (mutex). Possibility of deadlock, blocking
Actors represent non-blocking objects and their interactions. Formally, an actor is an object with identity, with behavior and async message passing
type Receive = PartialFunction[Any, Unit]
trait Actor {
  implicit val self: ActorRef
  def sender: ActorRef
  def receive: Receive // the actor's behavior, using the Receive type above
}
abstract class ActorRef {
def !(msg:Any)(implicit sender: ActorRef): Unit
}
Actors in Akka are referenced through ActorRefs. To execute, an actor needs an ActorContext. The ActorContext can create and stop actors, and change an actor's behavior with become/unbecome. Actors are created by other actors
- State change is explicit
- State is scoped to current behavior
Important concepts on Actors
- Actors can only send messages to known actors (including self)
- Actors cannot access other actors' behavior - they can only send messages
- No shared mutable state, no global synchronization
- Messages are fire-and-forget, meaning communication is one-way. To communicate completion status (e.g. ACK), send the sender another message
- Messages are received sequentially
- Processing a message is the atomic unit of execution, which is very convenient for transactions and threading
- No blocking, but an Actor's mailbox is like a queue
- Messages are like letters: delivery is unreliable, but ordering between a given sender and receiver is guaranteed
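A minimal sketch of these rules, assuming the Akka classic API is on the classpath (Account, Deposit and Ack are hypothetical names); note the one-way communication with an explicit ACK reply:

```scala
import akka.actor.Actor

case class Deposit(amount: Int)
case object Ack

class Account extends Actor {
  var balance = 0 // safe: messages are processed one at a time, no lock needed
  def receive = {
    case Deposit(a) =>
      balance += a
      sender() ! Ack // fire-and-forget is one-way, so completion is a reply message
  }
}
```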
Delivery of messages
Delivery of a message requires eventual availability of channel and recipient. Delivery guarantees:
- At-most-once (send once). Keeps no state
- At-least-once (send until ACK). Resends
- Exactly-once. Receiver keeps track of which messages have been processed
Regardless of effort, how can we make the communication more reliable?
- Log activities to persistent storage
- Each activity has unique ID
- Store IDs of completed activities within actors
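The receiver-side bookkeeping can be sketched in plain Scala (Activity and Receiver are hypothetical names; a real system would persist the ID set): with at-least-once delivery, duplicates arrive, and remembering the IDs of completed activities makes processing effectively exactly-once.

```scala
case class Activity(id: Long, amount: Int)

class Receiver {
  private var processed = Set.empty[Long] // IDs of completed activities (persisted in practice)
  var total = 0
  def handle(a: Activity): Unit =
    if (!processed(a.id)) { // ignore redelivered duplicates
      processed += a.id
      total += a.amount
    }
}

val r = new Receiver
r.handle(Activity(1, 10))
r.handle(Activity(1, 10)) // duplicate delivery, applied only once
r.handle(Activity(2, 5))
```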
Writing Actor systems
Example: recursive downloader. Given a URL, download the content, extract the links and follow them, bounded by a maximum depth
Plan of action: write a Receptionist (maps request to a Controller), a Controller (spawns Getters for all links), and Getters (actors who process the body using an HTTP client)
Failure in Actor systems
Where do failures go?
- Reify failure as a message
- Send to a known error address
Solution: individual failure is handled by a leader (supervisor actor)
- Failed actor is terminated or restarted
- Supervised actors form a tree structure
- Supervisors create their subordinates, and take decisions
class Manager extends Actor {
override val supervisorStrategy = OneForOneStrategy() {
case _: DBException => Restart
case _: ActorKilledException => Stop
case _: ServiceDownException => Escalate
}
}
Supervising strategies:
- One for one (parent supervises each actor individually)
- All for one (parent supervises a group of child actors: the supervisor's decision applies to all children together)
- Stopping: for any failure of a child, parent will stop it
Lifecycle
- Hooks: preStart, preRestart, postRestart, postStop
- DeathWatch: watchers register as listeners of other actors. When those actors die (stop), registered watchers receive a Terminated message. Unhandled Terminated messages are treated as a death pact (a DeathPactException is thrown)
- Actors know their parent using context.parent and their children using context.children
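A DeathWatch sketch, assuming the Akka classic API (Watcher is a hypothetical name):

```scala
import akka.actor.{Actor, ActorRef, Props, Terminated}

class Watcher(childProps: Props) extends Actor {
  val child: ActorRef = context.actorOf(childProps, "child")
  context.watch(child) // register to receive Terminated when the child stops
  def receive = {
    case Terminated(`child`) => context.stop(self) // react to the child's death
  }
}
```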
The error kernel
- Restarts are recursive
- Restarts are more frequent near leaves
- Avoid restarting stateful important actors. Important state => higher up the hierarchy, riskier operation => lower down the hierarchy
Event stream
Actors can direct messages ONLY to known addresses.
EventStream allows broadcasting (publication of messages to unknown audience)
Any actor can subscribe to EventStream, which can emit messages of certain topics
Actors know the stream using context.system.eventStream
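A subscription sketch, assuming the Akka classic API (TemperatureChanged and Display are hypothetical names):

```scala
import akka.actor.Actor

case class TemperatureChanged(celsius: Double) // a topic, selected by message class

class Display extends Actor {
  override def preStart(): Unit =
    context.system.eventStream.subscribe(self, classOf[TemperatureChanged])
  def receive = {
    case TemperatureChanged(c) => println(s"now $c degrees")
  }
}

// Any actor can publish without knowing the subscribers:
// context.system.eventStream.publish(TemperatureChanged(21.5))
```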
Persisting actor state
Persisting current state
Snapshot, latest state in constant time. No previous history
Persisting changes
History can be replayed, audited, restored. Retroactive handling of errors. Better IO bandwidth (append-only writes); immutable changes can be replicated
Command sourcing: persist commands before processing, persist ACK upon processing. During recovery, all commands are replayed with effects scoped locally
Event sourcing: generate change requests (events). During a replay, the actor receives the events
sealed trait Event
case class PostCreated(text: String) extends Event
case object QuotaReached extends Event
case class State(posts: Vector[String], disabled: Boolean) {
def updated(e: Event): State = e match {
case PostCreated(text) => copy(posts = posts :+ text)
case QuotaReached => copy(disabled = true)
}
}
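Recovery can then be sketched as a fold of the persisted events through updated (the definitions above are repeated so the snippet is self-contained; the sample events are made up):

```scala
sealed trait Event
case class PostCreated(text: String) extends Event
case object QuotaReached extends Event

case class State(posts: Vector[String], disabled: Boolean) {
  def updated(e: Event): State = e match {
    case PostCreated(text) => copy(posts = posts :+ text)
    case QuotaReached      => copy(disabled = true)
  }
}

// During recovery, the actor replays the event log to rebuild its state
val events    = List(PostCreated("hello"), PostCreated("world"), QuotaReached)
val recovered = events.foldLeft(State(Vector.empty, disabled = false))(_ updated _)
```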
Distributed actors
Network, inter-process communication
- Data sharing by value (no reference)
- Lower bandwidth
- Higher latency
- Partial failure (network not reliable)
- Data corruption
Actor communication is async and one-way. Actors are location transparent, hidden behind an ActorRef
Actor paths
Actors live under the akka://ActorSystem protocol, akka:// meaning a local context
For remote actors, akka.tcp://[email protected]:6565/user/someActor is a TCP connection to an actor. Each actor has at least one URL
Actor paths cannot be used to watch an actor's lifecycle: we need the ActorRef (which carries a unique ID) for that
Actor paths can be absolute or relative. Broadcasting to actors is supported using wildcards like *
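A sketch of relative paths and wildcard selections, assuming the Akka classic API (the paths are hypothetical):

```scala
import akka.actor.Actor

class Broadcaster extends Actor {
  def receive = {
    case msg =>
      context.actorSelection("../sibling") ! msg           // relative path
      context.actorSelection("/user/controllers/*") ! msg  // wildcard fan-out to all matches
  }
}
```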
Akka cluster
Information in the cluster is spread using a gossip protocol. A node is part of a cluster once all members know about the new node
This will start a single-node cluster:
class ClusterMain extends Actor {
val cluster = Cluster(context.system)
cluster.subscribe(self, classOf[ClusterEvent.MemberUp])
cluster.join(cluster.selfAddress)
def receive = {
case ClusterEvent.MemberUp(member) =>
if (member.address != cluster.selfAddress) { /* someone joined */ }
}
}
Worker nodes:
class ClusterWorker extends Actor {
val cluster = Cluster(context.system)
cluster.subscribe(self, classOf[ClusterEvent.MemberRemoved])
val main = cluster.selfAddress.copy(port = Some(mainPort)) // mainPort: the main node's known port
cluster.join(main)
def receive = {
case ClusterEvent.MemberRemoved(m, _) => if (m.address == main) context.stop(self)
}
}