Source.unfoldAsync
Just like unfold, but the fold function returns a Future.
Signature¶
Description¶
Just like unfold, but the fold function returns a Future (a CompletionStage in the Java API). When that future completes, the source either emits an element or completes.
Can be used to implement many stateful sources without having to touch the lower-level GraphStage API.
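The contract described above can be modelled with nothing but the JDK. The following is a minimal sketch, not the library implementation: `UnfoldAsyncModel`, `Step`, `unfoldAsyncToList`, `nextNumber` and `countToFive` are hypothetical names introduced only for illustration, and the model blocks on each future and collects the elements into a list instead of emitting them into a stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

public class UnfoldAsyncModel {

  // Hypothetical pair type: the state for the next call plus the element to emit.
  static final class Step<S, E> {
    final S nextState;
    final E element;

    Step(S nextState, E element) {
      this.nextState = nextState;
      this.element = element;
    }
  }

  // Model of the contract only: call f with the current state, wait for the
  // result, then either emit and continue with the new state, or complete.
  static <S, E> List<E> unfoldAsyncToList(
      S initial, Function<S, CompletionStage<Optional<Step<S, E>>>> f) {
    List<E> emitted = new ArrayList<>();
    S state = initial;
    while (true) {
      // join() blocks until the asynchronous step has completed
      Optional<Step<S, E>> result = f.apply(state).toCompletableFuture().join();
      if (!result.isPresent()) {
        return emitted; // empty value: the modelled source completes
      }
      emitted.add(result.get().element); // some value: emit the element
      state = result.get().nextState; // and carry the new state forward
    }
  }

  // Example unfold function: emit n and continue with n + 1, stop once n exceeds 5.
  static CompletionStage<Optional<Step<Integer, Integer>>> nextNumber(Integer n) {
    Optional<Step<Integer, Integer>> result =
        n > 5 ? Optional.empty() : Optional.of(new Step<>(n + 1, n));
    return CompletableFuture.supplyAsync(() -> result);
  }

  static List<Integer> countToFive(int start) {
    return unfoldAsyncToList(start, UnfoldAsyncModel::nextNumber);
  }

  public static void main(String[] args) {
    System.out.println(countToFive(1)); // prints [1, 2, 3, 4, 5]
  }
}
```

The state (here the next number) never leaves the unfold function except through the returned pair, which is what lets a stateful source be written without any mutable fields.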
Examples¶
In this example we are asking an imaginary actor for chunks of bytes from an offset with a protocol like this:
Scala:
object DataActor {
  sealed trait Command
  // request for the chunk starting at `offset`; the reply goes to `replyTo`
  case class FetchChunk(offset: Long, replyTo: ActorRef[Chunk]) extends Command
  case class Chunk(bytes: ByteString)
}
Java:
class DataActor {
  interface Command {}

  // request for the chunk starting at `offset`; the reply goes to `replyTo`
  static final class FetchChunk implements Command {
    public final long offset;
    public final ActorRef<Chunk> replyTo;

    public FetchChunk(long offset, ActorRef<Chunk> replyTo) {
      this.offset = offset;
      this.replyTo = replyTo;
    }
  }

  static final class Chunk {
    public final ByteString bytes;

    public Chunk(ByteString bytes) {
      this.bytes = bytes;
    }
  }
}
The actor replies with a Chunk message. If we ask for an offset beyond the end of the data, the actor responds with an empty ByteString.
We want to represent this as a stream of ByteStrings that completes when we reach the end. To achieve this, we use the offset as the state passed between unfoldAsync invocations:
Scala:
// actor we can query for data with an offset
val dataActor: ActorRef[DataActor.Command] = ???
import system.executionContext
implicit val askTimeout: Timeout = 3.seconds

val startOffset = 0L
val byteSource: Source[ByteString, NotUsed] =
  Source.unfoldAsync(startOffset) { currentOffset =>
    // ask for next chunk
    val nextChunkFuture: Future[DataActor.Chunk] =
      dataActor.ask(DataActor.FetchChunk(currentOffset, _))

    nextChunkFuture.map { chunk =>
      val bytes = chunk.bytes
      if (bytes.isEmpty) None // end of data
      else Some((currentOffset + bytes.length, bytes))
    }
  }
Java:
ActorRef<DataActor.Command> dataActor = null; // let's say we got it from somewhere
Duration askTimeout = Duration.ofSeconds(3);

long startOffset = 0L;
Source<ByteString, NotUsed> byteSource =
    Source.unfoldAsync(
        startOffset,
        currentOffset -> {
          // ask for next chunk
          CompletionStage<DataActor.Chunk> nextChunkCS =
              AskPattern.ask(
                  dataActor,
                  (ActorRef<DataActor.Chunk> ref) ->
                      new DataActor.FetchChunk(currentOffset, ref),
                  askTimeout,
                  system.scheduler());

          return nextChunkCS.thenApply(
              chunk -> {
                ByteString bytes = chunk.bytes;
                if (bytes.isEmpty()) return Optional.empty(); // end of data
                else return Optional.of(Pair.create(currentOffset + bytes.size(), bytes));
              });
        });
Reactive Streams semantics¶
emits when there is demand and the future returned by the unfold function completes with some value
completes when the future returned by the unfold function completes with an empty value
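To make the role of demand concrete, here is a small pull-based model written against the plain JDK. It is a sketch under stated assumptions rather than how the operator is implemented: `DemandModel`, `Step`, `Unfolding` and `invocationsWhenTakingTwo` are hypothetical names, each call to `next()` stands in for one unit of downstream demand, and a real stream may request elements more eagerly, for example because of internal buffers.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class DemandModel {

  // Hypothetical pair type: the state for the next call plus the element to emit.
  static final class Step<S, E> {
    final S nextState;
    final E element;

    Step(S nextState, E element) {
      this.nextState = nextState;
      this.element = element;
    }
  }

  // Pull-based model: the unfold function is only invoked when an element is asked for.
  static final class Unfolding<S, E> implements Iterator<E> {
    private final Function<S, CompletionStage<Optional<Step<S, E>>>> f;
    private S state;
    private Step<S, E> pending; // fetched but not yet handed out, or null
    private boolean completed = false;

    Unfolding(S initial, Function<S, CompletionStage<Optional<Step<S, E>>>> f) {
      this.state = initial;
      this.f = f;
    }

    @Override
    public boolean hasNext() {
      if (completed) return false;
      if (pending == null) {
        // demand arrived: run one asynchronous step and wait for its result
        Optional<Step<S, E>> result = f.apply(state).toCompletableFuture().join();
        if (result.isPresent()) pending = result.get();
        else completed = true; // empty value: complete
      }
      return pending != null;
    }

    @Override
    public E next() {
      if (!hasNext()) throw new NoSuchElementException();
      Step<S, E> step = pending;
      pending = null;
      state = step.nextState; // carry the state to the next invocation
      return step.element;
    }
  }

  // Takes two elements from a never-ending counter and reports how often
  // the unfold function was invoked.
  static int invocationsWhenTakingTwo() {
    AtomicInteger calls = new AtomicInteger();
    Unfolding<Integer, Integer> numbers =
        new Unfolding<>(
            0,
            n -> {
              calls.incrementAndGet();
              Step<Integer, Integer> step = new Step<>(n + 1, n);
              return CompletableFuture.completedFuture(Optional.of(step));
            });
    numbers.next();
    numbers.next();
    return calls.get();
  }

  public static void main(String[] args) {
    System.out.println(invocationsWhenTakingTwo()); // prints 2
  }
}
```

Although the counter in this model never returns an empty value, only two invocations happen because only two elements were requested.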