Source.asSourceWithContext
Extracts context data from the elements of a Source
so that it can be turned into a SourceWithContext
which can propagate that context per element along a stream.
Signature
Description
See Context Propagation for a general overview of context propagation.
asSourceWithContext extracts context data from the elements of a Source and turns the Source into a SourceWithContext, which propagates that context per element along the stream. The function passed into asSourceWithContext must turn elements into contexts, producing exactly one context for every element.
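The "one context for every element" contract can be illustrated without any streaming machinery. The following is a minimal sketch in plain Java, not Pekko's implementation: the class ContextSketch and the helpers withContext and mapElements are hypothetical names invented for this illustration, and Map.Entry stands in for the element/context pair.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical model of per-element context propagation (not the Pekko API).
class ContextSketch {

  // Pairs every element with the context extracted from it: one context per element.
  static <T, C> List<Map.Entry<T, C>> withContext(
      List<T> elements, Function<T, C> extractContext) {
    return elements.stream()
        .map(e -> Map.entry(e, extractContext.apply(e)))
        .collect(Collectors.toList());
  }

  // Transforms only the element; the context is carried along untouched.
  static <T, U, C> List<Map.Entry<U, C>> mapElements(
      List<Map.Entry<T, C>> pairs, Function<T, U> f) {
    return pairs.stream()
        .map(p -> Map.entry(f.apply(p.getKey()), p.getValue()))
        .collect(Collectors.toList());
  }

  static List<Map.Entry<String, Integer>> demo() {
    // text messages paired with a correlation number
    List<Map.Entry<String, Integer>> values =
        List.of(Map.entry("eins", 1), Map.entry("zwei", 2), Map.entry("drei", 3));

    // pick the correlation number as the context of each element
    List<Map.Entry<Map.Entry<String, Integer>, Integer>> tagged =
        withContext(values, v -> v.getValue());

    // keep only the text as the element; the context stays attached
    List<Map.Entry<String, Integer>> texts = mapElements(tagged, v -> v.getKey());

    // an element-only transformation that never sees the context
    return mapElements(texts, s -> new StringBuilder(s).reverse().toString());
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The sketch mirrors the shape of the operator: the extraction function runs once per element, and later element-level transformations leave the attached context unchanged.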
See also:
- Context Propagation
- Flow.asFlowWithContext: Turns a Flow into a FlowWithContext which can propagate a context per element along a stream.
Example
Elements from this source carry a correlation number, but the flow structure should focus on the text message in the elements. asSourceWithContext chooses the second value in the tuple as the context. A subsequent map operator makes the first value the stream element in the SourceWithContext.
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Source
import pekko.stream.scaladsl.SourceWithContext
import scala.collection.immutable
// values with their contexts as tuples
val values: immutable.Seq[(String, Int)] = immutable.Seq("eins" -> 1, "zwei" -> 2, "drei" -> 3)
// a regular source with the tuples as elements
val source: Source[(String, Int), NotUsed] = Source(values)
// split the tuple into stream elements and their context
val sourceWithContext: SourceWithContext[String, Int, NotUsed] =
  source
    .asSourceWithContext(_._2) // pick the second tuple element as context
    .map(_._1) // keep the first tuple element as stream element
val mapped: SourceWithContext[String, Int, NotUsed] = sourceWithContext
  // regular operators apply to the element without seeing the context
  .map(s => s.reverse)
// running the source and asserting the outcome
import org.apache.pekko.stream.scaladsl.Sink
val result = mapped.runWith(Sink.seq)
result.futureValue should contain theSameElementsInOrderAs immutable.Seq("snie" -> 1, "iewz" -> 2, "ierd" -> 3)
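import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.javadsl.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;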
// values with their contexts as pairs
Collection<Pair<String, Integer>> values =
    Arrays.asList(Pair.create("eins", 1), Pair.create("zwei", 2), Pair.create("drei", 3));
// a regular source with pairs as elements
Source<Pair<String, Integer>, NotUsed> source = Source.from(values);
// split the pair into stream elements and their context
SourceWithContext<String, Integer, NotUsed> sourceWithContext =
    source
        .asSourceWithContext(Pair::second) // pick the second pair element as context
        .map(Pair::first); // keep the first pair element as stream element
SourceWithContext<String, Integer, NotUsed> mapped =
    sourceWithContext
        // regular operators apply to the element without seeing the context
        .map(s -> s.replace('e', 'y'));
// running the source and asserting the outcome
CompletionStage<List<Pair<String, Integer>>> result = mapped.runWith(Sink.seq(), system);
List<Pair<String, Integer>> list = result.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertThat(
    list, hasItems(Pair.create("yins", 1), Pair.create("zwyi", 2), Pair.create("dryi", 3)));