Flow.asFlowWithContext

Extracts context data from the elements of a Flow so that it can be turned into a FlowWithContext which can propagate that context per element along a stream.

Simple operators

Signature

Flow.asFlowWithContext

Description

See Context Propagation for a general overview of context propagation.

Extracts context data from the elements of a Flow so that it can be turned into a FlowWithContext which can propagate that context per element along a stream. The first function passed into asFlowWithContext must turn each incoming pair of element and context value into an element of this Flow. The second function passed into asFlowWithContext must turn each outgoing element of this Flow into an outgoing context value.
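The roles of the two functions can be modelled per element without any streaming machinery. The following is a minimal sketch in plain Java, not Pekko's implementation: the class ContextWiring and the helpers wrap and demo are hypothetical names, Map.Entry stands in for a pair, and a java.util.function.Function stands in for the wrapped flow. It only shows where each function is applied: the first one before the wrapped flow, the second one after it.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical model of the wiring; it processes one element at a time and is not a stream.
class ContextWiring {

    // Wraps `inner` so that it accepts (element, context) pairs and emits (element, context) pairs.
    static <U, CtxU, In, Out, CtxOut> Function<Map.Entry<U, CtxU>, Map.Entry<Out, CtxOut>> wrap(
            Function<In, Out> inner,
            BiFunction<U, CtxU, In> collapseContext,
            Function<Out, CtxOut> extractContext) {
        return incoming -> {
            // first function: incoming element and context become one input of the wrapped flow
            In in = collapseContext.apply(incoming.getKey(), incoming.getValue());
            Out out = inner.apply(in);
            // second function: the outgoing context is derived from the wrapped flow's output
            return new SimpleImmutableEntry<>(out, extractContext.apply(out));
        };
    }

    // Runs one (text, correlation number) pair through a wrapped upper-casing step.
    static Map.Entry<Map.Entry<String, Integer>, Integer> demo(String text, int correlation) {
        // stand-in for a regular flow with pairs as elements
        Function<Map.Entry<String, Integer>, Map.Entry<String, Integer>> inner =
            p -> new SimpleImmutableEntry<>(p.getKey().toUpperCase(), p.getValue());

        Function<Map.Entry<String, Integer>, Map.Entry<Map.Entry<String, Integer>, Integer>> wrapped =
            wrap(
                inner,
                (t, n) -> new SimpleImmutableEntry<>(t, n), // collapse element and context into a pair
                out -> out.getValue());                     // pick the second pair value as context

        return wrapped.apply(new SimpleImmutableEntry<>(text, correlation));
    }

    public static void main(String[] args) {
        System.out.println(demo("eins", 1));
    }
}
```

In this model the outgoing element is still the whole pair, with the correlation number repeated as context. A further mapping step that keeps only the first pair value would reduce the element to the text alone.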

Example

Elements in this flow carry a correlation number, but the flow structure should focus on the text message in the elements. The first converter in asFlowWithContext is applied on the way in: it combines each incoming text message and its context value into the pair that the wrapped flow expects as input. The second converter function chooses the second value of each pair emitted by the wrapped flow as the context. A subsequent map operator keeps the first value of the pair as the stream element in the FlowWithContext.

Scala
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Flow
import pekko.stream.scaladsl.FlowWithContext
// a regular flow with pairs as elements
val flow: Flow[(String, Int), (String, Int), NotUsed] = // ???

// Declare the "flow with context"
// ingoing: String and Integer
// outgoing: String and Integer
val flowWithContext: FlowWithContext[String, Int, String, Int, NotUsed] =
  // convert the flow of pairs into a "flow with context"
  flow
    .asFlowWithContext[String, Int, Int](
      // on the way in: combine the element and its context into a tuple for the wrapped flow
      collapseContext = Tuple2.apply)(
      // pick the second element of the incoming pair as context
      extractContext = _._2)
    .map(_._1) // keep the first pair element as stream element

val mapped = flowWithContext
  // regular operators apply to the element without seeing the context
  .map(_.reverse)

// running the flow with some sample data and asserting the outcome
import pekko.stream.scaladsl.Source
import pekko.stream.scaladsl.Sink
import scala.collection.immutable

val values: immutable.Seq[(String, Int)] = immutable.Seq("eins" -> 1, "zwei" -> 2, "drei" -> 3)
val source = Source(values).asSourceWithContext(_._2).map(_._1)

val result = source.via(mapped).runWith(Sink.seq)
result.futureValue should contain theSameElementsInOrderAs immutable.Seq("snie" -> 1, "iewz" -> 2, "ierd" -> 3)
Java
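import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.javadsl.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;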

// a regular flow with pairs as elements
Flow<Pair<String, Integer>, Pair<String, Integer>, NotUsed> flow = // ...

// Declare the "flow with context"
// ingoing: String and Integer
// outgoing: String and Integer
FlowWithContext<String, Integer, String, Integer, NotUsed> flowWithContext =
    // convert the flow of pairs into a "flow with context"
    flow.<String, Integer, Integer>asFlowWithContext(
            // on the way in: combine the element and its context into a pair for the wrapped flow
            Pair::create,
            // pick the second element of the incoming pair as context
            Pair::second)
        // keep the first pair element as stream element
        .map(Pair::first);

FlowWithContext<String, Integer, String, Integer, NotUsed> mapped =
    flowWithContext
        // regular operators apply to the element without seeing the context
        .map(s -> s.replace('e', 'y'));

// running the flow with some sample data and asserting the outcome
Collection<Pair<String, Integer>> values =
    Arrays.asList(Pair.create("eins", 1), Pair.create("zwei", 2), Pair.create("drei", 3));

SourceWithContext<String, Integer, NotUsed> source =
    Source.from(values).asSourceWithContext(Pair::second).map(Pair::first);

CompletionStage<List<Pair<String, Integer>>> result =
    source.via(mapped).runWith(Sink.seq(), system);
List<Pair<String, Integer>> list = result.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertThat(
    list, hasItems(Pair.create("yins", 1), Pair.create("zwyi", 2), Pair.create("dryi", 3)));