Source.asSubscriber
Integration with Reactive Streams, materializes into a Subscriber.
Signature
- Scala
def asSubscriber[T]: Source[T, java.util.concurrent.Flow.Subscriber[T]]
- Java
static <T> org.apache.pekko.stream.javadsl.Source<T, Subscriber<T>> asSubscriber()
Description
If you want to create a Source that gets its elements from another library that supports Reactive Streams, you can use JavaFlowSupport.Source.asSubscriber. Each time this Source is materialized, it produces a materialized value of type java.util.concurrent.Flow.Subscriber. This Subscriber can be attached to a Reactive Streams Publisher, whose elements then populate the Source.
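To make "attaching a Subscriber to a Publisher" concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow types. It does not use Pekko: the CollectingSubscriber class is a hypothetical stand-in for the Subscriber that asSubscriber would materialize, and SubmissionPublisher plays the role of the other library's Publisher.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class AttachSubscriberSketch {

  // Hypothetical stand-in for the Subscriber materialized by asSubscriber.
  static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> received = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
      subscription = s;
      s.request(1); // signal demand for the first element
    }

    @Override
    public void onNext(T item) {
      received.add(item);
      subscription.request(1); // ask for the next element once this one is handled
    }

    @Override
    public void onError(Throwable t) {
      done.countDown();
    }

    @Override
    public void onComplete() {
      done.countDown();
    }
  }

  static List<String> run() {
    CollectingSubscriber<String> subscriber = new CollectingSubscriber<>();
    // SubmissionPublisher is the JDK's own Flow.Publisher implementation.
    try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber); // attach the Subscriber to the Publisher
      publisher.submit("a");
      publisher.submit("b");
      publisher.submit("c");
    } // close() completes the subscriber after buffered items are delivered
    try {
      if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("timed out waiting for completion");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return subscriber.received;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [a, b, c]
  }
}
```

With asSubscriber, the same subscribe call is made with the materialized Subscriber, and the elements flow into the Pekko stream instead of into a list.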
If the API you want to consume elements from provides a Publisher instead of accepting a Subscriber, see fromPublisher.
For JDK 8 users: java.util.concurrent.Flow was introduced in JDK 9, so if you are still on JDK 8 you may use the org.reactivestreams library with Source.asSubscriber instead.
Example
Suppose we use a database client that supports Reactive Streams. We could create a Source that queries the database for its rows. That Source can then be used for further processing, for example to create a Source that contains the names of the rows.
Note that since the database is queried for each materialization, the rowSource can be safely re-used. Because both the database driver and Pekko Streams support Reactive Streams, backpressure is applied throughout the stream, preventing us from running out of memory when the rows are consumed more slowly than the database produces them.
- Scala
import java.util.concurrent.Flow.Subscriber
import java.util.concurrent.Flow.Publisher

import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Source
import pekko.stream.scaladsl.JavaFlowSupport

val rowSource: Source[Row, NotUsed] =
  JavaFlowSupport.Source.asSubscriber
    .mapMaterializedValue((subscriber: Subscriber[Row]) => {
      // For each materialization, fetch the rows from the database:
      val rows: Publisher[Row] = databaseClient.fetchRows()
      rows.subscribe(subscriber)

      NotUsed
    });

val names: Source[String, NotUsed] =
  // rowSource can be re-used, since it will start a new
  // query for each materialization, fully supporting backpressure
  // for each materialized stream:
  rowSource.map(row => row.name)
- Java
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Publisher;

import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.javadsl.JavaFlowSupport;

class Example {
  Source<Row, NotUsed> rowSource =
      JavaFlowSupport.Source.<Row>asSubscriber()
          .mapMaterializedValue(
              subscriber -> {
                // For each materialization, fetch the rows from the database:
                Publisher<Row> rows = databaseClient.fetchRows();
                rows.subscribe(subscriber);

                return NotUsed.getInstance();
              });

  public Source<String, NotUsed> names() {
    // rowSource can be re-used, since it will start a new
    // query for each materialization, fully supporting backpressure
    // for each materialized stream:
    return rowSource.map(row -> row.getField("name"));
  }
}
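The backpressure mentioned in this example rests on the Reactive Streams demand protocol: a Publisher may only emit as many elements as the Subscriber has requested. The following JDK-only sketch illustrates that rule in isolation. RowPublisher and FixedDemandSubscriber are hypothetical names invented for this illustration. They are a simplified, single-threaded stand-in, not the database driver or Pekko's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackpressureSketch {

  // Hypothetical synchronous publisher standing in for a database driver:
  // it emits row numbers 0..total-1, but never more than were requested.
  static final class RowPublisher implements Flow.Publisher<Integer> {
    private final int total;

    RowPublisher(int total) {
      this.total = total;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
      subscriber.onSubscribe(new Flow.Subscription() {
        private int next = 0;
        private long demand = 0;
        private boolean emitting = false;
        private boolean done = false;

        @Override
        public void request(long n) {
          if (done) return;
          if (n <= 0) {
            done = true;
            subscriber.onError(new IllegalArgumentException("demand must be positive"));
            return;
          }
          // Accumulate demand, saturating instead of overflowing.
          demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
          if (emitting) return; // re-entrant call from onNext; the loop below sees the new demand
          emitting = true;
          while (demand > 0 && next < total && !done) {
            demand--;
            subscriber.onNext(next++);
          }
          emitting = false;
          if (next == total && !done) {
            done = true;
            subscriber.onComplete();
          }
        }

        @Override
        public void cancel() {
          done = true;
        }
      });
    }
  }

  // Subscriber that signals demand once and records what arrives.
  static final class FixedDemandSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    boolean completed = false;
    Throwable error;
    private final long initialDemand;

    FixedDemandSubscriber(long initialDemand) {
      this.initialDemand = initialDemand;
    }

    @Override public void onSubscribe(Flow.Subscription s) { s.request(initialDemand); }
    @Override public void onNext(Integer row) { received.add(row); }
    @Override public void onError(Throwable t) { error = t; }
    @Override public void onComplete() { completed = true; }
  }

  static FixedDemandSubscriber run(int totalRows, long demand) {
    FixedDemandSubscriber subscriber = new FixedDemandSubscriber(demand);
    new RowPublisher(totalRows).subscribe(subscriber);
    return subscriber;
  }

  public static void main(String[] args) {
    FixedDemandSubscriber slow = run(10, 3);
    System.out.println(slow.received + " completed=" + slow.completed); // [0, 1, 2] completed=false
    FixedDemandSubscriber fast = run(2, 5);
    System.out.println(fast.received + " completed=" + fast.completed); // [0, 1] completed=true
  }
}
```

In the first run the publisher has ten rows available but hands over only the three that were requested, so a slow consumer bounds how much the producer emits.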