Durable State Stateful Flow

Durable state stateful flows are defined directly in terms of their state type.

They are well suited to modelling CRUD entities whose state changes relatively infrequently.

Logic

A StatefulFlowLogic.DurableState[State, In, Out, Command] is a kind of logic where inputs and commands act directly upon the state, generating an updated version of the state object.

The output type of the flow is not fixed: a flow using a durable state logic can freely decide what to produce as output depending on the current state and input.

To adapt the counter example to use a durable state stateful flow, we start by defining the state and command models:

import akka.actor.typed.ActorRef
import akka.pattern.StatusReply

/** The state model */
case class CounterState(total: Int)

/** The command used to query the flow for its current counter value */
case class GetCounter(replyTo: ActorRef[StatusReply[Int]])

Now that we have our models, we can instantiate a logic operating on them:

import StatefulFlowLogic._
val logic = DurableState[CounterState, CounterSample, Int, GetCounter](
  // Initial state: used when no state can be recovered from the backend
  () => CounterState(0),
  // Input handler: adds the sample's entrances to the total and emits the new total
  (state, sample) => {
    val newCounter = state.total + sample.entrances
    println(
      s"deployment:${sample.deploymentId} entrance:${sample.entranceId} - " +
        s"timestamp:${sample.timestamp} counter:${newCounter}"
    )
    DurableState
      .ProcessingResult(CounterState(newCounter))
      .withOutput(newCounter)
  },
  // Command handler: replies with the current total and leaves the state unchanged
  (state, command) => {
    command.replyTo ! StatusReply.success(state.total)
    DurableState.ProcessingResult(state)
  }
)

We do that by invoking StatefulFlowLogic.DurableState with the following three parameters:

  1. initial state: the state value to use for the first instantiation of the flow (i.e. when no state can be recovered from the backend)
  2. input handler: the function used to handle stream inputs by generating the new state (possibly inspecting the current state)
  3. command handler: the function used to handle commands by generating the new state (possibly inspecting the current state)
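To make the roles of the initial state and the input handler more concrete, the following plain-Scala sketch threads a state through a list of inputs by hand, without using Spekka. The field types of `CounterSample` are assumptions based on the fields used in the example, and `handleInput` and `run` are hypothetical helpers introduced only for illustration; a real flow would also persist each new state through the backend.

```scala
object HandlerFoldSketch {
  // Assumed shape of the input model (field types are guesses, not taken from Spekka).
  final case class CounterSample(deploymentId: String, entranceId: Int, timestamp: Long, entrances: Int)
  final case class CounterState(total: Int)

  // Hypothetical input handler: returns the new state together with the output to emit.
  def handleInput(state: CounterState, sample: CounterSample): (CounterState, Int) = {
    val newCounter = state.total + sample.entrances
    (CounterState(newCounter), newCounter)
  }

  // Feeds the inputs through the handler one at a time, starting from the initial state.
  def run(initial: CounterState, inputs: List[CounterSample]): (CounterState, List[Int]) =
    inputs.foldLeft((initial, List.empty[Int])) { case ((state, outs), sample) =>
      val (next, out) = handleInput(state, sample)
      (next, outs :+ out)
    }

  val samples: List[CounterSample] = List(
    CounterSample("deployment-a", 1, 1000L, 2),
    CounterSample("deployment-a", 2, 2000L, 3)
  )

  // Starting from a total of 0, the two samples add 2 and then 3 entrances.
  val (finalState, outputs) = run(CounterState(0), samples)
}
```

Each input sees the state left behind by the previous one, so the emitted totals grow with every sample while the handler itself stays a pure function of the current state and the input.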

The input handler function must return a result of type StatefulFlowLogic.DurableState.ProcessingResult[State, Out]. This type represents:

  1. The updated state generated by the input
  2. The output the flow needs to produce
  3. The side effects to perform before the persisted state is updated
  4. The side effects to perform after the persisted state has been successfully updated

The command handler is similar to the input handler, except that its result has the output type set to Nothing. This reflects the fact that commands do not correspond to any particular input: to maintain the one-to-one property of Spekka flows, we cannot produce any output while handling them.
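The effect of fixing the output type to Nothing can be seen with a simplified, hypothetical result type (it is not the Spekka one, which carries more information). Since no value of type Nothing exists, a result typed this way can never hold an output.

```scala
object CommandResultSketch {
  // Hypothetical, simplified result type used only for this illustration.
  final case class Result[State, +Out](state: State, outputs: Vector[Out] = Vector.empty) {
    // Attaching an output may widen the output type of the result.
    def withOutput[O >: Out](out: O): Result[State, O] = Result(state, outputs :+ out)
  }

  final case class CounterState(total: Int)

  // An input handler is free to attach an output to its result.
  def onInput(state: CounterState, entrances: Int): Result[CounterState, Int] = {
    val total = state.total + entrances
    Result(CounterState(total)).withOutput(total)
  }

  // A command handler is typed with Nothing as its output type.
  // Appending `.withOutput(state.total)` here would produce a
  // Result[CounterState, Int], which does not match the declared return type.
  def onCommand(state: CounterState): Result[CounterState, Nothing] =
    Result(state)
}
```

With this shape the compiler, rather than a runtime check, rules out outputs from command handling.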

Thanks to the native side effect support, it becomes easy to write pipelines of microservices with at-least-once semantics. For example, using Kafka as a backbone, you can read from topicUpstream and emit to topicDownstream inside a before side effect. If the process fails while emitting, the stream is guaranteed to resume processing where it left off upon restart, hence repeating the side effects.

Similarly, you can use after side effects to model at-most-once scenarios: if the system fails after the state has been modified, there is no guarantee that the logic will produce the same side effects upon restart.
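The difference between the two guarantees can be illustrated with a small simulation that depends on neither Spekka nor Kafka. All names below (`World`, `processBefore`, `processAfter`, `runWithCrash`) are hypothetical. A crash is modelled as an exception thrown between the two steps, and a restart as replaying the input whenever its state update was never persisted.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

object DeliverySketch {
  // Hypothetical stand-in for the persisted state and the downstream topic.
  final class World {
    var persisted: Int = 0                                   // persisted counter state
    var processedInputs: Int = 0                             // inputs whose update was persisted
    val downstream: ListBuffer[Int] = ListBuffer.empty[Int]  // messages emitted by side effects
  }

  final class Crash extends RuntimeException("simulated crash")

  // "Before" side effect: emit first, then persist.
  def processBefore(w: World, input: Int, crash: Boolean): Unit = {
    val next = w.persisted + input
    w.downstream += next
    if (crash) throw new Crash
    w.persisted = next
    w.processedInputs += 1
  }

  // "After" side effect: persist first, then emit.
  def processAfter(w: World, input: Int, crash: Boolean): Unit = {
    val next = w.persisted + input
    w.persisted = next
    w.processedInputs += 1
    if (crash) throw new Crash
    w.downstream += next
  }

  // Processes a single input that crashes on the first attempt, then restarts.
  def runWithCrash(step: (World, Int, Boolean) => Unit): World = {
    val w = new World
    val firstAttempt = Try(step(w, 5, true))
    // On restart the input is replayed only if its state update was not persisted.
    if (firstAttempt.isFailure && w.processedInputs == 0) step(w, 5, false)
    w
  }

  val atLeastOnce: World = runWithCrash(processBefore) // message emitted twice
  val atMostOnce: World = runWithCrash(processAfter)   // message never emitted
}
```

In both runs the persisted state ends up the same. What differs is the downstream side: the before variant emits a duplicate, while the after variant loses the message.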

Note: Both before and after side effects are evaluated as part of the stream. This means that the flow will not produce any output until both groups of side effects have completed successfully. Similarly, if any side effect fails (i.e. returns a failed Future), the stream will fail as well.
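The ordering and failure behaviour described in this note can be sketched with plain Scala Futures. This is a model under stated assumptions, not Spekka's implementation: `Result`, `runAll`, `interpret` and `effect` are hypothetical names, side effects within a group are assumed to run sequentially, and a same-thread execution context is used only to keep the recorded trace deterministic.

```scala
import java.util.concurrent.Executor
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

object SideEffectOrderSketch {
  // Runs every task on the calling thread so that the trace is deterministic.
  implicit val sameThread: ExecutionContext =
    ExecutionContext.fromExecutor(new Executor {
      def execute(task: Runnable): Unit = task.run()
    })

  // Hypothetical stand-in for a processing result; not the Spekka type.
  final case class Result[State, Out](
      state: State,
      output: Out,
      before: List[() => Future[Unit]] = Nil,
      after: List[() => Future[Unit]] = Nil
  )

  // Runs the side effects one after another, stopping at the first failure.
  def runAll(effects: List[() => Future[Unit]]): Future[Unit] =
    effects.foldLeft(Future.successful(())) { (acc, effect) => acc.flatMap(_ => effect()) }

  // before side effects -> persist state -> after side effects -> output
  def interpret[S, O](result: Result[S, O], trace: ListBuffer[String]): Future[O] =
    for {
      _ <- runAll(result.before)
      _ <- Future { trace += s"persist ${result.state}"; () }
      _ <- runAll(result.after)
    } yield result.output

  // A side effect that only records its name in the trace.
  def effect(name: String, trace: ListBuffer[String]): () => Future[Unit] =
    () => Future { trace += name; () }

  // Successful case: the output is available only after all steps have run.
  val okTrace: ListBuffer[String] = ListBuffer.empty[String]
  val okResult = Result(5, "out", List(effect("before", okTrace)), List(effect("after", okTrace)))
  val ok: String = Await.result(interpret(okResult, okTrace), 1.second)

  // Failing case: a failed before side effect prevents persistence and output.
  val failedTrace: ListBuffer[String] = ListBuffer.empty[String]
  val failing: () => Future[Unit] = () => Future.failed(new RuntimeException("boom"))
  val failed: Try[String] =
    Try(Await.result(interpret(Result(5, "out", before = List(failing)), failedTrace), 1.second))
}
```

In the successful run the trace reads before, persist, after, and only then is the output returned. In the failing run nothing is persisted and the resulting Future is failed, which in a stream would surface as a stream failure.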

Backend

A StatefulFlowBackend.DurableState[State, _] is a kind of backend compatible with durable state logics that have the same state type.

Spekka Stateful ships with an in-memory implementation useful for testing and quick prototyping: InMemoryStatefulFlowBackend.DurableState[State].

We can create a backend for our example with the following code:

val backend = InMemoryStatefulFlowBackend.DurableState[CounterState]()

Props

Once we have defined both the logic and the backend, we can obtain a StatefulFlowProps by invoking the propsForBackend method on the logic instance:

val flowProps = logic.propsForBackend(backend)

Full example

You can find the full example here: StatefulFlowDurableStateExample.scala.