Source.unfoldAsync

Just like unfold but the fold function returns a Future CompletionStage.

Source operators

Signature

Source.unfoldAsyncSource.unfoldAsync

Description

Just like unfold but the fold function returns a Future CompletionStage which will cause the source to complete or emit when it completes.

Can be used to implement many stateful sources without having to touch the more low level GraphStage API.

Examples

In this example we are asking an imaginary actor for chunks of bytes from an offset with a protocol like this:

Scala
sourceobject DataActor {
  sealed trait Command
  case class FetchChunk(offset: Long, replyTo: ActorRef[Chunk]) extends Command
  case class Chunk(bytes: ByteString)
Java
sourceclass DataActor {
  interface Command {}

  static final class FetchChunk implements Command {
    public final long offset;
    public final ActorRef<Chunk> replyTo;

    public FetchChunk(long offset, ActorRef<Chunk> replyTo) {
      this.offset = offset;
      this.replyTo = replyTo;
    }
  }

  static final class Chunk {
    public final ByteString bytes;

    public Chunk(ByteString bytes) {
      this.bytes = bytes;
    }
  }

The actor will reply with the Chunk message, if we ask for an offset outside of the end of the data the actor will respond with an empty ByteString

We want to represent this as a stream of ByteStrings that complete when we reach the end, to achieve this we use the offset as the state passed between unfoldAsync invocations:

Scala
source// actor we can query for data with an offset
val dataActor: ActorRef[DataActor.Command] = ???
import system.executionContext

implicit val askTimeout: Timeout = 3.seconds
val startOffset = 0L
val byteSource: Source[ByteString, NotUsed] =
  Source.unfoldAsync(startOffset) { currentOffset =>
    // ask for next chunk
    val nextChunkFuture: Future[DataActor.Chunk] =
      dataActor.ask(DataActor.FetchChunk(currentOffset, _))

    nextChunkFuture.map { chunk =>
      val bytes = chunk.bytes
      if (bytes.isEmpty) None // end of data
      else Some((currentOffset + bytes.length, bytes))
    }
  }
Java
sourceActorRef<DataActor.Command> dataActor = null; // let's say we got it from somewhere

Duration askTimeout = Duration.ofSeconds(3);
long startOffset = 0L;
Source<ByteString, NotUsed> byteSource =
    Source.unfoldAsync(
        startOffset,
        currentOffset -> {
          // ask for next chunk
          CompletionStage<DataActor.Chunk> nextChunkCS =
              AskPattern.ask(
                  dataActor,
                  (ActorRef<DataActor.Chunk> ref) ->
                      new DataActor.FetchChunk(currentOffset, ref),
                  askTimeout,
                  system.scheduler());

          return nextChunkCS.thenApply(
              chunk -> {
                ByteString bytes = chunk.bytes;
                if (bytes.isEmpty()) return Optional.empty();
                else return Optional.of(Pair.create(currentOffset + bytes.size(), bytes));
              });
        });

Reactive Streams semantics

emits when there is demand and unfold state returned future completes with some value

completes when the future CompletionStage returned by the unfold function completes with an empty value