mapWithResource

Map elements with the help of a resource that can be opened, transform each element (in a blocking way) and closed.

Simple operators

Signature

Flow.mapWithResourceFlow.mapWithResource 1. create: Open or Create the resource. 2. f: Transform each element inputs with the help of resource. 3. close: Close the resource, invoked on end of stream or if the stream fails, optionally outputting a last element.

Flow.mapWithResourceFlow.mapWithResource 1. create: Open or Create the autocloseable resource. 2. f: Transform each element inputs with the help of resource.

Description

Transform each stream element with the help of a resource. The functions are by default called on Pekko’s dispatcher for blocking IO to avoid interfering with other stream operations. See Blocking Needs Careful Management for an explanation on why this is important. The resource creation function is invoked once when the stream is materialized and the returned resource is passed to the mapping function for mapping the first element. The mapping function returns a mapped element to emit downstream. The returned T MUST NOT be null as it is illegal as stream element - according to the Reactive Streams specification.

The close function is called when upstream or downstream completes normally or exceptionally, and will be called only once. - upstream completes or fails, the optional value returns by close will be emitted to downstream if defined. - downstream cancels or fails, the optional value returns by close will be ignored. - shutdowns abruptly, the optional value returns by close will be ignored.
You can do some clean-up here.

Early completion can be done with combination of the Flow.takeWhileFlow.takeWhile operator.

See also unfoldResource, unfoldResourceAsync.

You can configure the default dispatcher for this Source by changing the org.apache.pekko.stream.materializer.blocking-io-dispatcher or set it for a given Source by using ActorAttributes.

Examples

Imagine we have a database API which may potentially block when we perform a query, and the database connection can be reused for each query.

Scala
sourcetrait DBDriver {
  def create(url: URL, userName: String, password: String): Connection
}
trait Connection {
  def close(): Unit
}
trait Database {
  // blocking query
  def doQuery(connection: Connection, query: String): QueryResult = ???
}
trait QueryResult {
  def hasMore: Boolean
  // potentially blocking retrieval of each element
  def next(): DataBaseRecord
  // potentially blocking retrieval all element
  def toList(): List[DataBaseRecord]
}
trait DataBaseRecord
Java
sourceinterface DBDriver {
  Connection create(URL url, String userName, String password);
}

interface Connection {
  void close();
}

interface Database {
  // blocking query
  QueryResult doQuery(Connection connection, String query);
}

interface QueryResult {
  boolean hasMore();

  // potentially blocking retrieval of each element
  DatabaseRecord next();

  // potentially blocking retrieval all element
  List<DatabaseRecord> toList();
}

interface DatabaseRecord {}

Let’s see how we make use of the API above safely through mapWithResource:

Scala
source// some database for JVM
val db: Database = ???
Source(
  List(
    "SELECT * FROM shop ORDER BY article-0000 order by gmtModified desc limit 100;",
    "SELECT * FROM shop ORDER BY article-0001 order by gmtModified desc limit 100;"))
  .mapWithResource(() => dbDriver.create(url, userName, password))(
    (connection, query) => db.doQuery(connection, query).toList(),
    conn => {
      conn.close()
      None
    })
  .mapConcat(identity)
  .runForeach(println)
Java
source// some database for JVM
final Database db = null;
Source.from(
        Arrays.asList(
            "SELECT * FROM shop ORDER BY article-0000 order by gmtModified desc limit 100;",
            "SELECT * FROM shop ORDER BY article-0001 order by gmtModified desc limit 100;"))
    .mapWithResource(
        () -> dbDriver.create(url, userName, password),
        (connection, query) -> db.doQuery(connection, query).toList(),
        connection -> {
          connection.close();
          return Optional.empty();
        })
    .mapConcat(elems -> elems)
    .runForeach(System.out::println, system);

In this example we retrieve data form two tables with the same shared connection, and transform the results to individual records with mapConcat(identity)mapConcat(elems -> elems), once done the connection is closed.

Reactive Streams semantics

emits the mapping function returns an element and downstream is ready to consume it

backpressures downstream backpressures

completes upstream completes

cancels downstream cancels