mergePrioritized

Merge multiple sources.

Fan-in operators

Signature

Source.mergePrioritizedSource.mergePrioritized Flow.mergePrioritizedFlow.mergePrioritized

Description

Merge multiple sources. Prefer sources depending on priorities if all sources have elements ready. If a subset of all sources have elements ready the relative priorities for those sources are used to prioritize. For example, when used with only two sources, the left source has a probability of (leftPriority) / (leftPriority + rightPriority) of being prioritized and similarly for the right source. The priorities for each source must be positive integers.

Example

Scala
sourceimport org.apache.pekko.stream.scaladsl.{ Sink, Source }

val sourceA = Source(List(1, 2, 3, 4))
val sourceB = Source(List(10, 20, 30, 40))

sourceA.mergePrioritized(sourceB, 99, 1).runWith(Sink.foreach(println))
// prints e.g. 1, 10, 2, 3, 4, 20, 30, 40 since both sources have their first element ready and the left source
// has higher priority – if both sources have elements ready, sourceA has a 99% chance of being picked next
// while sourceB has a 1% chance
Java
sourceSource<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(10, 20, 30, 40));

sourceA.mergePrioritized(sourceB, 99, 1, false).runForeach(System.out::println, system);
// prints e.g. 1, 10, 2, 3, 4, 20, 30, 40 since both sources have their first element ready and
// the left source has higher priority – if both sources have elements ready, sourceA has a
// 99% chance of being picked next while sourceB has a 1% chance

Reactive Streams semantics

emits when one of the inputs has an element available, preferring inputs based on their priorities if multiple have elements available

backpressures when downstream backpressures

completes when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting eagerComplete=true.)