mergePrioritized
Merge multiple sources.
Signature
Source.mergePrioritized
Flow.mergePrioritized
Description
Merge multiple sources. When all sources have elements ready, sources are preferred according to their priorities. When only a subset of the sources has elements ready, the relative priorities of those sources are used. For example, with only two sources, the left source has a probability of leftPriority / (leftPriority + rightPriority) of being picked, and the right source has a probability of rightPriority / (leftPriority + rightPriority). The priority of each source must be a positive integer.
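The weighting described above can be illustrated with a small standalone model. This is only a sketch of the arithmetic, not Pekko's internal implementation: the class `PriorityModel` and the helpers `leftProbability` and `pick` are hypothetical names introduced for illustration, and the model assumes a uniformly random draw over the summed priorities of the sources that currently have an element ready.

```java
import java.util.Random;

// Hypothetical model of priority-weighted selection; not part of the Pekko API.
final class PriorityModel {

    // Probability that the left of two ready sources is picked:
    // leftPriority / (leftPriority + rightPriority).
    static double leftProbability(int leftPriority, int rightPriority) {
        if (leftPriority <= 0 || rightPriority <= 0) {
            throw new IllegalArgumentException("priorities must be positive integers");
        }
        return (double) leftPriority / (leftPriority + rightPriority);
    }

    // Picks the index of one ready source, weighted by priority.
    // Sources that are not ready are ignored, so only the relative
    // priorities of the ready subset matter. Returns -1 if none is ready.
    static int pick(int[] priorities, boolean[] ready, Random rng) {
        int total = 0;
        for (int i = 0; i < priorities.length; i++) {
            if (ready[i]) {
                total += priorities[i];
            }
        }
        if (total == 0) {
            return -1;
        }
        int r = rng.nextInt(total); // uniform in [0, total)
        for (int i = 0; i < priorities.length; i++) {
            if (!ready[i]) {
                continue;
            }
            r -= priorities[i];
            if (r < 0) {
                return i;
            }
        }
        throw new IllegalStateException("unreachable for positive priorities");
    }

    public static void main(String[] args) {
        // Priorities 99 and 1, as in the example on this page.
        System.out.println(leftProbability(99, 1));

        // When only the right source is ready, it is always picked.
        int[] priorities = {99, 1};
        System.out.println(pick(priorities, new boolean[] {false, true}, new Random()));
    }
}
```

In this model, a source that is not ready contributes nothing to the total, which mirrors the statement that only the relative priorities of the ready sources are used.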
Example
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
val sourceA = Source(List(1, 2, 3, 4))
val sourceB = Source(List(10, 20, 30, 40))
sourceA.mergePrioritized(sourceB, 99, 1).runWith(Sink.foreach(println))
// prints e.g. 1, 10, 2, 3, 4, 20, 30, 40 since both sources have their first element ready and the left source
// has higher priority – if both sources have elements ready, sourceA has a 99% chance of being picked next
// while sourceB has a 1% chance
Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(10, 20, 30, 40));
sourceA.mergePrioritized(sourceB, 99, 1, false).runForeach(System.out::println, system);
// prints e.g. 1, 10, 2, 3, 4, 20, 30, 40 since both sources have their first element ready and
// the left source has higher priority – if both sources have elements ready, sourceA has a
// 99% chance of being picked next while sourceB has a 1% chance
Reactive Streams semantics
emits when one of the inputs has an element available, preferring inputs based on their priorities if multiple have elements available
backpressures when downstream backpressures
completes when all upstreams complete (this can be changed to completing as soon as any upstream completes by setting eagerComplete=true)