Source.zipWithN
Combine the elements of multiple streams into a stream of sequences using a combiner function.
Signature
Source.zipWithNSource.zipWithN
Description
Combine the elements of multiple streams into a stream of sequences using a combiner function.
This operator is essentially the same as using zipN followed by map to turn the zipped sequence into an arbitrary object to emit downstream.
This operator adheres to the ActorAttributes.SupervisionStrategy attribute for exceptions thrown by the zipper function. On Supervision.Stop the stream fails; on Supervision.Resume and Supervision.Restart the failing zipped element is dropped and the stream continues.
See also:
Example
In this sample we zip three streams of integers and for each zipped sequence of numbers we calculate the max value and send downstream:
- Scala
-
source
val numbers = Source(1 :: 2 :: 3 :: 4 :: 5 :: 6 :: Nil) val otherNumbers = Source(5 :: 2 :: 1 :: 4 :: 10 :: 4 :: Nil) val andSomeOtherNumbers = Source(3 :: 7 :: 2 :: 1 :: 1 :: Nil) Source .zipWithN((seq: Seq[Int]) => seq.max)(numbers :: otherNumbers :: andSomeOtherNumbers :: Nil) .runForeach(println) // prints: // 5 // 7 // 3 // 4 // 10 - Java
-
source
Source<Integer, NotUsed> numbers = Source.from(List.of(1, 2, 3, 4, 5, 6)); Source<Integer, NotUsed> otherNumbers = Source.from(List.of(5, 2, 1, 4, 10, 4)); Source<Integer, NotUsed> andSomeOtherNumbers = Source.from(List.of(3, 7, 2, 1, 1)); Source.zipWithN( (List<Integer> seq) -> seq.stream().mapToInt(i -> i).max().getAsInt(), List.of(numbers, otherNumbers, andSomeOtherNumbers)) .runForeach(System.out::println, system); // prints: // 5 // 7 // 3 // 4 // 10
Note how it stops as soon as any of the original sources reaches its end.
Reactive Streams semantics
emits when all of the inputs has an element available
completes when any upstream completes
backpressures all upstreams when downstream backpressures but also on an upstream that has emitted an element until all other upstreams has emitted elements