Source.combine

Combine several sources, using a given strategy such as merge or concat, into one source.

Signature

Source.combine

Description

Provides a way to create a “fan-in” of multiple sources without having to use the more advanced GraphDSL.

The way the elements from the sources are combined is pluggable through the strategy parameter, which accepts a function Int => Graph[FanInShape] (Scala) or Integer -> Graph<FanInShape> (Java), where the integer parameter specifies the number of sources that the graph must accept. This makes it possible to use combine with the built-in Concat and Merge, by expanding their apply methods to functions (Scala) or by using a method reference to their create methods (Java), but also to use an arbitrary strategy.
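As a minimal sketch of passing a strategy other than Merge or Concat, the snippet below hands combine an explicit function that builds an Interleave graph for the requested number of inputs. The choice of Interleave, the segment size of 1, the object name CombineWithInterleave, the actor system name, and the 5 second timeout are all assumptions made for illustration and are not part of the original text.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Interleave, Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, named only for this sketch
object CombineWithInterleave {

  // Runs the combined stream and returns every element it emitted, in order.
  def run(): Seq[Int] = {
    // The system name is arbitrary; the implicit system also provides the materializer
    implicit val system: ActorSystem = ActorSystem("combine-example")
    try {
      val source1 = Source(1 to 3)
      val source2 = Source(8 to 10)
      val source3 = Source(12 to 14)

      // The strategy is an ordinary function from the number of inputs to a fan-in graph.
      // combine calls it with 3 here because three sources are passed in.
      val roundRobin = (inputs: Int) => Interleave[Int](inputs, 1)

      val combined = Source.combine(source1, source2, source3)(roundRobin)

      // Block only to keep the sketch short; 5 seconds is an arbitrary timeout
      Await.result(combined.runWith(Sink.seq), 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    run().foreach(println)
}
```

With a segment size of 1, Interleave takes one element from each source in turn, so unlike Merge the resulting order should not vary between runs.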

Combine is most useful when you have more than two sources or want to use a custom operator, as there are more concise operators for two-source concat and merge.
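For comparison, the two-source case can be written directly with the concat and merge operators on Source. This is a sketch only: the object name TwoSourceOperators, the actor system name, and the 5 second timeout are assumptions made for illustration.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, named only for this sketch
object TwoSourceOperators {

  // Returns the elements emitted by the concatenated stream and by the merged stream.
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("two-source-example")
    try {
      val source1 = Source(1 to 3)
      val source2 = Source(8 to 10)

      // source1 is emitted in full before source2 starts
      val concatenated = source1.concat(source2)

      // both sources contribute immediately; order between them is not deterministic
      val merged = source1.merge(source2)

      val concatResult = Await.result(concatenated.runWith(Sink.seq), 5.seconds)
      val mergeResult = Await.result(merged.runWith(Sink.seq), 5.seconds)
      (concatResult, mergeResult)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (concatResult, mergeResult) = run()
    println(s"concat: $concatResult")
    println(s"merge (order may vary): $mergeResult")
  }
}
```

Both operators avoid the strategy function entirely, which is why combine mainly pays off with three or more sources or with a custom fan-in.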

Built-in operators that can be used as the strategy include Merge and Concat.

Examples

In this example we Merge three different sources of integers. The three sources will immediately start contributing elements to the combined source. The individual elements from each source will be in order but the order compared to elements from other sources is not deterministic:

Scala
import org.apache.pekko.stream.scaladsl.{ Concat, Merge, Source }
// ...

val source1 = Source(1 to 3)
val source2 = Source(8 to 10)
val source3 = Source(12 to 14)
val combined = Source.combine(source1, source2, source3)(Merge(_))
combined.runForeach(println)
// could print (order between sources is not deterministic)
// 1
// 12
// 8
// 9
// 13
// 14
// 2
// 10
// 3
Java
import java.util.Collections;
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Concat;
import org.apache.pekko.stream.javadsl.Merge;
import org.apache.pekko.stream.javadsl.Source;
// ...

Source<Integer, NotUsed> source1 = Source.range(1, 3);
Source<Integer, NotUsed> source2 = Source.range(8, 10);
Source<Integer, NotUsed> source3 = Source.range(12, 14);

final Source<Integer, NotUsed> combined =
    Source.combine(source1, source2, Collections.singletonList(source3), Merge::create);

combined.runForeach(System.out::println, system);
// could print (order between sources is not deterministic)
// 1
// 12
// 8
// 9
// 13
// 14
// 2
// 10
// 3

If we instead use Concat, the first source will get to emit elements until it completes, then the second source until that completes, and so on until all the sources have completed.

Scala
val source1 = Source(1 to 3)
val source2 = Source(8 to 10)
val source3 = Source(12 to 14)
val sources = Source.combine(source1, source2, source3)(Concat(_))
sources.runForeach(println)
// prints (order is deterministic)
// 1
// 2
// 3
// 8
// 9
// 10
// 12
// 13
// 14
Java
Source<Integer, NotUsed> source1 = Source.range(1, 3);
Source<Integer, NotUsed> source2 = Source.range(8, 10);
Source<Integer, NotUsed> source3 = Source.range(12, 14);

final Source<Integer, NotUsed> sources =
    Source.combine(source1, source2, Collections.singletonList(source3), Concat::create);

sources.runForeach(System.out::println, system);
// prints (order is deterministic)
// 1
// 2
// 3
// 8
// 9
// 10
// 12
// 13
// 14

Reactive Streams semantics

emits when there is demand, subject to the chosen strategy

completes depending on the chosen strategy; with Merge and Concat as used above, once all sources have completed