Flow.asFlowWithContext
Extracts context data from the elements of a Flow so that it can be turned into a FlowWithContext, which can propagate that context per element along a stream.
Signature
Description
See Context Propagation for a general overview of context propagation.
The first function passed into asFlowWithContext (collapseContext) must turn each incoming pair of element and context value into an input element of this Flow. The second function passed into asFlowWithContext (extractContext) must turn each outgoing element of this Flow into an outgoing context value.
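The roles of the two functions can be sketched without any streaming machinery. The following is a simplified model in plain Java, not Pekko code: the class AsFlowWithContextModel, the helpers withContext, pair and demo, and the use of Map.Entry as a stand-in for a pair are all hypothetical names chosen for illustration. It also assumes the wrapped flow emits exactly one element per input, which a real Flow need not do.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map.Entry;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-alone model; none of these names are Pekko API.
final class AsFlowWithContextModel {

  // Models a "flow with context" as a function from (element, context) to (element, context).
  static <U, CtxU, In, Out, CtxOut> Function<Entry<U, CtxU>, Entry<Out, CtxOut>> withContext(
      Function<In, Out> flow, // stands in for the regular flow (one output per input assumed)
      BiFunction<U, CtxU, In> collapseContext, // first function: (element, context) -> flow input
      Function<Out, CtxOut> extractContext) { // second function: flow output -> outgoing context
    return incoming -> {
      In in = collapseContext.apply(incoming.getKey(), incoming.getValue());
      Out out = flow.apply(in);
      return new SimpleEntry<>(out, extractContext.apply(out));
    };
  }

  // Small helper so that pair types are inferred consistently.
  static Entry<String, Integer> pair(String text, int number) {
    return new SimpleEntry<>(text, number);
  }

  static List<Entry<String, Integer>> demo() {
    // A "regular flow" over pairs: upper-cases the text, keeps the number.
    Function<Entry<String, Integer>, Entry<String, Integer>> flow =
        p -> pair(p.getKey().toUpperCase(), p.getValue());
    // Put element and context back into a pair for the regular flow.
    BiFunction<String, Integer, Entry<String, Integer>> collapseContext =
        (text, ctx) -> pair(text, ctx);
    // Pick the second value of each outgoing pair as the context.
    Function<Entry<String, Integer>, Integer> extractContext = Entry::getValue;

    Function<Entry<String, Integer>, Entry<Entry<String, Integer>, Integer>> modelled =
        withContext(flow, collapseContext, extractContext);

    return Arrays.asList(pair("eins", 1), pair("zwei", 2), pair("drei", 3)).stream()
        .map(modelled)
        // keep only the text as the element; the context travels alongside it
        .<Entry<String, Integer>>map(r -> pair(r.getKey().getKey(), r.getValue()))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [EINS=1, ZWEI=2, DREI=3]
  }
}
```

In this model the element leaving withContext is still the whole outgoing pair, with the context extracted next to it; a further mapping step narrows the element to the text alone.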
See also:
- Context Propagation
- Source.asSourceWithContext: Turns a Source into a SourceWithContext which can propagate a context per element along a stream.
Example
Elements from this flow have a correlation number, but the flow structure should focus on the text message in the elements. The first converter in asFlowWithContext is applied where the incoming “with context” stream ends: it puts each element and its context back into a tuple, which is the input type of the regular flow. The second converter function chooses the second value in each outgoing tuple as the context. Another map operator then makes the first value of the tuple the stream element in the FlowWithContext.
Scala:

    import org.apache.pekko
    import pekko.NotUsed
    import pekko.stream.scaladsl.Flow
    import pekko.stream.scaladsl.FlowWithContext

    // a regular flow with pairs as elements
    val flow: Flow[(String, Int), (String, Int), NotUsed] = // ???

    // Declare the "flow with context"
    // ingoing: String and Integer
    // outgoing: String and Integer
    val flowWithContext: FlowWithContext[String, Int, String, Int, NotUsed] =
      // convert the flow of pairs into a "flow with context"
      flow
        .asFlowWithContext[String, Int, Int](
          // at the end of this flow: put the elements and the context back into a tuple
          collapseContext = Tuple2.apply)(
          // pick the second element of the incoming pair as context
          extractContext = _._2)
        .map(_._1) // keep the first pair element as stream element

    val mapped = flowWithContext
      // regular operators apply to the element without seeing the context
      .map(_.reverse)

    // running the flow with some sample data and asserting the outcome
    import pekko.stream.scaladsl.Source
    import pekko.stream.scaladsl.Sink
    import scala.collection.immutable

    val values: immutable.Seq[(String, Int)] = immutable.Seq("eins" -> 1, "zwei" -> 2, "drei" -> 3)
    val source = Source(values).asSourceWithContext(_._2).map(_._1)

    val result = source.via(mapped).runWith(Sink.seq)
    result.futureValue should contain theSameElementsInOrderAs immutable.Seq("snie" -> 1, "iewz" -> 2, "ierd" -> 3)

Java:

    import org.apache.pekko.NotUsed;
    import org.apache.pekko.japi.Pair;
    import org.apache.pekko.stream.javadsl.*;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.TimeUnit;

    // a regular flow with pairs as elements
    Flow<Pair<String, Integer>, Pair<String, Integer>, NotUsed> flow = // ...

    // Declare the "flow with context"
    // ingoing: String and Integer
    // outgoing: String and Integer
    FlowWithContext<String, Integer, String, Integer, NotUsed> flowWithContext =
        // convert the flow of pairs into a "flow with context"
        flow.<String, Integer, Integer>asFlowWithContext(
                // at the end of this flow: put the elements and the context back into a pair
                Pair::create,
                // pick the second element of the incoming pair as context
                Pair::second)
            // keep the first pair element as stream element
            .map(Pair::first);

    FlowWithContext<String, Integer, String, Integer, NotUsed> mapped =
        flowWithContext
            // regular operators apply to the element without seeing the context
            .map(s -> s.replace('e', 'y'));

    // running the flow with some sample data and asserting the outcome
    // (`system` is an ActorSystem in scope; assertThat and hasItems come from the test framework)
    Collection<Pair<String, Integer>> values =
        Arrays.asList(Pair.create("eins", 1), Pair.create("zwei", 2), Pair.create("drei", 3));

    SourceWithContext<String, Integer, NotUsed> source =
        Source.from(values).asSourceWithContext(Pair::second).map(Pair::first);

    CompletionStage<List<Pair<String, Integer>>> result =
        source.via(mapped).runWith(Sink.seq(), system);
    List<Pair<String, Integer>> list = result.toCompletableFuture().get(1, TimeUnit.SECONDS);
    assertThat(
        list, hasItems(Pair.create("yins", 1), Pair.create("zwyi", 2), Pair.create("dryi", 3)));