zipWith
Combines elements from multiple sources through a combine function and passes the returned value downstream.
Signature
Source.zipWith, Flow.zipWith
Description
Pairs each element of this stream with the corresponding element of the other source, applies the combine function to the pair, and passes the returned value downstream.
This operator adheres to the ActorAttributes.SupervisionStrategy attribute for exceptions thrown by the combine function. On Supervision.Stop the stream fails; on Supervision.Resume and Supervision.Restart the failing zipped element is dropped and the stream continues.
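The supervision behaviour described above can be sketched as follows. This is a minimal illustration, not taken from the Pekko sources: the object name `ZipWithSupervisionSketch`, the method `safeQuotients`, and the sample numbers are all hypothetical. It assumes that attaching `Supervision.resumingDecider` via `withAttributes` reaches the `zipWith` operator, and that integer division by zero is the exception thrown by the combine function.

```scala
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.{ ActorAttributes, Supervision }
import pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

// Hypothetical example object; names are for illustration only.
object ZipWithSupervisionSketch {

  // Zips numerators with denominators and divides each pair.
  // The pair (20, 0) makes the combine function throw ArithmeticException.
  def safeQuotients(implicit system: ActorSystem): Future[Seq[Int]] = {
    val numerators = Source(List(10, 20, 30))
    val denominators = Source(List(2, 0, 5))

    numerators
      .zipWith(denominators)((n, d) => n / d)
      // Resume: the failing zipped element is dropped, the stream continues.
      .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
      .runWith(Sink.seq)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("zipWithSupervision")
    try println(Await.result(safeQuotients, 3.seconds))
    finally system.terminate()
  }
}
```

With the resuming decider, the pair whose combine call throws should be skipped, leaving only the quotients of the first and third pairs. Replacing the decider with `Supervision.stoppingDecider` would instead fail the stream at the second pair.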
Examples
- Scala

    import org.apache.pekko
    import pekko.stream.scaladsl.Source
    import pekko.stream.scaladsl.Sink

    // running the stream requires an implicit ActorSystem in scope
    val sourceCount = Source(List("one", "two", "three"))
    val sourceFruits = Source(List("apple", "orange", "banana"))

    sourceCount
      .zipWith(sourceFruits) { (countStr, fruitName) =>
        s"$countStr $fruitName"
      }
      .runWith(Sink.foreach(println))
    // this will print 'one apple', 'two orange', 'three banana'

- Java

    import org.apache.pekko.NotUsed;
    import org.apache.pekko.japi.function.Function2;
    import org.apache.pekko.stream.javadsl.Keep;
    import org.apache.pekko.stream.javadsl.Source;
    import org.apache.pekko.stream.javadsl.Sink;
    import java.util.*;

    // 'system' is an ActorSystem that is already in scope
    Source<String, NotUsed> sourceCount = Source.from(List.of("one", "two", "three"));
    Source<String, NotUsed> sourceFruits = Source.from(List.of("apple", "orange", "banana"));

    sourceCount
        .zipWith(
            sourceFruits,
            (Function2<String, String, String>) (countStr, fruitName) -> countStr + " " + fruitName)
        .runForeach(System.out::println, system);
    // this will print 'one apple', 'two orange', 'three banana'
Reactive Streams semantics
emits when all of the inputs have an element available
backpressures both upstreams when downstream backpressures, and also backpressures an upstream that has emitted an element until the other upstream has emitted one
completes when any upstream completes
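The completion rule above means the shorter input decides how many elements are emitted. The following is a small sketch of that, zipping a finite source with an unbounded counter. The object name `ZipWithCompletionSketch`, the method `numbered`, and the sample data are hypothetical and chosen only for illustration.

```scala
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

// Hypothetical example object; names are for illustration only.
object ZipWithCompletionSketch {

  // Numbers each label using an unbounded counter source.
  // The zipped stream completes as soon as the finite 'labels' source does,
  // so the counter is never drained.
  def numbered(implicit system: ActorSystem): Future[Seq[String]] = {
    val labels = Source(List("a", "b", "c"))
    val counter = Source.fromIterator(() => Iterator.from(1))

    labels
      .zipWith(counter)((label, index) => s"$index:$label")
      .runWith(Sink.seq)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("zipWithCompletion")
    try println(Await.result(numbered, 3.seconds))
    finally system.terminate()
  }
}
```

Because `labels` has three elements, the result holds exactly three strings, `1:a`, `2:b` and `3:c`, even though `counter` on its own would never complete.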