Source.combine
Combine several sources, using a given strategy such as merge or concat, into one source.
Signature
Description
Provides a way to create a “fan-in” of multiple sources without having to use the more advanced GraphDSL.
The way the elements from the sources are combined is pluggable through the strategy parameter, which accepts a function Int => Graph[FanInShape] where the integer parameter specifies the number of sources that the graph must accept. This makes it possible to use combine with the built-in Concat and Merge by expanding their apply methods to functions, but also to use an arbitrary strategy.
Combine is most useful when you have more than two sources or want to use a custom operator, as there are more concise operators for two-source concat and merge.
Built-in operators that can be used as the strategy include Merge and Concat, both shown in the examples below.
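To make the shape of the strategy parameter concrete, here is a minimal plain-Java model of the idea, not Pekko code. It assumes a hypothetical `FanIn` interface standing in for the fan-in graph, and a hypothetical `combine` helper that passes the number of sources to the strategy before wiring the inputs in. The `roundRobin` strategy is an invented example of an arbitrary custom strategy. Real streams are asynchronous and backpressured, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.IntFunction;

// Plain-Java model only: none of these names are Pekko APIs.
final class CombineSketch {

  /** Hypothetical stand-in for a fan-in graph: many inputs, one output. */
  interface FanIn<T> {
    List<T> run(List<Iterator<T>> inputs);
  }

  /** Models Concat: drain each input fully before moving on to the next. */
  static <T> FanIn<T> concat(int inputCount) {
    return inputs -> {
      requireCount(inputs, inputCount);
      List<T> out = new ArrayList<>();
      for (Iterator<T> in : inputs) {
        while (in.hasNext()) {
          out.add(in.next());
        }
      }
      return out;
    };
  }

  /** An invented custom strategy: take one element from each input in turn. */
  static <T> FanIn<T> roundRobin(int inputCount) {
    return inputs -> {
      requireCount(inputs, inputCount);
      List<T> out = new ArrayList<>();
      boolean progressed = true;
      while (progressed) {
        progressed = false;
        for (Iterator<T> in : inputs) {
          if (in.hasNext()) {
            out.add(in.next());
            progressed = true;
          }
        }
      }
      return out;
    };
  }

  private static void requireCount(List<?> inputs, int expected) {
    if (inputs.size() != expected) {
      throw new IllegalArgumentException(
          "expected " + expected + " inputs, got " + inputs.size());
    }
  }

  /** Models combine: the strategy is told how many sources it must accept. */
  static <T> List<T> combine(List<List<T>> sources, IntFunction<FanIn<T>> strategy) {
    List<Iterator<T>> inputs = new ArrayList<>();
    for (List<T> source : sources) {
      inputs.add(source.iterator());
    }
    return strategy.apply(sources.size()).run(inputs);
  }

  public static void main(String[] args) {
    List<List<Integer>> sources = Arrays.asList(
        Arrays.asList(1, 2, 3), Arrays.asList(8, 9, 10), Arrays.asList(12, 13, 14));
    System.out.println(combine(sources, CombineSketch::concat));
    // [1, 2, 3, 8, 9, 10, 12, 13, 14]
    System.out.println(combine(sources, CombineSketch::roundRobin));
    // [1, 8, 12, 2, 9, 13, 3, 10, 14]
  }
}
```

Passing `CombineSketch::concat` as a method reference mirrors how a factory method is expanded into a function from the input count to a fan-in.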
Examples
In this example we Merge three different sources of integers. The three sources will immediately start contributing elements to the combined source. The individual elements from each source will be in order but the order compared to elements from other sources is not deterministic:
Scala:
import org.apache.pekko.stream.scaladsl.{ Concat, Merge, Source }
// ...
val source1 = Source(1 to 3)
val source2 = Source(8 to 10)
val source3 = Source(12 to 14)
val combined = Source.combine(source1, source2, source3)(Merge(_))
combined.runForeach(println)
// could print (order between sources is not deterministic)
// 1
// 12
// 8
// 9
// 13
// 14
// 2
// 10
// 3
Java:
import java.util.Collections;
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Concat;
import org.apache.pekko.stream.javadsl.Merge;
import org.apache.pekko.stream.javadsl.Source;
// ...
Source<Integer, NotUsed> source1 = Source.range(1, 3);
Source<Integer, NotUsed> source2 = Source.range(8, 10);
Source<Integer, NotUsed> source3 = Source.range(12, 14);
final Source<Integer, NotUsed> combined =
    Source.combine(source1, source2, Collections.singletonList(source3), Merge::create);
combined.runForeach(System.out::println, system);
// could print (order between sources is not deterministic)
// 1
// 12
// 8
// 9
// 13
// 14
// 2
// 10
// 3
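The ordering guarantee for Merge can be restated as follows: each source's elements appear in the combined output in their original relative order, that is, as a subsequence of it. The following plain-Java sketch checks that property against the sample output printed above. `MergeOrderSketch` and `isSubsequence` are illustrative names and not part of Pekko.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustrative helper only; not a Pekko API.
final class MergeOrderSketch {

  /** True if every element of source appears in output in the same relative order. */
  static <T> boolean isSubsequence(List<T> source, List<T> output) {
    Iterator<T> remaining = source.iterator();
    if (!remaining.hasNext()) {
      return true; // an empty source is trivially in order
    }
    T next = remaining.next();
    for (T element : output) {
      if (element.equals(next)) {
        if (!remaining.hasNext()) {
          return true; // matched the whole source
        }
        next = remaining.next();
      }
    }
    return false; // ran out of output before matching every source element
  }

  public static void main(String[] args) {
    // The sample output from the Merge example.
    List<Integer> printed = Arrays.asList(1, 12, 8, 9, 13, 14, 2, 10, 3);
    System.out.println(isSubsequence(Arrays.asList(1, 2, 3), printed));    // true
    System.out.println(isSubsequence(Arrays.asList(8, 9, 10), printed));   // true
    System.out.println(isSubsequence(Arrays.asList(12, 13, 14), printed)); // true
    // An output with 2 before 1 would violate source1's order.
    System.out.println(isSubsequence(Arrays.asList(1, 2, 3), Arrays.asList(2, 1, 3))); // false
  }
}
```

Any interleaving that passes this check for all three sources is a possible output of the Merge example; the Concat output is one such interleaving.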
If we instead use Concat, the first source will get to emit elements until it completes, then the second source until that completes, and so on until all the sources have completed.
Scala:
val source1 = Source(1 to 3)
val source2 = Source(8 to 10)
val source3 = Source(12 to 14)
val sources = Source.combine(source1, source2, source3)(Concat(_))
sources.runForeach(println)
// prints (order is deterministic)
// 1
// 2
// 3
// 8
// 9
// 10
// 12
// 13
// 14
Java:
Source<Integer, NotUsed> source1 = Source.range(1, 3);
Source<Integer, NotUsed> source2 = Source.range(8, 10);
Source<Integer, NotUsed> source3 = Source.range(12, 14);
final Source<Integer, NotUsed> sources =
    Source.combine(source1, source2, Collections.singletonList(source3), Concat::create);
sources.runForeach(System.out::println, system);
// prints (order is deterministic)
// 1
// 2
// 3
// 8
// 9
// 10
// 12
// 13
// 14
Reactive Streams semantics
emits when there is demand; which element is emitted depends on the strategy
completes at a point that depends on the strategy; with Concat, for example, once all the sources have completed