Source.unfoldResource
Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source.
Signature
Source.unfoldResourceSource.unfoldResource
Description
Source.unfoldResource allows us to safely extract stream elements from blocking resources by providing it with three functions: 
- create: Open or create the resource
- read: Fetch the next element or signal that we reached the end of the stream by returning a- Optional.empty- None
- close: Close the resource, invoked on end of stream or if the stream fails
The functions are by default called on Pekko’s dispatcher for blocking IO to avoid interfering with other stream operations. See Blocking Needs Careful Management for an explanation on why this is important.
Note that there are pre-built unfoldResource-like operators to wrap java.io.InputStreams in Additional Sink and Source converters, Iterator in fromIterator and File IO in File IO Sinks and Sources. Additional prebuilt technology specific connectors can also be found in Pekko Connectors.
Examples
Imagine we have a database API which may potentially block both when we initially perform a query and on retrieving each result from the query. It also gives us an iterator like way to determine if we have reached the end of the result and a close method that must be called to free resources:
- Scala
- 
  source trait Database { // blocking query def doQuery(): QueryResult } trait QueryResult { def hasMore: Boolean // potentially blocking retrieval of each element def nextEntry(): DatabaseEntry def close(): Unit } trait DatabaseEntry
- Java
- 
  source interface Database { // blocking query QueryResult doQuery(); } interface QueryResult { boolean hasMore(); // potentially blocking retrieval of each element DatabaseEntry nextEntry(); void close(); } interface DatabaseEntry {}
Let’s see how we use the API above safely through unfoldResource:
- Scala
- 
  source // we don't actually have one, it was just made up for the sample val database: Database = ??? val queryResultSource: Source[DatabaseEntry, NotUsed] = Source.unfoldResource[DatabaseEntry, QueryResult]( // open { () => database.doQuery() }, // read { query => if (query.hasMore) Some(query.nextEntry()) else // signals end of resource None }, // close query => query.close()) // process each element queryResultSource.runForeach(println)
- Java
- 
  source // we don't actually have one, it was just made up for the sample Database database = null; Source<DatabaseEntry, NotUsed> queryResultSource = Source.unfoldResource( // open () -> database.doQuery(), // read (queryResult) -> { if (queryResult.hasMore()) return Optional.of(queryResult.nextEntry()); else return Optional.empty(); }, // close QueryResult::close); queryResultSource.runForeach(entry -> System.out.println(entry.toString()), system);
If the resource produces more than one element at a time, combining unfoldResource with mapConcat(identity)mapConcat(elems -> elems) will give you a stream of individual elements. See mapConcat) for details.
Reactive Streams semantics
emits when there is demand and the read function returns a value
completes when the read function returns Nonean empty Optional