Sink.asPublisher
Integration with Reactive Streams, materializes into a org.reactivestreams.Publisher
.
Signature
Sink.asPublisher
Sink.asPublisher
Description
This method gives you the capability to publish the data from the Sink
through a Reactive Streams Publisher. Generally, in Pekko Streams a Sink
is considered a subscriber, which consumes the data from source. To integrate with other Reactive Stream implementations Sink.asPublisher
provides a Publisher
materialized value when run. Now, the data from this publisher can be consumed by subscribing to it. We can control if we allow more than one downstream subscriber from the single running Pekko stream through the fanout
parameter. In Java 9, the Reactive Stream API was included in the JDK, and Publisher
is available through Flow.Publisher. Since those APIs are identical but exist at different package namespaces and does not depend on the Reactive Streams package a separate publisher sink for those is available through org.apache.pekko.stream.scaladsl.JavaFlowSupport.Sink#asPublisher
org.apache.pekko.stream.javadsl.JavaFlowSupport.Sink#asPublisher
.
Example
In the example we are using a source and then creating a Publisher. After that, we see that when fanout
is true multiple subscribers can subscribe to it, but when it is false only the first subscriber will be able to subscribe and others will be rejected.
- Scala
-
source
val source = Source(1 to 5) val publisher = source.runWith(Sink.asPublisher(false)) Source.fromPublisher(publisher).runWith(Sink.foreach(println)) // 1 2 3 4 5 Source .fromPublisher(publisher) .runWith(Sink.foreach(println)) // No output, because the source was not able to subscribe to the publisher.
- Java
-
source
Source<Integer, NotUsed> source = Source.range(1, 5); Publisher<Integer> publisherFalse = source.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), system); CompletionStage<Integer> resultFromFirstSubscriberFalse = Source.fromPublisher(publisherFalse) .runWith(Sink.fold(0, (acc, element) -> acc + element), system); CompletionStage<Integer> resultFromSecondSubscriberFalse = Source.fromPublisher(publisherFalse) .runWith(Sink.fold(1, (acc, element) -> acc * element), system); resultFromFirstSubscriberFalse.thenAccept(System.out::println); // 15 resultFromSecondSubscriberFalse.thenAccept( System.out ::println); // No output, because the source was not able to subscribe to the publisher.
Reactive Streams semantics
emits the materialized publisher
completes after the source is consumed and materialized publisher is created