Flow With Extended Context

A FlowWithExtendedContext[In, Out, Ctx, M] is much like an Akka’s FlowWithContext where the context type Ctx is wrapped in an ExtendedContext. The allowed operations on a FlowWithExtendedContext are limited in order to avoid:

  • filtering operation (like filter, filterNot, collect)
  • one-to-n (like mapConcat)
Warning

It comes a point were such restrictions may make it impossible to implement a particular feature. For this reason there is the possibility to convert a FlowWithExtendedContext to a regular flow (using toGraph) and back (using FlowWithExtendedContext.fromGraphUnsafe).

This method has an intentionally scary name as, when using it, the programmer is responsible of making sure that the FlowWithExtendedContext guarantees have been respected. Failure to do so may result in stream deadlock and/or errors at run-time.

Creating

A FlowWithExtendedContext can be created similarly to Akka Flow and FlowWithContext.

For instance, considering the following definition modeling a people counting system:

source/** Fake offset context type (i.e. equivalent of a Kakfa offset)
  */
type Offset = Long

/** The physical deployment where the sensors are deployed
  */
case class DeploymentId(id: String)

/** The id of the entrance where the sensor is located
  */
case class EntranceId(id: Int)

/** A sample produced by the People Counter sensor network detailing the number of entrances that
  * occurred since the last sample was sent
  */
case class CounterSample(
    deploymentId: DeploymentId,
    entranceId: EntranceId,
    timestamp: Long,
    entrances: Int
  )

we can create a simple flow which computes the total number of people which entered a store with the following code:

sourceval entrancesSumFlow =
  FlowWithExtendedContext[CounterSample, Offset].statefulMap { () =>
    var total: Int = 0

    counter =>
      total = total + counter.entrances
      counter.timestamp -> total
  }

Connecting

Once a FlowWithExtendedContext instance has been created, it can be connected to other flows using the standard Akka’s via and viaMat operators:

sourceval done: Future[Done] = sampleSource
  .via(entrancesSumFlow)
  .via(printingFlow("total"))
  .runWith(offsetCommittingSink)

What we achieved in this example is a stream which sums all the samples it receives as input (working under the assumption that they are coming from a single deployment) and commits the message offsets as soon as the processing is completed.

You can find the full example here: FlowWithExtendedContextBasicExample.scala.