Source.unfoldResource
Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source.
Signature
Source.unfoldResource
Source.unfoldResource
Description
Source.unfoldResource
allows us to safely extract stream elements from blocking resources by providing it with three functions:
create
: Open or create the resourceread
: Fetch the next element or signal that we reached the end of the stream by returning aOptional.empty
None
close
: Close the resource, invoked on end of stream or if the stream fails
The functions are by default called on Pekko’s dispatcher for blocking IO to avoid interfering with other stream operations. See Blocking Needs Careful Management for an explanation on why this is important.
Note that there are pre-built unfoldResource
-like operators to wrap java.io.InputStream
s in Additional Sink and Source converters, Iterator
in fromIterator and File IO in File IO Sinks and Sources. Additional prebuilt technology specific connectors can also be found in Pekko Connectors.
Examples
Imagine we have a database API which may potentially block both when we initially perform a query and on retrieving each result from the query. It also gives us an iterator like way to determine if we have reached the end of the result and a close method that must be called to free resources:
- Scala
-
source
trait Database { // blocking query def doQuery(): QueryResult } trait QueryResult { def hasMore: Boolean // potentially blocking retrieval of each element def nextEntry(): DatabaseEntry def close(): Unit } trait DatabaseEntry
- Java
-
source
interface Database { // blocking query QueryResult doQuery(); } interface QueryResult { boolean hasMore(); // potentially blocking retrieval of each element DatabaseEntry nextEntry(); void close(); } interface DatabaseEntry {}
Let’s see how we use the API above safely through unfoldResource
:
- Scala
-
source
// we don't actually have one, it was just made up for the sample val database: Database = ??? val queryResultSource: Source[DatabaseEntry, NotUsed] = Source.unfoldResource[DatabaseEntry, QueryResult]( // open { () => database.doQuery() }, // read { query => if (query.hasMore) Some(query.nextEntry()) else // signals end of resource None }, // close query => query.close()) // process each element queryResultSource.runForeach(println)
- Java
-
source
// we don't actually have one, it was just made up for the sample Database database = null; Source<DatabaseEntry, NotUsed> queryResultSource = Source.unfoldResource( // open () -> database.doQuery(), // read (queryResult) -> { if (queryResult.hasMore()) return Optional.of(queryResult.nextEntry()); else return Optional.empty(); }, // close QueryResult::close); queryResultSource.runForeach(entry -> System.out.println(entry.toString()), system);
If the resource produces more than one element at a time, combining unfoldResource
with mapConcat(identity)
mapConcat(elems -> elems)
will give you a stream of individual elements. See mapConcat) for details.
Reactive Streams semantics
emits when there is demand and the read
function returns a value
completes when the read
function returns None
an empty Optional