Source.unfoldResourceAsync
Wrap any resource that can be opened, queried for next element and closed in an asynchronous way.
Signature
Source.unfoldResourceAsync
Source.unfoldResourceAsync
Description
Wrap any resource that can be opened, queried for next element and closed in an asynchronous way with three distinct functions into a source. This operator is the equivalent of unfoldResource but for resources with asynchronous APIs.
Source.unfoldResourceAsync
allows us to safely extract stream elements from a resource with an async API by providing it with three functions that all return a Future
CompletionStage
:
create
: Open or create the resourceread
: Fetch the next element or signal that we reached the end of the stream by completing theFuture
CompletionStage
with aOptional.empty
None
close
: Close the resource, invoked on end of stream or if the stream fails
All exceptions thrown by create
and close
as well as the Future
CompletionStage
s completing with failure will fail the stream. The supervision strategy is used to handle exceptions from read
, create
and from the Future
CompletionStage
s.
Note that there are pre-built unfoldResourceAsync
-like operators to wrap java.io.InputStream
s in Additional Sink and Source converters, Iterator
in fromIterator and File IO in File IO Sinks and Sources. Additional prebuilt technology specific connectors can also be found in Pekko Connectors.
Examples
Imagine we have an async database API which we initially perform an async query and then can check if there are more results in an asynchronous way.
- Scala
-
source
trait Database { // blocking query def doQuery(): Future[QueryResult] } trait QueryResult { def hasMore(): Future[Boolean] def nextEntry(): Future[DatabaseEntry] def close(): Future[Unit] } trait DatabaseEntry
- Java
-
source
interface Database { // async query CompletionStage<QueryResult> doQuery(); } interface QueryResult { // are there more results CompletionStage<Boolean> hasMore(); // async retrieval of each element CompletionStage<DatabaseEntry> nextEntry(); CompletionStage<Void> close(); } interface DatabaseEntry {}
Let’s see how we use the API above safely through unfoldResourceAsync
:
- Scala
-
source
// we don't actually have one, it was just made up for the sample val database: Database = ??? val queryResultSource: Source[DatabaseEntry, NotUsed] = Source.unfoldResourceAsync[DatabaseEntry, QueryResult]( // open () => database.doQuery(), // read query => query.hasMore().flatMap { case false => Future.successful(None) case true => query.nextEntry().map(dbEntry => Some(dbEntry)) }, // close query => query.close().map(_ => Done)) // process each element queryResultSource.runForeach(println)
- Java
-
source
// we don't actually have one, it was just made up for the sample Database database = null; Source<DatabaseEntry, NotUsed> queryResultSource = Source.unfoldResourceAsync( // open database::doQuery, // read this::readQueryResult, // close queryResult -> queryResult.close().thenApply(__ -> Done.done())); queryResultSource.runForeach(entry -> System.out.println(entry.toString()), system); private CompletionStage<Optional<DatabaseEntry>> readQueryResult(QueryResult queryResult) { return queryResult .hasMore() .thenCompose( more -> { if (more) { return queryResult.nextEntry().thenApply(Optional::of); } else { return CompletableFuture.completedFuture(Optional.empty()); } }); }
Reactive Streams semantics
emits when there is demand and Future
CompletionStage
from read function returns value
completes when Future
CompletionStage
from read function returns None