Source.combine
Combine several sources, using a given strategy such as merge or concat, into one source.
Signature
Description
Provides a way to create a “fan-in” of multiple sources without having to use the more advanced GraphDSL.
The way the elements from the sources are combined is pluggable through the strategy parameter, which accepts a function Int => Graph[FanInShape] (Scala) or Integer -> Graph<FanInShape> (Java), where the integer parameter specifies the number of sources that the graph must accept. This makes it possible to use combine with the built-in Concat and Merge, by expanding their apply methods to functions (Scala) or by using a method reference to their create methods (Java), but also to use an arbitrary strategy.
combine is most useful when you have more than two sources or want to use a custom operator, as there are more concise operators for two-source concat and merge.
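For the two-source case mentioned above, a minimal sketch of the more concise operators might look like the following. It assumes Pekko Streams is on the classpath; the object name TwoSourceOperatorsExample and the actor system name are hypothetical and chosen only for illustration.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, used only to make the sketch self-contained.
object TwoSourceOperatorsExample {
  def run(): (Seq[Int], Seq[Int]) = {
    // The implicit system also provides the materializer needed to run streams.
    implicit val system: ActorSystem = ActorSystem("two-source-example")
    try {
      val source1 = Source(1 to 3)
      val source2 = Source(8 to 10)

      // concat: source1 emits until it completes, then source2 takes over.
      val concatenated = source1.concat(source2)
      // merge: both sources contribute as elements become available,
      // so the interleaving between them is not deterministic.
      val merged = source1.merge(source2)

      val concatResult = Await.result(concatenated.runWith(Sink.seq), 5.seconds)
      val mergeResult = Await.result(merged.runWith(Sink.seq), 5.seconds)
      (concatResult, mergeResult)
    } finally system.terminate()
  }
}
```

With only two sources, these operators avoid passing a strategy function at all; combine becomes the more convenient choice once a third source or a custom fan-in operator is involved.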
Some of the built-in operators that can be used as a strategy are:
Merge
Concat
MergePrioritized
MergeLatest
ZipN
ZipWithN
Examples
In this example we Merge three different sources of integers. The three sources will immediately start contributing elements to the combined source. The individual elements from each source will be in order, but the order compared to elements from other sources is not deterministic:
- Scala
    import org.apache.pekko.stream.scaladsl.{ Concat, Merge, Source }
    // ...
    val source1 = Source(1 to 3)
    val source2 = Source(8 to 10)
    val source3 = Source(12 to 14)
    val combined = Source.combine(source1, source2, source3)(Merge(_))
    combined.runForeach(println)
    // could print (order between sources is not deterministic)
    // 1
    // 12
    // 8
    // 9
    // 13
    // 14
    // 2
    // 10
    // 3
- Java
    import java.util.Collections;
    import org.apache.pekko.NotUsed;
    import org.apache.pekko.stream.javadsl.Concat;
    import org.apache.pekko.stream.javadsl.Merge;
    import org.apache.pekko.stream.javadsl.Source;
    // ...
    Source<Integer, NotUsed> source1 = Source.range(1, 3);
    Source<Integer, NotUsed> source2 = Source.range(8, 10);
    Source<Integer, NotUsed> source3 = Source.range(12, 14);
    final Source<Integer, NotUsed> combined =
        Source.combine(source1, source2, Collections.singletonList(source3), Merge::create);
    combined.runForeach(System.out::println, system);
    // could print (order between sources is not deterministic)
    // 1
    // 12
    // 8
    // 9
    // 13
    // 14
    // 2
    // 10
    // 3
If we instead use Concat, the first source will get to emit elements until it completes, then the second source until that completes, and so on until all the sources have completed.
- Scala
    val source1 = Source(1 to 3)
    val source2 = Source(8 to 10)
    val source3 = Source(12 to 14)
    val sources = Source.combine(source1, source2, source3)(Concat(_))
    sources.runForeach(println)
    // prints (order is deterministic)
    // 1
    // 2
    // 3
    // 8
    // 9
    // 10
    // 12
    // 13
    // 14
- Java
    Source<Integer, NotUsed> source1 = Source.range(1, 3);
    Source<Integer, NotUsed> source2 = Source.range(8, 10);
    Source<Integer, NotUsed> source3 = Source.range(12, 14);
    final Source<Integer, NotUsed> sources =
        Source.combine(source1, source2, Collections.singletonList(source3), Concat::create);
    sources.runForeach(System.out::println, system);
    // prints (order is deterministic)
    // 1
    // 2
    // 3
    // 8
    // 9
    // 10
    // 12
    // 13
    // 14
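The other strategies listed earlier plug in the same way. As a rough sketch, the same three sources could be combined with ZipWithN, which waits for one element from every source and emits a single combined value. The sketch assumes Pekko Streams is on the classpath; the object name CombineZipWithNExample, the actor system name, and the choice of summing the elements are hypothetical and chosen only for illustration.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source, ZipWithN }

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, used only to make the sketch self-contained.
object CombineZipWithNExample {
  def run(): Seq[Int] = {
    // The implicit system also provides the materializer needed to run streams.
    implicit val system: ActorSystem = ActorSystem("combine-example")
    try {
      val source1 = Source(1 to 3)
      val source2 = Source(8 to 10)
      val source3 = Source(12 to 14)

      // The strategy receives the number of sources (n) and returns a fan-in graph.
      // ZipWithN takes one element from each input and applies the zipper function,
      // here summing the three values.
      val summed =
        Source.combine(source1, source2, source3)(n => ZipWithN[Int, Int](_.sum)(n))

      // Collect all emitted sums: 1+8+12, 2+9+13, 3+10+14.
      Await.result(summed.runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }
}
```

Unlike Merge, the output here is deterministic, because each emitted element depends on exactly one element from every source.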
Reactive Streams semantics
emits when there is demand, with the exact conditions depending on the strategy
completes at a point that depends on the strategy