Flow With Extended Context
A FlowWithExtendedContext[In, Out, Ctx, M] is much like Akka's FlowWithContext, except that the context type Ctx is wrapped in an ExtendedContext. The operations allowed on a FlowWithExtendedContext are limited in order to avoid:
- filtering operations (like filter, filterNot, collect)
- one-to-n operations (like mapConcat)
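What these two families of operations do to element/context pairs can be illustrated without the library. The following is a minimal sketch using plain Scala collections, not the FlowWithExtendedContext API; the names Element and ContextPropagationSketch and the sample values are hypothetical. It models each element as a value paired with an offset and compares a one-to-one map with a filter and a one-to-n expansion.

```scala
object ContextPropagationSketch {
  // Hypothetical model: a stream element paired with its context (an offset).
  type Offset = Long
  final case class Element[A](value: A, offset: Offset)

  val input: List[Element[Int]] =
    List(Element(1, 100L), Element(2, 101L), Element(3, 102L))

  // One-to-one: every input offset appears exactly once in the output.
  val mapped: List[Element[Int]] =
    input.map(e => e.copy(value = e.value * 10))

  // Filtering: the offset of the dropped element (101) never reaches downstream.
  val filtered: List[Element[Int]] =
    input.filter(_.value % 2 != 0)

  // One-to-n: each offset is emitted once per produced element.
  val expanded: List[Element[Int]] =
    input.flatMap(e => List.fill(e.value)(e))

  def main(args: Array[String]): Unit = {
    println(mapped.map(_.offset))   // List(100, 101, 102)
    println(filtered.map(_.offset)) // List(100, 102)
    println(expanded.map(_.offset)) // List(100, 101, 101, 102, 102, 102)
  }
}
```

In this model only the one-to-one transformation preserves a single output per input offset, which is the shape the restricted operation set keeps.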
There comes a point where such restrictions may make it impossible to implement a particular feature. For this reason, a FlowWithExtendedContext can be converted to a regular flow (using toGraph) and back (using FlowWithExtendedContext.fromGraphUnsafe).
The fromGraphUnsafe method has an intentionally scary name because, when using it, the programmer is responsible for making sure that the FlowWithExtendedContext guarantees have been respected. Failure to do so may result in stream deadlock and/or errors at run-time.
Creating
A FlowWithExtendedContext can be created similarly to an Akka Flow or FlowWithContext.
For instance, given the following definitions modeling a people-counting system:
/** Fake offset context type (i.e. equivalent of a Kafka offset)
  */
type Offset = Long

/** The physical deployment where the sensors are deployed
  */
case class DeploymentId(id: String)

/** The id of the entrance where the sensor is located
  */
case class EntranceId(id: Int)

/** A sample produced by the People Counter sensor network detailing the number of entrances that
  * occurred since the last sample was sent
  */
case class CounterSample(
    deploymentId: DeploymentId,
    entranceId: EntranceId,
    timestamp: Long,
    entrances: Int
)
we can create a simple flow that computes the total number of people who entered a store with the following code:
val entrancesSumFlow =
  FlowWithExtendedContext[CounterSample, Offset].statefulMap { () =>
    var total: Int = 0
    counter =>
      total = total + counter.entrances
      counter.timestamp -> total
  }
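To make the emitted values concrete, the same running-total logic can be reproduced on an in-memory list. This is only a sketch with plain Scala collections, not the library's statefulMap; the object name RunningTotalSketch, the runningTotals helper, and the sample data are hypothetical. It emits one (timestamp, total) pair per input sample, mirroring the one-in, one-out shape of the flow above.

```scala
object RunningTotalSketch {
  final case class DeploymentId(id: String)
  final case class EntranceId(id: Int)
  final case class CounterSample(
      deploymentId: DeploymentId,
      entranceId: EntranceId,
      timestamp: Long,
      entrances: Int
  )

  // Hypothetical samples, all from a single deployment.
  val samples: List[CounterSample] = List(
    CounterSample(DeploymentId("store-1"), EntranceId(1), 1000L, 3),
    CounterSample(DeploymentId("store-1"), EntranceId(2), 2000L, 5),
    CounterSample(DeploymentId("store-1"), EntranceId(1), 3000L, 2)
  )

  // Emits one (timestamp, runningTotal) pair per input sample.
  def runningTotals(in: List[CounterSample]): List[(Long, Int)] =
    in.scanLeft((0L, 0)) { case ((_, total), sample) =>
      (sample.timestamp, total + sample.entrances)
    }.tail // drop the seed so the output length equals the input length

  def main(args: Array[String]): Unit =
    println(runningTotals(samples)) // List((1000,3), (2000,8), (3000,10))
}
```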
Connecting
Once a FlowWithExtendedContext instance has been created, it can be connected to other flows using Akka's standard via and viaMat operators:
val done: Future[Done] = sampleSource
  .via(entrancesSumFlow)
  .via(printingFlow("total"))
  .runWith(offsetCommittingSink)
The result is a stream that sums all the samples it receives as input (working under the assumption that they all come from a single deployment) and commits the message offsets as soon as the processing is completed.
You can find the full example here: FlowWithExtendedContextBasicExample.scala.