mergePreferred

Merge multiple sources.

Fan-in operators

Signature

Source.mergePreferred, Flow.mergePreferred

Description

Merge multiple sources. If all sources have elements ready, an element from the preferred source is emitted first, and the preferred source is emitted again whenever it has another element ready. Otherwise, elements are emitted from the secondary sources. This repeats until the streams are empty. With two sources, setting preferred to true prefers the right source (the one passed as the argument); otherwise the left source is preferred (see examples).

Example

Scala
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

val sourceA = Source(List(1, 2, 3, 4))
val sourceB = Source(List(10, 20, 30, 40))

sourceA.mergePreferred(sourceB, false).runWith(Sink.foreach(println))
// prints 1, 10, ... since both sources have their first element ready and the left source is preferred

sourceA.mergePreferred(sourceB, true).runWith(Sink.foreach(println))
// prints 10, 1, ... since both sources have their first element ready and the right source is preferred
Java
Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(10, 20, 30, 40));

sourceA.mergePreferred(sourceB, false, false).runForeach(System.out::println, system);
// prints 1, 10, ... since both sources have their first element ready and the left source is
// preferred

sourceA.mergePreferred(sourceB, true, false).runForeach(System.out::println, system);
// prints 10, 1, ... since both sources have their first element ready and the right source is
// preferred

Reactive Streams semantics

emits when one of the inputs has an element available, preferring a defined input if multiple have elements available

backpressures when downstream backpressures

completes when all upstreams complete (set eagerComplete=true to complete as soon as any upstream completes instead)
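
The emit and completion rules above can be illustrated with a small model that does not use Pekko at all. This is a minimal sketch: `MergePreferredModel` and its `merge` method are hypothetical names invented for illustration, not part of the Pekko API. It assumes every element of both inputs is already buffered and ready, and it ignores asynchrony and backpressure entirely.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class MergePreferredModel {

    // Hypothetical helper, not a Pekko API: drains two fully buffered inputs
    // using the selection rule described in the semantics above.
    static <T> List<T> merge(List<T> left, List<T> right, boolean preferRight, boolean eagerComplete) {
        Deque<T> preferred = new ArrayDeque<>(preferRight ? right : left);
        Deque<T> secondary = new ArrayDeque<>(preferRight ? left : right);
        List<T> out = new ArrayList<>();
        while (!preferred.isEmpty() || !secondary.isEmpty()) {
            // eagerComplete = true: stop as soon as either input is exhausted.
            if (eagerComplete && (preferred.isEmpty() || secondary.isEmpty())) {
                break;
            }
            // Both ready: take the preferred input; otherwise take whichever has an element.
            out.add(!preferred.isEmpty() ? preferred.poll() : secondary.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> a = List.of(1, 2);
        List<Integer> b = List.of(10, 20, 30);
        System.out.println(merge(a, b, false, false)); // [1, 2, 10, 20, 30]
        System.out.println(merge(a, b, true, false));  // [10, 20, 30, 1, 2]
        System.out.println(merge(a, b, false, true));  // [1, 2]
    }
}
```

Because the model treats every element as ready up front, the preferred input is drained completely before the secondary one. In a running stream elements arrive asynchronously, so the preferred source is often not ready and the output interleaves, as in the `1, 10, ...` example above.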