mergePrioritized
Merge multiple sources.
Signature
Source.mergePrioritized
Flow.mergePrioritized
Description
Merge multiple sources. If all sources have elements ready, the source to emit from is chosen according to the priorities. If only a subset of the sources have elements ready, the relative priorities of those sources are used instead. For example, when used with only two sources, the left source has a probability of leftPriority / (leftPriority + rightPriority) of being prioritized, and the right source has a probability of rightPriority / (leftPriority + rightPriority). The priority of each source must be a positive integer.
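The arithmetic behind the relative priorities can be sketched in plain Java. This is only a model of the behavior described above, not the operator's implementation: the class `PriorityShare` and its `shares` helper are hypothetical names, and extending the two-source formula to three sources is an assumption based on the statement that only the priorities of the ready sources are compared.

```java
import java.util.Arrays;

public class PriorityShare {

    // Hypothetical helper (not part of Pekko): returns, for each source, the
    // probability of being picked next, given its priority and whether it
    // currently has an element ready. Sources that are not ready get 0.0.
    static double[] shares(int[] priorities, boolean[] ready) {
        if (priorities.length != ready.length) {
            throw new IllegalArgumentException("one ready flag per priority is required");
        }
        long total = 0;
        for (int i = 0; i < priorities.length; i++) {
            if (priorities[i] <= 0) {
                throw new IllegalArgumentException("priorities must be positive integers");
            }
            if (ready[i]) {
                total += priorities[i]; // only ready sources compete
            }
        }
        double[] result = new double[priorities.length];
        for (int i = 0; i < priorities.length; i++) {
            result[i] = ready[i] ? (double) priorities[i] / total : 0.0;
        }
        return result;
    }

    public static void main(String[] args) {
        int[] priorities = {6, 3, 1};
        // All three sources ready: shares are 6/10, 3/10 and 1/10.
        System.out.println(Arrays.toString(shares(priorities, new boolean[] {true, true, true})));
        // prints [0.6, 0.3, 0.1]
        // Only the last two ready: shares are 3/4 and 1/4.
        System.out.println(Arrays.toString(shares(priorities, new boolean[] {false, true, true})));
        // prints [0.0, 0.75, 0.25]
    }
}
```

With priorities 99 and 1 and both sources ready, the same calculation gives 0.99 and 0.01, matching the 99% and 1% chances mentioned in the example.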
Example
- Scala
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

val sourceA = Source(List(1, 2, 3, 4))
val sourceB = Source(List(10, 20, 30, 40))

sourceA.mergePrioritized(sourceB, 99, 1).runWith(Sink.foreach(println))
// prints e.g. 1, 10, 2, 3, 4, 20, 30, 40 since both sources have their first element ready and the left source
// has higher priority: if both sources have elements ready, sourceA has a 99% chance of being picked next
// while sourceB has a 1% chance
- Java
import java.util.Arrays;
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;

// `system` is assumed to be an ActorSystem that is already in scope
Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(10, 20, 30, 40));

sourceA.mergePrioritized(sourceB, 99, 1, false).runForeach(System.out::println, system);
// prints e.g. 1, 10, 2, 3, 4, 20, 30, 40 since both sources have their first element ready and
// the left source has higher priority: if both sources have elements ready, sourceA has a
// 99% chance of being picked next while sourceB has a 1% chance
Reactive Streams semantics
emits when one of the inputs has an element available, preferring inputs based on their priorities if multiple have elements available
backpressures when downstream backpressures
completes when all upstreams complete (this can be changed to complete when any upstream completes by setting eagerComplete=true)