Flow.completionStageFlow

Streams the elements through the flow provided by the given CompletionStage once it successfully completes.

Simple operators

Signature

Flow.completionStageFlow

Description

Streams the elements through the given flow once the CompletionStage successfully completes. If the CompletionStage fails, the stream fails.
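The deferral and failure behavior described above can be illustrated with a plain-JDK analogy. This is only a sketch and not the operator itself: `DeferredFlowSketch` and `runThrough` are hypothetical names, a `Function` stands in for the flow, and a `List` stands in for the stream, so backpressure and materialization are not modeled.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-JDK analogy only: these names are hypothetical and not part of any streams API.
public class DeferredFlowSketch {

  // Applies the function carried by the stage to every element, but only once
  // the stage has completed. A failed stage yields a failed result, mirroring
  // "if the CompletionStage fails, the stream fails".
  public static <A, B> CompletionStage<List<B>> runThrough(
      List<A> elements, CompletionStage<Function<A, B>> futureFlow) {
    return futureFlow.thenApply(
        flow -> elements.stream().map(flow).collect(Collectors.toList()));
  }

  public static void main(String[] args) {
    CompletableFuture<Function<Integer, String>> pending = new CompletableFuture<>();
    CompletableFuture<List<String>> result =
        runThrough(List.of(1, 2, 3), pending).toCompletableFuture();

    // No element has been processed yet: the "flow" does not exist until the stage completes.
    System.out.println(result.isDone());

    // Completing the stage makes the elements pass through the function.
    pending.complete(n -> "value: " + n);
    System.out.println(result.join());

    // A failed stage fails the whole result instead of producing elements.
    CompletableFuture<Function<Integer, String>> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("flow creation failed"));
    System.out.println(
        runThrough(List.of(1), failed).toCompletableFuture().isCompletedExceptionally());
  }
}
```

In the real operator the elements are streamed one at a time with backpressure rather than collected into a list; the sketch only shows that processing waits for the CompletionStage and that its failure propagates.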

Examples

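The creation of the processing flow can be deferred until the first element arrives by combining completionStageFlow with prefixAndTail. The first element determines how the flow is built, and the remaining elements are streamed through it: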

Java
CompletionStage<Flow<Integer, String, NotUsed>> processingFlow(int id) {
  return CompletableFuture.completedFuture(
      Flow.of(Integer.class).map(n -> "id: " + id + " value: " + n));
}

Source<String, NotUsed> source =
    Source.range(1, 10)
        .prefixAndTail(1)
        .flatMapConcat(
            (pair) -> {
              List<Integer> head = pair.first();
              Source<Integer, NotUsed> tail = pair.second();

              int id = head.get(0);

              return tail.via(Flow.completionStageFlow(processingFlow(id)));
            });

Reactive Streams semantics

emits when the internal flow is successfully created and it emits

backpressures when the internal flow is successfully created and it backpressures

completes when upstream completes, the CompletionStage has completed, and all elements have been emitted from the internal flow