Flow.futureFlow
Streams the elements through the given future flow once it successfully completes.
Signature
Flow.futureFlow
Flow.futureFlow
Description
Streams the elements through the given future flow once it successfully completes. If the future fails the stream is failed.
Examples
A deferred creation of the stream based on the initial element can be achieved by combining futureFlow
with prefixAndTail
like so:
- Scala
-
source
def processingFlow(id: Int): Future[Flow[Int, String, NotUsed]] = Future { Flow[Int].map(n => s"id: $id, value: $n") } val source: Source[String, NotUsed] = Source(1 to 10).prefixAndTail(1).flatMapConcat { case (List(id: Int), tail) => // base the Future flow creation on the first element tail.via(Flow.futureFlow(processingFlow(id))) }
Reactive Streams semantics
emits when the internal flow is successfully created and it emits
backpressures when the internal flow is successfully created and it backpressures
completes when upstream completes and all elements have been emitted from the internal flow
completes when upstream completes and all futures have been completed and all elements have been emitted
cancels when downstream cancels (keep reading) The operator’s default behavior in case of downstream cancellation before nested flow materialization (future completion) is to cancel immediately. This behavior can be controlled by setting the org.apache.pekko.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested attribute, this will delay downstream cancellation until nested flow’s materialization which is then immediately cancelled (with the original cancellation cause).