StreamConverters.asJavaStream
Create a sink which materializes into Java 8 Stream that can be run to trigger demand through the sink.
Additional Sink and Source converters
Signature
StreamConvertersStreamConverters
Description
Create a sink which materializes into Java 8 Stream that can be run to trigger demand through the sink. Elements emitted through the stream will be available for reading through the Java 8 Stream.
The Java 8 Stream will be ended when the stream flowing into this Sink completes, and closing the Java Stream will cancel the inflow of this Sink. If the Java Stream throws an exception, the Pekko stream is cancelled.
Be aware that Java Stream blocks current thread while waiting on next element from downstream.
Example
Here is an example of a SinkSink that materializes into a java.util.stream.Stream.
- Scala
-
source
import java.util.stream import java.util.stream.IntStream import org.apache.pekko import pekko.NotUsed import pekko.stream.scaladsl.Keep import pekko.stream.scaladsl.Sink import pekko.stream.scaladsl.Source import pekko.stream.scaladsl.StreamConverters val source: Source[Int, NotUsed] = Source(0 to 9).filter(_ % 2 == 0) val sink: Sink[Int, stream.Stream[Int]] = StreamConverters.asJavaStream[Int]() val jStream: java.util.stream.Stream[Int] = source.runWith(sink) - Java
-
source
import org.apache.pekko.japi.function.Creator; import org.apache.pekko.stream.Materializer; import org.apache.pekko.stream.javadsl.Sink; import org.apache.pekko.stream.javadsl.StreamConverters; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.BaseStream; import java.util.stream.IntStream; import java.util.stream.Stream; Source<Integer, NotUsed> source = Source.range(0, 9).filter(i -> i % 2 == 0); Sink<Integer, java.util.stream.Stream<Integer>> sink = StreamConverters.<Integer>asJavaStream(); Stream<Integer> jStream = source.runWith(sink, system);
Reactive Streams semantics
cancels when the Java Stream is closed
backpressures when no read is pending on the Java Stream