mergePreferred
Merge multiple sources.
Signature
Source.mergePreferred
Flow.mergePreferred
Description
Merge multiple sources. If all sources have elements ready, emit from the preferred source first. If the preferred source then pushes another element, emit from it again; otherwise, emit from the secondary sources. Repeat until all streams are empty. With two sources, setting preferred to true prefers the right source (the one passed as the argument); otherwise the left source is preferred (see examples).
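The two-source preference rule in the description can be illustrated with a small plain-Java model of a single emission decision. This is only a sketch under stated assumptions, not Pekko's implementation: the class `PreferredChoice`, the enum `Input`, and the method `choose` are hypothetical names introduced for illustration, and the model ignores backpressure and asynchronous arrival entirely.

```java
import java.util.Optional;

class PreferredChoice {
    enum Input { LEFT, RIGHT }

    // Hypothetical model: decide which input to emit from, given which
    // inputs currently have an element ready. The preferred input wins
    // whenever it is ready; otherwise the other input is used.
    static Optional<Input> choose(boolean leftReady, boolean rightReady, boolean preferRight) {
        Input preferred = preferRight ? Input.RIGHT : Input.LEFT;
        Input secondary = preferRight ? Input.LEFT : Input.RIGHT;
        boolean preferredReady = preferRight ? rightReady : leftReady;
        boolean secondaryReady = preferRight ? leftReady : rightReady;

        if (preferredReady) {
            return Optional.of(preferred);
        }
        if (secondaryReady) {
            return Optional.of(secondary);
        }
        return Optional.empty(); // nothing is ready, so nothing can be emitted yet
    }

    public static void main(String[] args) {
        // Both ready, preferred = false: the left input is chosen.
        System.out.println(choose(true, true, false));
        // Both ready, preferred = true: the right input is chosen.
        System.out.println(choose(true, true, true));
        // Only the left input is ready, so it is chosen even though right is preferred.
        System.out.println(choose(true, false, true));
    }
}
```

In this model the preference only matters when both inputs are ready at the same time, which matches the examples: the first emitted element comes from whichever side is preferred.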
Example
// Scala (assumes an implicit ActorSystem is in scope)
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
val sourceA = Source(List(1, 2, 3, 4))
val sourceB = Source(List(10, 20, 30, 40))
sourceA.mergePreferred(sourceB, false).runWith(Sink.foreach(println))
// prints 1, 10, ... since both sources have their first element ready and the left source is preferred
sourceA.mergePreferred(sourceB, true).runWith(Sink.foreach(println))
// prints 10, 1, ... since both sources have their first element ready and the right source is preferred
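// Java (assumes an ActorSystem named system is in scope)
import java.util.Arrays;
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;
Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4));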
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(10, 20, 30, 40));
sourceA.mergePreferred(sourceB, false, false).runForeach(System.out::println, system);
// prints 1, 10, ... since both sources have their first element ready and the left source is
// preferred
sourceA.mergePreferred(sourceB, true, false).runForeach(System.out::println, system);
// prints 10, 1, ... since both sources have their first element ready and the right source is
// preferred
Reactive Streams semantics
emits when one of the inputs has an element available, preferring a defined input if multiple have elements available
backpressures when downstream backpressures
completes when all upstreams complete (this behavior can be changed to completing when any upstream completes by setting eagerComplete=true)
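The completion rule above can be stated as a short plain-Java predicate. This is a simplified sketch, not Pekko code: the class `CompletionRule` and the method `mergedCompletes` are hypothetical names, and the list of booleans stands in for whether each upstream has completed.

```java
import java.util.List;

class CompletionRule {
    // Hypothetical model: with eagerComplete the merged stream completes as
    // soon as any upstream has completed; otherwise it waits for all of them.
    static boolean mergedCompletes(List<Boolean> upstreamCompleted, boolean eagerComplete) {
        if (eagerComplete) {
            return upstreamCompleted.stream().anyMatch(done -> done);
        }
        return upstreamCompleted.stream().allMatch(done -> done);
    }

    public static void main(String[] args) {
        List<Boolean> oneDone = List.of(true, false);
        // Default behavior: one upstream is still running, so no completion yet.
        System.out.println(mergedCompletes(oneDone, false)); // false
        // eagerComplete=true: a single completed upstream is enough.
        System.out.println(mergedCompletes(oneDone, true)); // true
    }
}
```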