Stateful Flow Usage

Once we have generated StatefulFlowProp by combining a logic and a backend we are ready to register the flow for usage.

In order to do so we first need to create a StatefulFlowRegistry which will handle all the low level tasks needed for coordinating the stateful flows.

A registry can be created invoking StatefulFlowRegistry.apply specifying a timeout to use for communication with the registry itself and possibly a name for the registry (only useful if more than one registry instance is needed, which is not recommended).

sourceval registry = StatefulFlowRegistry(30.seconds)

We can now register the flows (see the section on either Event Based or Durable State to see how the flow was defined):

sourceval byDeploymentFlowBuilder = registry.registerStatefulFlowSync("byDeployment", flowProps)
val byEntranceFlowBuilder = registry.registerStatefulFlowSync("byEntrance", flowProps)

At this point the StatefulFlowBuilder can be used to instantiate flows by specifying a particular entity:

sourceimport PartitionTree._
val totalByEntranceFlow = Partition
  .treeBuilder[CounterSample, Offset]
  .dynamicAuto(_.deploymentId)
  .dynamicAuto(_.entranceId)
  .build { case (entrance: EntranceId) :@: (deployment: DeploymentId) :@: KNil =>
    byEntranceFlowBuilder
      .flowWithExtendedContext(s"${deployment.id}:${entrance.id}")
      .via(printingFlow(s"deployment:${deployment.id} entrance:${entrance.id}"))
  }

val totalByDeploymentFlow = Partition
  .treeBuilder[CounterSample, Offset]
  .dynamicAuto(_.deploymentId)
  .build { case deployment :@: KNil =>
    byDeploymentFlowBuilder
      .flowWithExtendedContext(s"${deployment.id}")
      .via(printingFlow(s"deployment:${deployment.id} total"))
  }

Once a stateful stream is running we can send command to it by using its StatefulFlowControl object (obtained either by accessing the flow materialized value or via the StatefulFlowBuilder.control method):

sourcefor {
  byDeploymentC <- byDeploymentFlowBuilder.control("a")
  byEntranceC <- byEntranceFlowBuilder.control("a:1")
  aTotalF = byDeploymentC.get.commandWithResult(GetCounterCommand)
  aE1TotalF = byEntranceC.get.commandWithResult(GetCounterCommand)
  aTotal <- aTotalF
  aE1Total <- aE1TotalF
  _ = println(s"*** deployment a total ${aTotal}; deployment a entrance 1 total ${aE1Total}")
} yield ()