mergeSorted

Merge multiple sources.

Fan-in operators

Signature

Source.mergeSortedSource.mergeSorted Flow.mergeSortedFlow.mergeSorted

Description

Merge multiple sources. Waits for one element to be ready from each input stream and emits the smallest element.

Example

Scala
sourceimport org.apache.pekko.stream.scaladsl.{ Sink, Source }

val sourceA = Source(List(1, 3, 5, 7))
val sourceB = Source(List(2, 4, 6, 8))

sourceA.mergeSorted(sourceB).runWith(Sink.foreach(println))
// prints 1, 2, 3, 4, 5, 6, 7, 8

val sourceC = Source(List(20, 1, 1, 1))

sourceA.mergeSorted(sourceC).runWith(Sink.foreach(println))
// prints 1, 3, 5, 7, 20, 1, 1, 1
Java
sourceimport org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.javadsl.Sink;

import java.util.*;

Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 3, 5, 7));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(2, 4, 6, 8));
sourceA
    .mergeSorted(sourceB, Comparator.<Integer>naturalOrder())
    .runForeach(System.out::println, system);
// prints 1, 2, 3, 4, 5, 6, 7, 8

Source<Integer, NotUsed> sourceC = Source.from(Arrays.asList(20, 1, 1, 1));
sourceA
    .mergeSorted(sourceC, Comparator.<Integer>naturalOrder())
    .runForeach(System.out::println, system);
// prints 1, 3, 5, 7, 20, 1, 1, 1

Reactive Streams semantics

emits when all of the inputs have an element available

backpressures when downstream backpressures

completes when all upstreams complete