Reactive Streams Interop
Dependency
To use Pekko Streams, add the module to your project:
- sbt
val PekkoVersion = "1.0.3"
libraryDependencies += "org.apache.pekko" %% "pekko-stream" % PekkoVersion
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.0.3</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
- Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.0.3")

  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}"
}
Overview
Pekko Streams implements the Reactive Streams standard for asynchronous stream processing with non-blocking back pressure.
Since Java 9 the Reactive Streams APIs have been included in the Java standard library, under the java.util.concurrent.Flow namespace. For Java 8 there is instead a separate Reactive Streams artifact with the same APIs in the package org.reactivestreams.
Pekko Streams provides interoperability with both API versions. The org.reactivestreams interfaces are supported directly through factories on the regular Source and Sink APIs. For the interfaces built into Java 9 and later there is a separate set of factories in org.apache.pekko.stream.scaladsl.JavaFlowSupport (Scala) and org.apache.pekko.stream.javadsl.JavaFlowSupport (Java).
The following samples use the standalone Reactive Streams API factories, but each such call can be replaced with the corresponding method from JavaFlowSupport and the JDK java.util.concurrent.Flow interfaces.
Note that it is not possible to use JavaFlowSupport on Java 8, since the required interfaces are not available in the Java standard library.
The two most important interfaces in Reactive Streams are Publisher and Subscriber.
- Scala
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Processor
- Java
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Processor;
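To make the demand-driven contract between these two interfaces concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow interfaces and SubmissionPublisher, not Pekko Streams itself. FlowBasics, CollectingSubscriber and publishAll are hypothetical names chosen for illustration. The subscriber requests one element at a time, which is the back-pressure signal a Publisher has to respect.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowBasics {

    // Hypothetical subscriber that stores every element it receives.
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first element
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            subscription.request(1); // ask for one more only after handling this one
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes the items and returns what the subscriber received, in order.
    static <T> List<T> publishAll(List<T> items) {
        CollectingSubscriber<T> subscriber = new CollectingSubscriber<>();
        try (SubmissionPublisher<T> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() completes the subscriber once buffered items are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publishAll(List.of("a", "b", "c"))); // prints [a, b, c]
    }
}
```

The org.reactivestreams interfaces have the same four callbacks and the same request/cancel subscription, so the sketch carries over to them by changing the imported types.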
Let us assume that one library provides a publisher of tweets (tweets, a Publisher of Tweet) and another library knows how to store author handles in a database (storage, a Subscriber of Author).
Using a Pekko Streams Flow we can transform the stream and connect the two:
- Scala
val authors = Flow[Tweet].filter(_.hashtags.contains(pekkoTag)).map(_.author)

Source.fromPublisher(tweets).via(authors).to(Sink.fromSubscriber(storage)).run()
- Java
final Flow<Tweet, Author, NotUsed> authors =
    Flow.of(Tweet.class).filter(t -> t.hashtags().contains(PEKKO)).map(t -> t.author);

// run(system) materializes the graph; without it nothing is connected
Source.fromPublisher(rs.tweets())
    .via(authors)
    .to(Sink.fromSubscriber(rs.storage()))
    .run(system);
The Publisher is used as an input Source to the flow and the Subscriber is used as an output Sink.
A Flow can also be converted to a RunnableGraph[Processor[In, Out]], which materializes to a Processor when run() is called. run() itself can be called multiple times, resulting in a new Processor instance each time.
- Scala
val processor: Processor[Tweet, Author] = authors.toProcessor.run()

tweets.subscribe(processor)
processor.subscribe(storage)
- Java
final Processor<Tweet, Author> processor = authors.toProcessor().run(system);

rs.tweets().subscribe(processor);
processor.subscribe(rs.storage());
A publisher can be connected to a subscriber with the subscribe method.
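The publisher, processor, subscriber wiring described above can also be sketched without Pekko, using only the JDK Flow interfaces. This is an illustration under stated assumptions, not how Pekko implements toProcessor: MapProcessor, Collector and authorsOf are hypothetical names, and tweets are modelled as plain strings of the form "author: text".

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class ProcessorChain {

    // Hypothetical Processor: a Subscriber towards upstream, a Publisher towards downstream.
    static final class MapProcessor<A, B> extends SubmissionPublisher<B>
            implements Flow.Processor<A, B> {
        private final Function<A, B> mapper;
        private Flow.Subscription upstream;

        MapProcessor(Function<A, B> mapper) {
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            upstream.request(1);
        }

        @Override
        public void onNext(A item) {
            submit(mapper.apply(item)); // forward the transformed element downstream
            upstream.request(1);
        }

        @Override
        public void onError(Throwable error) {
            closeExceptionally(error);
        }

        @Override
        public void onComplete() {
            close();
        }
    }

    // Hypothetical terminal subscriber that collects the results.
    static final class Collector<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // effectively unbounded demand
        }

        @Override
        public void onNext(T item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Extracts the author part of each "author: text" tweet through the processor.
    static List<String> authorsOf(List<String> tweets) {
        Collector<String> storage = new Collector<>();
        MapProcessor<String, String> processor =
                new MapProcessor<>(tweet -> tweet.substring(0, tweet.indexOf(':')));
        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            source.subscribe(processor);  // publisher -> processor
            processor.subscribe(storage); // processor -> subscriber
            tweets.forEach(source::submit);
        } // closing the source completes the processor, which completes storage
        try {
            if (!storage.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return storage.received;
    }

    public static void main(String[] args) {
        System.out.println(authorsOf(List.of("alice: hello #pekko", "bob: hi"))); // prints [alice, bob]
    }
}
```

Both subscribe calls are made before any element is submitted, because SubmissionPublisher only delivers elements to subscribers that are already attached.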
It is also possible to expose a Source as a Publisher by using the Publisher-Sink:
- Scala
val authorPublisher: Publisher[Author] =
  Source.fromPublisher(tweets).via(authors).runWith(Sink.asPublisher(fanout = false))

authorPublisher.subscribe(storage)
- Java
final Publisher<Author> authorPublisher =
    Source.fromPublisher(rs.tweets())
        .via(authors)
        .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), system);

authorPublisher.subscribe(rs.storage());
A publisher that is created with Sink.asPublisher(fanout = false) (Scala) or Sink.asPublisher(AsPublisher.WITHOUT_FANOUT) (Java) supports only a single subscription. Additional subscription attempts will be rejected with an IllegalStateException.
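Under the Reactive Streams rules a publisher signals such a rejection through the subscriber's onError callback rather than by throwing from subscribe. The following JDK-only sketch shows that convention. It is not Pekko's implementation, and OneShotPublisher, ErrorRecorder and secondSubscriberError are hypothetical names.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class SingleSubscription {

    // Hypothetical publisher that emits nothing and accepts one subscriber only.
    static final class OneShotPublisher<T> implements Flow.Publisher<T> {
        private final AtomicBoolean subscribed = new AtomicBoolean(false);

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            boolean first = subscribed.compareAndSet(false, true);
            // onSubscribe must precede any other signal, including a rejection
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) {
                    // no elements are produced in this sketch
                }

                @Override
                public void cancel() {
                }
            });
            if (!first) {
                subscriber.onError(
                        new IllegalStateException("only one subscriber is supported"));
            }
        }
    }

    // Records the error signal, if any, that a subscriber receives.
    static final class ErrorRecorder<T> implements Flow.Subscriber<T> {
        final AtomicReference<Throwable> error = new AtomicReference<>();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
        }

        @Override
        public void onError(Throwable t) {
            error.set(t);
        }

        @Override
        public void onComplete() {
        }
    }

    // Subscribes twice and returns the error seen by the second subscriber.
    static Throwable secondSubscriberError() {
        OneShotPublisher<String> publisher = new OneShotPublisher<>();
        ErrorRecorder<String> first = new ErrorRecorder<>();
        ErrorRecorder<String> second = new ErrorRecorder<>();
        publisher.subscribe(first);
        publisher.subscribe(second);
        if (first.error.get() != null) {
            throw new IllegalStateException("first subscriber was unexpectedly rejected");
        }
        return second.error.get();
    }

    public static void main(String[] args) {
        // prints IllegalStateException
        System.out.println(secondSubscriberError().getClass().getSimpleName());
    }
}
```

A subscriber that may be attached to a non-fanout publisher therefore needs to handle the rejection in its onError callback rather than around the subscribe call.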
A publisher that supports multiple subscribers using fan-out/broadcasting is created as follows:
- Scala
def alert: Subscriber[Author]
def storage: Subscriber[Author]
- Java
Subscriber<Author> alert();

Subscriber<Author> storage();
- Scala
val authorPublisher: Publisher[Author] =
  Source.fromPublisher(tweets).via(authors).runWith(Sink.asPublisher(fanout = true))

authorPublisher.subscribe(storage)
authorPublisher.subscribe(alert)
- Java
final Publisher<Author> authorPublisher =
    Source.fromPublisher(rs.tweets())
        .via(authors)
        .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), system);

authorPublisher.subscribe(rs.storage());
authorPublisher.subscribe(rs.alert());
The input buffer size of the operator controls how far apart the slowest subscriber can be from the fastest subscriber before slowing down the stream.
To make the picture complete, it is also possible to expose a Sink as a Subscriber by using the Subscriber-Source:
- Scala
val tweetSubscriber: Subscriber[Tweet] =
  authors.to(Sink.fromSubscriber(storage)).runWith(Source.asSubscriber[Tweet])

tweets.subscribe(tweetSubscriber)
- Java
final Subscriber<Author> storage = rs.storage();

final Subscriber<Tweet> tweetSubscriber =
    authors.to(Sink.fromSubscriber(storage)).runWith(Source.asSubscriber(), system);

rs.tweets().subscribe(tweetSubscriber);
It is also possible to re-wrap Processor instances as a Flow by passing a factory function that will create the Processor instances:
- Scala
// An example Processor factory
def createProcessor: Processor[Int, Int] = Flow[Int].toProcessor.run()

val flow: Flow[Int, Int, NotUsed] = Flow.fromProcessor(() => createProcessor)
- Java
// An example Processor factory
final Creator<Processor<Integer, Integer>> factory =
    new Creator<Processor<Integer, Integer>>() {
      public Processor<Integer, Integer> create() {
        return Flow.of(Integer.class).toProcessor().run(system);
      }
    };

final Flow<Integer, Integer, NotUsed> flow = Flow.fromProcessor(factory);
Please note that a factory is necessary to make the resulting Flow reusable: each materialization of the Flow needs its own Processor instance.
Other implementations
Implementing Reactive Streams makes it possible to plug Pekko Streams together with other stream libraries that adhere to the standard. An incomplete list of other implementations: