Flow.lazyFlow
Defers creation and materialization of a Flow
until there is a first element.
Signature
Description
Defers Flow
creation and materialization until when the first element arrives at the lazyFlow
from upstream. After that the stream behaves as if the nested flow replaced the lazyFlow
. The nested Flow
will not be created if the outer flow completes or fails before any elements arrive.
Note that asynchronous boundaries and many other operators in the stream may do pre-fetching or trigger demand and thereby making an early element come throught the stream leading to creation of the inner flow earlier than you would expect.
The materialized value of the Flow
is a Future
CompletionStage
that is completed with the materialized value of the nested flow once that is constructed.
See also:
Examples
In this sample we produce a short sequence of numbers, mostly to side effect and write to standard out to see in which order things happen. Note how producing the first value in the Source
happens before the creation of the flow:
- Scala
-
source
val numbers = Source .unfold(0) { n => val next = n + 1 println(s"Source producing $next") Some((next, next)) } .take(3) val flow = Flow.lazyFlow { () => println("Creating the actual flow") Flow[Int].map { element => println(s"Actual flow mapped $element") element } } numbers.via(flow).run() // prints: // Source producing 1 // Creating the actual flow // Actual flow mapped 1 // Source producing 2 // Actual flow mapped 2
- Java
-
source
Source<Integer, NotUsed> numbers = Source.unfold( 0, n -> { int next = n + 1; System.out.println("Source producing " + next); return Optional.of(Pair.create(next, next)); }) .take(3); Flow<Integer, Integer, CompletionStage<NotUsed>> flow = Flow.lazyFlow( () -> { System.out.println("Creating the actual flow"); return Flow.fromFunction( element -> { System.out.println("Actual flow mapped " + element); return element; }); }); numbers.via(flow).run(system); // prints: // Source producing 1 // Creating the actual flow // Actual flow mapped 1 // Source producing 2 // Actual flow mapped 2
Since the factory is called once per stream materialization it can be used to safely construct a mutable object to use with the actual deferred Flow
. In this example we fold elements into an ArrayList
created inside the lazy flow factory:
- Scala
-
source
val mutableFold = Flow.lazyFlow { () => val zero = new util.ArrayList[Int]() Flow[Int].fold(zero) { (list, element) => list.add(element) list } } val stream = Source(1 to 3).via(mutableFold).to(Sink.foreach(println)) stream.run() stream.run() stream.run() // prints: // [1, 2, 3] // [1, 2, 3] // [1, 2, 3]
- Java
-
source
Flow<Integer, List<Integer>, CompletionStage<NotUsed>> mutableFold = Flow.lazyFlow( () -> { List<Integer> zero = new ArrayList<>(); return Flow.of(Integer.class) .fold( zero, (list, element) -> { list.add(element); return list; }); }); RunnableGraph<NotUsed> stream = Source.range(1, 3).via(mutableFold).to(Sink.foreach(System.out::println)); stream.run(system); stream.run(system); stream.run(system); // prints: // [1, 2, 3] // [1, 2, 3] // [1, 2, 3]
If we instead had used fold
directly with an ArrayList
we would have shared the same list across all materialization and what is even worse, unsafely across threads.
Reactive Streams semantics
emits when the internal flow is successfully created and it emits
backpressures when the internal flow is successfully created and it backpressures
completes when upstream completes and all elements have been emitted from the internal flow
completes when upstream completes and all futures have been completed and all elements have been emitted
cancels when downstream cancels (keep reading) The operator’s default behavior in case of downstream cancellation before nested flow materialization (future completion) is to cancel immediately. This behavior can be controlled by setting the org.apache.pekko.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested attribute, this will delay downstream cancellation until nested flow’s materialization which is then immediately cancelled (with the original cancellation cause).