Source.unfoldResource

Wrap any resource that can be opened, queried for the next element (in a blocking way), and closed using three distinct functions into a source.

Source operators

Signature

Source.unfoldResource

Description

Source.unfoldResource allows us to safely extract stream elements from blocking resources by providing it with three functions:

  1. create: Open or create the resource
  2. read: Fetch the next element, or signal that we reached the end of the stream by returning None (Scala) or Optional.empty (Java)
  3. close: Close the resource, invoked on end of stream or if the stream fails
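The contract between the three functions can be modelled in plain Java, without any Pekko dependency. The sketch below is only an illustration of the call order (create once, read until an empty Optional, always close); the class UnfoldResourceModel and its drain method are hypothetical names invented here, and the model deliberately leaves out everything the real operator adds, such as backpressure, supervision and running on the blocking dispatcher.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java model of the create/read/close contract. This is NOT Pekko code.
final class UnfoldResourceModel {

  // Opens the resource once, reads until read returns an empty Optional,
  // and closes the resource whether reading completed or threw.
  static <R, T> List<T> drain(
      Supplier<R> create, Function<R, Optional<T>> read, Consumer<R> close) {
    List<T> out = new ArrayList<>();
    R resource = create.get(); // 1. create
    try {
      Optional<T> next = read.apply(resource); // 2. read
      while (next.isPresent()) {
        out.add(next.get());
        next = read.apply(resource);
      }
    } finally {
      close.accept(resource); // 3. close, on completion or on failure
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    List<String> elements =
        UnfoldResourceModel.<Iterator<String>, String>drain(
            () -> {
              log.add("create");
              return Arrays.asList("a", "b").iterator();
            },
            it -> it.hasNext() ? Optional.of(it.next()) : Optional.empty(),
            it -> log.add("close"));
    System.out.println(elements); // prints [a, b]
    System.out.println(log); // prints [create, close]
  }
}
```

Unlike this eager loop, the real operator only calls read when downstream signals demand, so elements are pulled one at a time rather than collected into a list.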

The functions are by default called on Pekko’s dispatcher for blocking IO to avoid interfering with other stream operations. See Blocking Needs Careful Management for an explanation of why this is important.

Note that there are pre-built unfoldResource-like operators to wrap java.io.InputStreams in Additional Sink and Source converters, Iterator in fromIterator and File IO in File IO Sinks and Sources. Additional prebuilt technology-specific connectors can also be found in Pekko Connectors.

Examples

Imagine we have a database API which may block both when we initially perform a query and when we retrieve each result from the query. It also gives us an iterator-like way to determine whether we have reached the end of the result, and a close method that must be called to free resources:

Scala
trait Database {
  // blocking query
  def doQuery(): QueryResult
}
trait QueryResult {
  def hasMore: Boolean
  // potentially blocking retrieval of each element
  def nextEntry(): DatabaseEntry
  def close(): Unit
}
trait DatabaseEntry
Java
interface Database {
  // blocking query
  QueryResult doQuery();
}

interface QueryResult {
  boolean hasMore();
  // potentially blocking retrieval of each element
  DatabaseEntry nextEntry();

  void close();
}

interface DatabaseEntry {}

Let’s see how we use the API above safely through unfoldResource:

Scala
// we don't actually have one, it was just made up for the sample
val database: Database = ???

val queryResultSource: Source[DatabaseEntry, NotUsed] =
  Source.unfoldResource[DatabaseEntry, QueryResult](
    // open
    { () =>
      database.doQuery()
    },
    // read
    { query =>
      if (query.hasMore)
        Some(query.nextEntry())
      else
        // signals end of resource
        None
    },
    // close
    query => query.close())

// process each element
queryResultSource.runForeach(println)
Java
// we don't actually have one, it was just made up for the sample
Database database = null;

Source<DatabaseEntry, NotUsed> queryResultSource =
    Source.unfoldResource(
        // open
        () -> database.doQuery(),
        // read
        (queryResult) -> {
          if (queryResult.hasMore()) return Optional.of(queryResult.nextEntry());
          else return Optional.empty();
        },
        // close
        QueryResult::close);

queryResultSource.runForeach(entry -> System.out.println(entry.toString()), system);

If the resource produces more than one element at a time, combining unfoldResource with mapConcat(identity) (Scala) or mapConcat(elems -> elems) (Java) will give you a stream of individual elements. See mapConcat for details.
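To make the batched case concrete, here is a minimal plain-Java sketch of the shape such a read function takes and of the flattening that mapConcat then performs. PagedResult, readPage and flatten are hypothetical names invented for this illustration and are not part of Pekko; the loop merely stands in for the stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Hypothetical paged resource, invented for this sketch. This is NOT a Pekko API.
final class PagedResult {
  private final Iterator<List<String>> pages;

  PagedResult(List<List<String>> pages) {
    this.pages = pages.iterator();
  }

  // Shape of a batched read function: one page per call, empty Optional at the end.
  Optional<List<String>> readPage() {
    return pages.hasNext() ? Optional.of(pages.next()) : Optional.empty();
  }

  // Models the effect of mapConcat(elems -> elems): pages are flattened
  // into individual elements, preserving order.
  static List<String> flatten(PagedResult result) {
    List<String> out = new ArrayList<>();
    Optional<List<String>> page = result.readPage();
    while (page.isPresent()) {
      out.addAll(page.get());
      page = result.readPage();
    }
    return out;
  }

  public static void main(String[] args) {
    PagedResult result =
        new PagedResult(
            Arrays.asList(
                Arrays.asList("a", "b"),
                Collections.<String>emptyList(), // an empty page is not end of stream
                Arrays.asList("c")));
    System.out.println(flatten(result)); // prints [a, b, c]
  }
}
```

Note that an empty page wrapped in a non-empty Optional contributes no elements but does not end the stream; only an empty Optional signals completion.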

Reactive Streams semantics

emits when there is demand and the read function returns a value

completes when the read function returns None (Scala) or an empty Optional (Java)