merge

Merge multiple sources.

Fan-in operators

Signature

Source.merge Flow.merge

Description

Merge multiple sources into a single stream. When all sources have elements ready, the next element is picked at random.

Example

Scala
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

val sourceA = Source(List(1, 2, 3, 4))
val sourceB = Source(List(10, 20, 30, 40))

// running the stream requires an implicit ActorSystem in scope
sourceA.merge(sourceB).runWith(Sink.foreach(println))
// merging is not deterministic; it can, for example, print 1, 2, 3, 4, 10, 20, 30, 40
Java
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

import java.util.*;

Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(10, 20, 30, 40));
// `system` is an ActorSystem that is assumed to be in scope
sourceA.merge(sourceB).runForeach(System.out::println, system);
// merging is not deterministic; it can, for example, print 1, 2, 3, 4, 10, 20, 30, 40

Reactive Streams semantics

emits when one of the inputs has an element available

backpressures when downstream backpressures

completes when all upstreams complete (set eagerComplete=true to complete as soon as any upstream completes instead)
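
The two completion modes can be compared with a small sketch. It assumes the Java DSL overload `merge(that, eagerComplete)` and uses `Source.never()` as an input that never completes. The class name, method names, actor system name and the five second timeout are illustrative choices, not part of the original example.

```java
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class MergeCompletionSketch {

  // Default behaviour: the merged stream completes only after both inputs
  // have completed, so every element of both sources is collected.
  static List<Integer> mergeUntilAllComplete(ActorSystem system) throws Exception {
    Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
    Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(10, 20, 30, 40));
    return sourceA
        .merge(sourceB)
        .runWith(Sink.seq(), system)
        .toCompletableFuture()
        .get(5, TimeUnit.SECONDS);
  }

  // eagerComplete = true: the merged stream completes once either input completes.
  // Source.never() never completes, so without the flag this stream would not finish.
  static List<Integer> mergeUntilAnyCompletes(ActorSystem system) throws Exception {
    Source<Integer, NotUsed> finite = Source.from(Arrays.asList(1, 2, 3, 4));
    Source<Integer, NotUsed> endless = Source.never();
    return finite
        .merge(endless, true)
        .runWith(Sink.seq(), system)
        .toCompletableFuture()
        .get(5, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("merge-sketch");
    try {
      // Interleaving between the two inputs is not deterministic.
      System.out.println(mergeUntilAllComplete(system));
      System.out.println(mergeUntilAnyCompletes(system));
    } finally {
      system.terminate();
    }
  }
}
```

In the first method the order between the two inputs may vary from run to run, but the elements of each individual input keep their relative order. In the second method the call returns because the finite input completes; with the default setting the same stream would stay open and the `get` call would time out.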