Source.zipWithN
Combine the elements of multiple streams into a single stream by applying a combiner function to each sequence of zipped elements.
Signature
Description
This operator is essentially the same as using zipN followed by map to turn the zipped sequence into an arbitrary object to emit downstream.
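The equivalence described above can be sketched as follows. This is a minimal illustration rather than a definitive implementation: it assumes the Akka 2.6+ package names (`akka.stream.scaladsl`; a Pekko build would use the `org.apache.pekko` equivalents), and the object name `ZipWithNEquivalence`, its `run` method and the actor system name are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical object name, used only for this sketch.
object ZipWithNEquivalence {

  // Runs both pipelines and returns their collected outputs.
  def run(): (Seq[Int], Seq[Int]) = {
    // Assumption: Akka 2.6+, where an implicit ActorSystem also provides the materializer.
    implicit val system: ActorSystem = ActorSystem("zipWithN-sketch")
    try {
      // A Source is a reusable blueprint, so the same list can feed both pipelines.
      val sources = List(
        Source(List(1, 2, 3, 4, 5, 6)),
        Source(List(5, 2, 1, 4, 10, 4)),
        Source(List(3, 7, 2, 1, 1)))

      // zipWithN applies the combiner to each zipped sequence directly.
      val direct = Source.zipWithN[Int, Int](_.max)(sources).runWith(Sink.seq)

      // zipN emits each zipped sequence; map then applies the same combiner.
      val twoStep = Source.zipN(sources).map(_.max).runWith(Sink.seq)

      (Await.result(direct, 5.seconds), Await.result(twoStep, 5.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (direct, twoStep) = run()
    println(direct)
    println(twoStep)
  }
}
```

Both pipelines should yield the same five values (5, 7, 3, 4, 10). The `zipWithN` form avoids the separate `map` step.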
Example
In this sample we zip three streams of integers and, for each zipped sequence of numbers, calculate the maximum value and send it downstream:
val numbers = Source(1 :: 2 :: 3 :: 4 :: 5 :: 6 :: Nil)
val otherNumbers = Source(5 :: 2 :: 1 :: 4 :: 10 :: 4 :: Nil)
val andSomeOtherNumbers = Source(3 :: 7 :: 2 :: 1 :: 1 :: Nil)
Source
  .zipWithN((seq: Seq[Int]) => seq.max)(numbers :: otherNumbers :: andSomeOtherNumbers :: Nil)
  .runForeach(println)
// prints:
// 5
// 7
// 3
// 4
// 10
Source<Integer, NotUsed> numbers = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6));
Source<Integer, NotUsed> otherNumbers = Source.from(Arrays.asList(5, 2, 1, 4, 10, 4));
Source<Integer, NotUsed> andSomeOtherNumbers = Source.from(Arrays.asList(3, 7, 2, 1, 1));
Source.zipWithN(
        (List<Integer> seq) -> seq.stream().mapToInt(i -> i).max().getAsInt(),
        Arrays.asList(numbers, otherNumbers, andSomeOtherNumbers))
    .runForeach(System.out::println, system);
// prints:
// 5
// 7
// 3
// 4
// 10
Note how it stops as soon as any of the original sources reaches its end.
Reactive Streams semantics
emits when all of the inputs have an element available
completes when any upstream completes
backpressures all upstreams when downstream backpressures, and also backpressures an upstream that has emitted an element until all other upstreams have emitted elements
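The emit and complete rules can be modelled with plain Scala collections. This is a rough sketch only: `ZipWithNModel` and `zipWithNModel` are hypothetical names, not part of the streams API, and the model covers neither backpressure nor asynchronous execution. Returning an empty result for an empty input list is a convenience of the model, not a statement about the operator.

```scala
// Hypothetical object, not part of the streams API.
object ZipWithNModel {

  // Models which elements zipWithN emits, treating each input stream as a finite Seq.
  def zipWithNModel[T, O](combine: Seq[T] => O)(inputs: Seq[Seq[T]]): Seq[O] =
    if (inputs.isEmpty) Seq.empty // convenience choice for this model only
    else {
      // An element is emitted only while every input still has one available,
      // so the number of emissions equals the length of the shortest input.
      val rounds = inputs.map(_.length).min
      (0 until rounds).map(i => combine(inputs.map(_(i))))
    }

  def main(args: Array[String]): Unit = {
    val result = zipWithNModel[Int, Int](_.max)(Seq(
      Seq(1, 2, 3, 4, 5, 6),
      Seq(5, 2, 1, 4, 10, 4),
      Seq(3, 7, 2, 1, 1)))
    println(result.mkString(", ")) // prints: 5, 7, 3, 4, 10
  }
}
```

The third input has only five elements, so the model stops after five rounds, which mirrors the "completes when any upstream completes" rule.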