Source.unfoldResourceAsync

Wrap any resource that can be opened, queried for next element and closed in an asynchronous way.

Source operators

Signature

Source.unfoldResourceAsyncSource.unfoldResourceAsync

Description

Wrap any resource that can be opened, queried for next element and closed in an asynchronous way with three distinct functions into a source. This operator is the equivalent of unfoldResource but for resources with asynchronous APIs.

Source.unfoldResourceAsync allows us to safely extract stream elements from a resource with an async API by providing it with three functions that all return a FutureCompletionStage:

  1. create: Open or create the resource
  2. read: Fetch the next element or signal that we reached the end of the stream by completing the FutureCompletionStage with a Optional.emptyNone
  3. close: Close the resource, invoked on end of stream or if the stream fails

All exceptions thrown by create and close as well as the FutureCompletionStages completing with failure will fail the stream. The supervision strategy is used to handle exceptions from read, create and from the FutureCompletionStages.

Note that there are pre-built unfoldResourceAsync-like operators to wrap java.io.InputStreams in Additional Sink and Source converters, Iterator in fromIterator and File IO in File IO Sinks and Sources. Additional prebuilt technology specific connectors can also be found in Pekko Connectors.

Examples

Imagine we have an async database API which we initially perform an async query and then can check if there are more results in an asynchronous way.

Scala
sourcetrait Database {
  // blocking query
  def doQuery(): Future[QueryResult]
}
trait QueryResult {
  def hasMore(): Future[Boolean]
  def nextEntry(): Future[DatabaseEntry]
  def close(): Future[Unit]
}
trait DatabaseEntry
Java
sourceinterface Database {
  // async query
  CompletionStage<QueryResult> doQuery();
}

interface QueryResult {

  // are there more results
  CompletionStage<Boolean> hasMore();

  // async retrieval of each element
  CompletionStage<DatabaseEntry> nextEntry();

  CompletionStage<Void> close();
}

interface DatabaseEntry {}

Let’s see how we use the API above safely through unfoldResourceAsync:

Scala
source// we don't actually have one, it was just made up for the sample
val database: Database = ???

val queryResultSource: Source[DatabaseEntry, NotUsed] =
  Source.unfoldResourceAsync[DatabaseEntry, QueryResult](
    // open
    () => database.doQuery(),
    // read
    query =>
      query.hasMore().flatMap {
        case false => Future.successful(None)
        case true  => query.nextEntry().map(dbEntry => Some(dbEntry))
      },
    // close
    query => query.close().map(_ => Done))

// process each element
queryResultSource.runForeach(println)
Java
source  // we don't actually have one, it was just made up for the sample
  Database database = null;

  Source<DatabaseEntry, NotUsed> queryResultSource =
      Source.unfoldResourceAsync(
          // open
          database::doQuery,
          // read
          this::readQueryResult,
          // close
          queryResult -> queryResult.close().thenApply(__ -> Done.done()));

  queryResultSource.runForeach(entry -> System.out.println(entry.toString()), system);
private CompletionStage<Optional<DatabaseEntry>> readQueryResult(QueryResult queryResult) {
  return queryResult
      .hasMore()
      .thenCompose(
          more -> {
            if (more) {
              return queryResult.nextEntry().thenApply(Optional::of);
            } else {
              return CompletableFuture.completedFuture(Optional.empty());
            }
          });
}

Reactive Streams semantics

emits when there is demand and Future CompletionStage from read function returns value

completes when Future CompletionStage from read function returns None