mergePreferred
Merge multiple sources.
Signature
Source.mergePreferred
Source.mergePreferred
Flow.mergePreferred
Flow.mergePreferred
Description
Merge multiple sources. If all sources have elements ready, emit the preferred source first. Then emit the preferred source again if another element is pushed. Otherwise, emit all the secondary sources. Repeat until streams are empty. For the case with two sources, when preferred
is set to true then prefer the right source, otherwise prefer the left source (see examples).
Example
- Scala
-
source
import org.apache.pekko.stream.scaladsl.{ Sink, Source } val sourceA = Source(List(1, 2, 3, 4)) val sourceB = Source(List(10, 20, 30, 40)) sourceA.mergePreferred(sourceB, false).runWith(Sink.foreach(println)) // prints 1, 10, ... since both sources have their first element ready and the left source is preferred sourceA.mergePreferred(sourceB, true).runWith(Sink.foreach(println)) // prints 10, 1, ... since both sources have their first element ready and the right source is preferred
- Java
-
source
Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); sourceA.mergePreferred(sourceB, false, false).runForeach(System.out::println, system); // prints 1, 10, ... since both sources have their first element ready and the left source is // preferred sourceA.mergePreferred(sourceB, true, false).runForeach(System.out::println, system); // prints 10, 1, ... since both sources have their first element ready and the right source is // preferred
Reactive Streams semantics
emits when one of the inputs has an element available, preferring a defined input if multiple have elements available
backpressures when downstream backpressures
completes when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting eagerComplete=true
.)