Source.fromPublisher

Integration with Reactive Streams, subscribes to a org.reactivestreams.Publisher.

Source operators

Signature

Scala
sourcedef fromPublisher[T](publisher: java.util.concurrent.Flow.Publisher[T]): Source[T, NotUsed] =
Java
sourcestatic <T> org.apache.pekko.stream.javadsl.Source<T, NotUsed> fromPublisher(Publisher<T> publisher)

Description

If you want to create a SourceSource that gets its elements from another library that supports Reactive Streams, you can use Source.fromPublisher. This source will produce the elements from the org.reactivestreams.Publisher, and coordinate backpressure as needed.

If the API you want to consume elements from accepts a org.reactivestreams.Subscriber instead of providing a org.reactivestreams.Publisher, see asSubscriber.

In Java 9, the Reactive Stream API was included in the JDK, and Publisher is available through Flow.Publisher. Since those APIs are identical but exist at different package namespaces and does not depend on the Reactive Streams package a separate API for those is available through org.apache.pekko.stream.scaladsl.JavaFlowSupport.Source#fromPublisherorg.apache.pekko.stream.javadsl.JavaFlowSupport.Source#fromPublisher.

Example

Suppose we use a database client that supports Reactive Streams, we could create a SourceSource that queries the database for its rows. That SourceSource can then be used for further processing, for example creating a SourceSource that contains the names of the rows.

Because both the database driver and Pekko Streams support Reactive Streams, backpressure is applied throughout the stream, preventing us from running out of memory when the database rows are consumed slower than they are produced by the database.

Scala
sourceimport java.util.concurrent.Flow.Subscriber
import java.util.concurrent.Flow.Publisher

import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Source
import pekko.stream.scaladsl.JavaFlowSupport

val names: Source[String, NotUsed] =
  // A new subscriber will subscribe to the supplied publisher for each
  // materialization, so depending on whether the database client supports
  // this the Source can be materialized more than once.
  JavaFlowSupport.Source.fromPublisher(databaseClient.fetchRows())
    .map(row => row.name)
Java
sourceimport java.util.concurrent.Flow.Publisher;

import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.javadsl.JavaFlowSupport;

class Example {
    public Source<String, NotUsed> names() {
        // A new subscriber will subscribe to the supplied publisher for each
        // materialization, so depending on whether the database client supports
        // this the Source can be materialized more than once.
        return JavaFlowSupport.Source.<Row>fromPublisher(databaseClient.fetchRows())
            .map(row -> row.getField("name"));
    }
}