Flow.futureFlow

Streams the elements through the given future flow once it successfully completes.

Simple operators

Signature

Flow.futureFlow

Description

Streams the elements through the given future flow once it successfully completes. If the future fails, the stream fails.
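The failure case can be illustrated with a minimal sketch. It assumes a classic ActorSystem is available to run the stream; the object name, the method name and the system name are hypothetical and chosen only for illustration.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Flow, Sink, Source }
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical helper, not part of Pekko.
object FutureFlowFailureSketch {
  def streamFails(): Boolean = {
    implicit val system: ActorSystem = ActorSystem("future-flow-failure")

    // The future that should provide the flow fails instead.
    val failing: Future[Flow[Int, String, NotUsed]] =
      Future.failed(new RuntimeException("flow creation failed"))

    // Sink.seq materializes a Future that fails when the stream fails.
    val done = Source(1 to 3).via(Flow.futureFlow(failing)).runWith(Sink.seq)

    try Try(Await.result(done, 3.seconds)).isFailure
    finally system.terminate()
  }
}
```

Because the flow never becomes available, no element is emitted downstream and the future materialized by the sink is expected to complete with a failure.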

Examples

A deferred creation of the stream based on the initial element can be achieved by combining futureFlow with prefixAndTail like so:

Scala
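import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.{ Flow, Source }
import scala.concurrent.Future

// Future { ... } needs an implicit ExecutionContext in scope (for example system.dispatcher)
def processingFlow(id: Int): Future[Flow[Int, String, NotUsed]] =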
  Future {
    Flow[Int].map(n => s"id: $id, value: $n")
  }

val source: Source[String, NotUsed] =
  Source(1 to 10).prefixAndTail(1).flatMapConcat {
    case (Seq(id), tail) =>
      // base the Future flow creation on the first element
      tail.via(Flow.futureFlow(processingFlow(id)))
  }
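To see what this example emits, the source can be run into a collecting sink. The following is a sketch under assumptions: the wrapper object, the `run` method and the ActorSystem name are hypothetical, and the prefix is matched with `Seq(id)` because `prefixAndTail` types it as an immutable sequence.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Flow, Sink, Source }
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration._

// Hypothetical wrapper, not part of Pekko.
object FutureFlowRunSketch {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("future-flow-run")
    implicit val ec: ExecutionContext = system.dispatcher

    def processingFlow(id: Int): Future[Flow[Int, String, NotUsed]] =
      Future(Flow[Int].map(n => s"id: $id, value: $n"))

    val source: Source[String, NotUsed] =
      Source(1 to 10).prefixAndTail(1).flatMapConcat {
        // the first element becomes the id, the remaining elements flow through
        case (Seq(id), tail) => tail.via(Flow.futureFlow(processingFlow(id)))
      }

    // Block only for the purpose of this sketch, then shut the system down.
    try Await.result(source.runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

Note that the first element (1) is consumed as the id and is not emitted as a value, so the result starts with "id: 1, value: 2" and ends with "id: 1, value: 10".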

Reactive Streams semantics

emits when the internal flow is successfully created and it emits

backpressures when the internal flow is successfully created and it backpressures

completes when upstream completes, the future has completed, and all elements have been emitted from the internal flow

cancels when downstream cancels

By default, if downstream cancels before the nested flow has been materialized (that is, before the future completes), the operator cancels immediately. Setting the org.apache.pekko.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested attribute changes this behavior: downstream cancellation is delayed until the nested flow has been materialized, and the nested flow is then immediately cancelled with the original cancellation cause.
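A minimal sketch of applying the PropagateToNested policy is shown here. It assumes the attribute can be attached with `addAttributes` on the flow returned by `Flow.futureFlow`; the object and method names are hypothetical.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.Attributes
import org.apache.pekko.stream.Attributes.NestedMaterializationCancellationPolicy
import org.apache.pekko.stream.scaladsl.Flow
import scala.concurrent.Future

// Hypothetical helper, not part of Pekko.
object CancellationPolicySketch {
  // Wraps a future flow so that downstream cancellation is propagated to the
  // nested flow once it has been materialized, instead of cancelling eagerly.
  def delayedCancellation(
      futureFlow: Future[Flow[Int, String, NotUsed]]): Flow[Int, String, Future[NotUsed]] =
    Flow
      .futureFlow(futureFlow)
      .addAttributes(Attributes(NestedMaterializationCancellationPolicy.PropagateToNested))
}
```

This may be useful when the nested flow holds resources that should always be set up and released through its own materialization and cancellation handling.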