Source.asSourceWithContext
Extracts context data from the elements of a Source
so that it can be turned into a SourceWithContext
which can propagate that context per element along a stream.
Signature
Source.asSourceWithContext
Source.asSourceWithContext
Description
See Context Propagation for a general overview of context propagation.
Extracts context data from the elements of a Source
Source
so that it can be turned into a SourceWithContext
SourceWithContext
which can propagate that context per element along a stream. The function passed into asSourceWithContext
must turn elements into contexts, one context for every element.
See also:
- Context Propagation
Flow.asFlowWithContext
Turns aFlow
into aFlowWithContext
which can propagate a context per element along a stream.
Example
Elements from this source have a correlation number, but the flow structure should focus on the text message in the elements. asSourceWithContext
chooses the second value in the tuplepair as the context. Another map
operator makes the first value the stream elements in the SourceWithContext
.
- Scala
-
source
import org.apache.pekko import pekko.NotUsed import pekko.stream.scaladsl.Source import pekko.stream.scaladsl.SourceWithContext import scala.collection.immutable // values with their contexts as tuples val values: immutable.Seq[(String, Int)] = immutable.Seq("eins" -> 1, "zwei" -> 2, "drei" -> 3) // a regular source with the tuples as elements val source: Source[(String, Int), NotUsed] = Source(values) // split the tuple into stream elements and their context val sourceWithContext: SourceWithContext[String, Int, NotUsed] = source .asSourceWithContext(_._2) // pick the second tuple element as context .map(_._1) // keep the first tuple element as stream element val mapped: SourceWithContext[String, Int, NotUsed] = sourceWithContext // regular operators apply to the element without seeing the context .map(s => s.reverse) // running the source and asserting the outcome import org.apache.pekko.stream.scaladsl.Sink val result = mapped.runWith(Sink.seq) result.futureValue should contain theSameElementsInOrderAs immutable.Seq("snie" -> 1, "iewz" -> 2, "ierd" -> 3)
- Java
-
source
import org.apache.pekko.NotUsed; import org.apache.pekko.japi.Pair; import org.apache.pekko.stream.javadsl.*; // values with their contexts as pairs Collection<Pair<String, Integer>> values = Arrays.asList(Pair.create("eins", 1), Pair.create("zwei", 2), Pair.create("drei", 3)); // a regular source with pairs as elements Source<Pair<String, Integer>, NotUsed> source = Source.from(values); // split the pair into stream elements and their context SourceWithContext<String, Integer, NotUsed> sourceWithContext = source .asSourceWithContext(Pair::second) // pick the second pair element as context .map(Pair::first); // keep the first pair element as stream element SourceWithContext<String, Integer, NotUsed> mapped = sourceWithContext // regular operators apply to the element without seeing the context .map(s -> s.replace('e', 'y')); // running the source and asserting the outcome CompletionStage<List<Pair<String, Integer>>> result = mapped.runWith(Sink.seq(), system); List<Pair<String, Integer>> list = result.toCompletableFuture().get(1, TimeUnit.SECONDS); assertThat( list, hasItems(Pair.create("yins", 1), Pair.create("zwyi", 2), Pair.create("dryi", 3)));