Source.unfoldResource
Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source.
Signature¶
Description¶
Source.unfoldResource
allows us to safely extract stream elements from blocking resources by providing it with three functions:
create
: Open or create the resourceread
: Fetch the next element or signal that we reached the end of the stream by returning aNone
close
: Close the resource, invoked on end of stream or if the stream fails
The functions are by default called on Pekko’s dispatcher for blocking IO to avoid interfering with other stream operations. See Blocking Needs Careful Management for an explanation on why this is important.
Note that there are pre-built unfoldResource
-like operators to wrap java.io.InputStream
s in Additional Sink and Source converters, Iterator
in fromIterator and File IO in File IO Sinks and Sources. Additional prebuilt technology specific connectors can also be found in Pekko Connectors.
Examples¶
Imagine we have a database API which may potentially block both when we initially perform a query and on retrieving each result from the query. It also gives us an iterator like way to determine if we have reached the end of the result and a close method that must be called to free resources:
sourcetrait Database {
// blocking query
def doQuery(): QueryResult
}
trait QueryResult {
def hasMore: Boolean
// potentially blocking retrieval of each element
def nextEntry(): DatabaseEntry
def close(): Unit
}
trait DatabaseEntry
sourceinterface Database {
// blocking query
QueryResult doQuery();
}
interface QueryResult {
boolean hasMore();
// potentially blocking retrieval of each element
DatabaseEntry nextEntry();
void close();
}
interface DatabaseEntry {}
Let’s see how we use the API above safely through unfoldResource
:
source// we don't actually have one, it was just made up for the sample
val database: Database = ???
val queryResultSource: Source[DatabaseEntry, NotUsed] =
Source.unfoldResource[DatabaseEntry, QueryResult](
// open
{ () =>
database.doQuery()
},
// read
{ query =>
if (query.hasMore)
Some(query.nextEntry())
else
// signals end of resource
None
},
// close
query => query.close())
// process each element
queryResultSource.runForeach(println)
source// we don't actually have one, it was just made up for the sample
Database database = null;
Source<DatabaseEntry, NotUsed> queryResultSource =
Source.unfoldResource(
// open
() -> database.doQuery(),
// read
(queryResult) -> {
if (queryResult.hasMore()) return Optional.of(queryResult.nextEntry());
else return Optional.empty();
},
// close
QueryResult::close);
queryResultSource.runForeach(entry -> System.out.println(entry.toString()), system);
If the resource produces more than one element at a time, combining unfoldResource
with mapConcat(identity)
will give you a stream of individual elements. See mapConcat) for details.
Reactive Streams semantics¶
emits when there is demand and the read
function returns a value
completes when the read
function returns None