StreamConverters.asJavaStream
Create a sink which materializes into a Java 8 Stream that can be run to trigger demand through the sink.
Additional Sink and Source converters
Signature
Description
Create a sink which materializes into a Java 8 Stream that can be run to trigger demand through the sink. Elements emitted through the stream will be available for reading through the Java 8 Stream.
The Java 8 Stream will be ended when the stream flowing into this Sink completes, and closing the Java Stream will cancel the inflow of this Sink. If the Java Stream throws an exception, the Pekko stream is cancelled.
Be aware that the Java Stream blocks the current thread while waiting for the next element from upstream.
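The blocking and cancellation behaviour described above can be modelled without Pekko. The following is a minimal sketch using only the JDK, and it is not Pekko's implementation: a hypothetical `BlockingBridgeSketch.open` helper hands elements from a producer thread to a `java.util.stream.Stream` through a `SynchronousQueue`, so the reading thread blocks until an element arrives, and closing the stream stops the producer. Every class, method, and field name here is an illustrative assumption.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Illustrative model only (hypothetical names); this is not Pekko's implementation.
final class BlockingBridgeSketch {

  /** The consumer-facing stream plus a flag recording whether close() was called. */
  static final class Bridge {
    final Stream<Integer> stream;
    final AtomicBoolean cancelled;

    Bridge(Stream<Integer> stream, AtomicBoolean cancelled) {
      this.stream = stream;
      this.cancelled = cancelled;
    }
  }

  static Bridge open(List<Integer> upstream) {
    // Zero-capacity hand-off: put() blocks until a take() is pending (models backpressure).
    SynchronousQueue<Optional<Integer>> handoff = new SynchronousQueue<>();
    AtomicBoolean cancelled = new AtomicBoolean(false);

    Thread producer = new Thread(() -> {
      try {
        for (Integer element : upstream) {
          handoff.put(Optional.of(element));
        }
        handoff.put(Optional.empty()); // completion marker ends the Java Stream
      } catch (InterruptedException e) {
        // Interrupted by close(): stop producing, mirroring cancellation of the inflow.
      }
    });
    producer.setDaemon(true);
    producer.start();

    Iterator<Integer> iterator = new Iterator<Integer>() {
      private Optional<Integer> pending; // null until the next element has been fetched

      @Override
      public boolean hasNext() {
        if (pending == null) {
          try {
            pending = handoff.take(); // blocks the calling thread until an element arrives
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pending = Optional.empty();
          }
        }
        return pending.isPresent();
      }

      @Override
      public Integer next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        Integer value = pending.get();
        pending = null;
        return value;
      }
    };

    Stream<Integer> stream = StreamSupport
        .stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
        .onClose(() -> {
          cancelled.set(true);
          producer.interrupt();
        });
    return new Bridge(stream, cancelled);
  }

  public static void main(String[] args) {
    Bridge bridge = open(Arrays.asList(0, 2, 4, 6, 8));
    // try-with-resources closes the stream, which stops the producer early.
    try (Stream<Integer> s = bridge.stream) {
      System.out.println(s.limit(2).collect(Collectors.toList()));
    }
    System.out.println("cancelled = " + bridge.cancelled.get());
  }
}
```

Because the hand-off queue has no capacity, the producer only advances while a read is pending, which is why a consumer that stops reading without closing the stream would leave the producer parked indefinitely in this model.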
Example
Here is an example of a Sink that materializes into a java.util.stream.Stream.
// Scala
import java.util.stream
import java.util.stream.IntStream
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Keep
import pekko.stream.scaladsl.Sink
import pekko.stream.scaladsl.Source
import pekko.stream.scaladsl.StreamConverters
val source: Source[Int, NotUsed] = Source(0 to 9).filter(_ % 2 == 0)
val sink: Sink[Int, stream.Stream[Int]] = StreamConverters.asJavaStream[Int]()
// Running the stream requires an implicit ActorSystem (or Materializer) in scope.
val jStream: java.util.stream.Stream[Int] = source.runWith(sink)
// Java
import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.function.Creator;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.javadsl.StreamConverters;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.BaseStream;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Source<Integer, NotUsed> source = Source.range(0, 9).filter(i -> i % 2 == 0);
Sink<Integer, java.util.stream.Stream<Integer>> sink = StreamConverters.<Integer>asJavaStream();
// `system` is an ActorSystem created elsewhere.
Stream<Integer> jStream = source.runWith(sink, system);
Reactive Streams semantics
cancels when the Java Stream is closed
backpressures when no read is pending on the Java Stream
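Since closing the Java Stream is what cancels the Pekko stream, one reasonable pattern is to consume the materialized stream inside try-with-resources so it is closed even if reading fails. The sketch below assumes Pekko Streams is on the classpath; the class name `AsJavaStreamUsage`, the method name `evens`, and the actor system name are illustrative and not part of the library. Note that `collect` blocks the calling thread until the Pekko stream completes.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.javadsl.StreamConverters;

// Hypothetical wrapper class for illustration.
final class AsJavaStreamUsage {

  static List<Integer> evens() {
    ActorSystem system = ActorSystem.create("as-java-stream-example");
    try {
      Source<Integer, NotUsed> source = Source.range(0, 9).filter(i -> i % 2 == 0);
      Sink<Integer, Stream<Integer>> sink = StreamConverters.<Integer>asJavaStream();

      // Closing the Java Stream (here via try-with-resources) cancels the Pekko stream.
      try (Stream<Integer> jStream = source.runWith(sink, system)) {
        // Each read blocks this thread until the next element is available.
        return jStream.collect(Collectors.toList());
      }
    } finally {
      system.terminate();
    }
  }

  public static void main(String[] args) {
    System.out.println(evens());
  }
}
```

In real applications the blocking read would typically run on a thread that is allowed to block, rather than on a dispatcher thread shared with other actors or streams.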