Stateful Flow Usage
Once we have generated a StatefulFlowProp by combining a logic and a backend, we are ready to register the flow for usage.
To do so, we first need to create a StatefulFlowRegistry, which handles all the low-level tasks needed to coordinate the stateful flows.
A registry can be created by invoking StatefulFlowRegistry.apply, specifying a timeout for communication with the registry itself and, optionally, a name for the registry (only useful if more than one registry instance is needed, which is not recommended).
val registry = StatefulFlowRegistry(30.seconds)
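The `30.seconds` syntax comes from the Scala standard library's duration DSL, so the snippet above needs `scala.concurrent.duration._` in scope. The following is a minimal sketch of that detail only; it assumes the registry timeout is a `FiniteDuration`, which the snippet suggests but this page does not state. The names `TimeoutSketch` and `registryTimeout` are hypothetical.

```scala
import scala.concurrent.duration._

object TimeoutSketch {
  // The duration DSL adds `.seconds`, `.millis`, etc. to numeric literals.
  // Assumption: the registry accepts a FiniteDuration as its timeout.
  val registryTimeout: FiniteDuration = 30.seconds

  def main(args: Array[String]): Unit = {
    // Prints the timeout expressed in milliseconds.
    println(registryTimeout.toMillis)
  }
}
```

Without the import, `30.seconds` does not compile, because `Int` has no `seconds` member of its own.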
We can now register the flows (see the section on either Event Based or Durable State for how the flow was defined):
val byDeploymentFlowBuilder = registry.registerStatefulFlowSync("byDeployment", flowProps)
val byEntranceFlowBuilder = registry.registerStatefulFlowSync("byEntrance", flowProps)
At this point the StatefulFlowBuilder
can be used to instantiate flows by specifying a particular entity:
import PartitionTree._
val totalByEntranceFlow = Partition
  .treeBuilder[CounterSample, Offset]
  .dynamicAuto(_.deploymentId)
  .dynamicAuto(_.entranceId)
  .build { case (entrance: EntranceId) :@: (deployment: DeploymentId) :@: KNil =>
    byEntranceFlowBuilder
      .flowWithExtendedContext(s"${deployment.id}:${entrance.id}")
      .via(printingFlow(s"deployment:${deployment.id} entrance:${entrance.id}"))
  }
val totalByDeploymentFlow = Partition
  .treeBuilder[CounterSample, Offset]
  .dynamicAuto(_.deploymentId)
  .build { case deployment :@: KNil =>
    byDeploymentFlowBuilder
      .flowWithExtendedContext(s"${deployment.id}")
      .via(printingFlow(s"deployment:${deployment.id} total"))
  }
Once a stateful stream is running, we can send commands to it through its StatefulFlowControl object (obtained either from the flow's materialized value or via the StatefulFlowBuilder.control method):
for {
  byDeploymentC <- byDeploymentFlowBuilder.control("a")
  byEntranceC <- byEntranceFlowBuilder.control("a:1")
  aTotalF = byDeploymentC.get.commandWithResult(GetCounterCommand)
  aE1TotalF = byEntranceC.get.commandWithResult(GetCounterCommand)
  aTotal <- aTotalF
  aE1Total <- aE1TotalF
  _ = println(s"*** deployment a total ${aTotal}; deployment a entrance 1 total ${aE1Total}")
} yield ()
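The for-comprehension above binds both command results with `=` before awaiting either of them with `<-`. With standard Scala futures this starts both requests before waiting on the first one, so they can run concurrently. The sketch below shows the same pattern using only `scala.concurrent.Future`. It assumes `commandWithResult` returns a `Future`, which the comprehension implies but this page does not state; `queryCounter` is a hypothetical stand-in for it, not part of the library.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelQueriesSketch {
  // Hypothetical stand-in for a call like commandWithResult(GetCounterCommand):
  // it completes asynchronously with the given value after a short delay.
  def queryCounter(value: Long): Future[Long] = Future {
    Thread.sleep(100)
    value
  }

  def totals(): Future[(Long, Long)] =
    for {
      _ <- Future.unit              // a for-comprehension must start with a generator
      aTotalF = queryCounter(10L)   // first request starts here
      aE1TotalF = queryCounter(4L)  // second request starts without waiting for the first
      aTotal <- aTotalF             // now wait for the first result
      aE1Total <- aE1TotalF         // then for the second
    } yield (aTotal, aE1Total)

  def main(args: Array[String]): Unit = {
    val (aTotal, aE1Total) = Await.result(totals(), 5.seconds)
    println(s"deployment a total $aTotal; deployment a entrance 1 total $aE1Total")
  }
}
```

Had each request been written directly as `aTotal <- queryCounter(10L)`, the second request would only start after the first one completed.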