merge
Merge multiple sources.
Signature
Source.merge
Source.merge
Flow.merge
Flow.merge
Description
Merge multiple sources. Picks elements randomly if all sources has elements ready.
Example
- Scala
-
source
import org.apache.pekko.stream.scaladsl.{ Sink, Source } val sourceA = Source(List(1, 2, 3, 4)) val sourceB = Source(List(10, 20, 30, 40)) sourceA.merge(sourceB).runWith(Sink.foreach(println)) // merging is not deterministic, can for example print 1, 2, 3, 4, 10, 20, 30, 40
- Java
-
source
import org.apache.pekko.stream.javadsl.Keep; import org.apache.pekko.stream.javadsl.Source; import org.apache.pekko.stream.javadsl.Sink; import java.util.*; Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); sourceA.merge(sourceB).runForeach(System.out::println, system); // merging is not deterministic, can for example print 1, 2, 3, 4, 10, 20, 30, 40
Reactive Streams semantics
emits when one of the inputs has an element available
backpressures when downstream backpressures
completes when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting eagerComplete=true
.)
1.0.3