mapWithResource
Map elements with the help of a resource that can be opened, transform each element (in a blocking way) and closed.
Signature
Flow.mapWithResource
Flow.mapWithResource
1. create
: Open or Create the resource. 2. f
: Transform each element inputs with the help of resource. 3. close
: Close the resource, invoked on end of stream or if the stream fails, optionally outputting a last element.
Flow.mapWithResource
Flow.mapWithResource
1. create
: Open or Create the autocloseable resource. 2. f
: Transform each element inputs with the help of resource.
Description
Transform each stream element with the help of a resource. The functions are by default called on Pekko’s dispatcher for blocking IO to avoid interfering with other stream operations. See Blocking Needs Careful Management for an explanation on why this is important. The resource creation function is invoked once when the stream is materialized and the returned resource is passed to the mapping function for mapping the first element. The mapping function returns a mapped element to emit downstream. The returned T MUST NOT be null as it is illegal as stream element - according to the Reactive Streams specification.
The close function is called when upstream or downstream completes normally or exceptionally, and will be called only once. - upstream completes or fails, the optional value returns by close
will be emitted to downstream if defined. - downstream cancels or fails, the optional value returns by close
will be ignored. - shutdowns abruptly, the optional value returns by close
will be ignored.
You can do some clean-up here.
Early completion can be done with combination of the Flow.takeWhile
Flow.takeWhile
operator.
See also unfoldResource, unfoldResourceAsync.
You can configure the default dispatcher for this Source by changing the org.apache.pekko.stream.materializer.blocking-io-dispatcher
or set it for a given Source by using ActorAttributes.
Examples
Imagine we have a database API which may potentially block when we perform a query, and the database connection can be reused for each query.
- Scala
-
source
trait DBDriver { def create(url: URL, userName: String, password: String): Connection } trait Connection { def close(): Unit } trait Database { // blocking query def doQuery(connection: Connection, query: String): QueryResult = ??? } trait QueryResult { def hasMore: Boolean // potentially blocking retrieval of each element def next(): DataBaseRecord // potentially blocking retrieval all element def toList(): List[DataBaseRecord] } trait DataBaseRecord
- Java
-
source
interface DBDriver { Connection create(URL url, String userName, String password); } interface Connection { void close(); } interface Database { // blocking query QueryResult doQuery(Connection connection, String query); } interface QueryResult { boolean hasMore(); // potentially blocking retrieval of each element DatabaseRecord next(); // potentially blocking retrieval all element List<DatabaseRecord> toList(); } interface DatabaseRecord {}
Let’s see how we make use of the API above safely through mapWithResource
:
- Scala
-
source
// some database for JVM val db: Database = ??? Source( List( "SELECT * FROM shop ORDER BY article-0000 order by gmtModified desc limit 100;", "SELECT * FROM shop ORDER BY article-0001 order by gmtModified desc limit 100;")) .mapWithResource(() => dbDriver.create(url, userName, password))( (connection, query) => db.doQuery(connection, query).toList(), conn => { conn.close() None }) .mapConcat(identity) .runForeach(println)
- Java
-
source
// some database for JVM final Database db = null; Source.from( Arrays.asList( "SELECT * FROM shop ORDER BY article-0000 order by gmtModified desc limit 100;", "SELECT * FROM shop ORDER BY article-0001 order by gmtModified desc limit 100;")) .mapWithResource( () -> dbDriver.create(url, userName, password), (connection, query) -> db.doQuery(connection, query).toList(), connection -> { connection.close(); return Optional.empty(); }) .mapConcat(elems -> elems) .runForeach(System.out::println, system);
In this example we retrieve data form two tables with the same shared connection, and transform the results to individual records with mapConcat(identity)
mapConcat(elems -> elems)
, once done the connection is closed.
Reactive Streams semantics
emits the mapping function returns an element and downstream is ready to consume it
backpressures downstream backpressures
completes upstream completes
cancels downstream cancels