Flow.futureFlow
Streams the elements through the given future flow once it successfully completes.
Signature¶
Description¶
Streams the elements through the given future flow once it successfully completes. If the future fails the stream is failed.
Examples¶
A deferred creation of the stream based on the initial element can be achieved by combining futureFlow
with prefixAndTail
like so:
sourcedef processingFlow(id: Int): Future[Flow[Int, String, NotUsed]] =
Future {
Flow[Int].map(n => s"id: $id, value: $n")
}
val source: Source[String, NotUsed] =
Source(1 to 10).prefixAndTail(1).flatMapConcat {
case (List(id: Int), tail) =>
// base the Future flow creation on the first element
tail.via(Flow.futureFlow(processingFlow(id)))
}
Reactive Streams semantics¶
emits when the internal flow is successfully created and it emits
backpressures when the internal flow is successfully created and it backpressures
completes when upstream completes and all elements have been emitted from the internal flow
completes when upstream completes and all futures have been completed and all elements have been emitted
cancels when downstream cancels (keep reading) The operator’s default behavior in case of downstream cancellation before nested flow materialization (future completion) is to cancel immediately. This behavior can be controlled by setting the org.apache.pekko.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested attribute, this will delay downstream cancellation until nested flow’s materialization which is then immediately cancelled (with the original cancellation cause).