Flow.lazyFlow

Defers creation and materialization of a Flow until there is a first element.

Simple operators

Signature

Flow.lazyFlow

Description

Defers Flow creation and materialization until the first element arrives at the lazyFlow from upstream. After that, the stream behaves as if the nested flow had replaced the lazyFlow. The nested Flow is not created if the outer flow completes or fails before any elements arrive.
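The last point can be checked with a minimal sketch, shown here under some assumptions: the object name LazyFlowEmptyUpstream, the run() helper and the AtomicBoolean flag are illustrative only and not part of Pekko. An empty upstream is sent through a lazyFlow and the flag records whether the factory was ever invoked.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Flow, Sink, Source }

// Hypothetical example object; the names are illustrative only.
object LazyFlowEmptyUpstream {

  // Returns (elements seen downstream, whether the factory ran).
  def run()(implicit system: ActorSystem): (Seq[Int], Boolean) = {
    val factoryCalled = new AtomicBoolean(false)

    val flow = Flow.lazyFlow { () =>
      factoryCalled.set(true) // only reached if an element arrives
      Flow[Int].map(_ * 2)
    }

    // Upstream completes without emitting anything.
    val elements =
      Await.result(Source.empty[Int].via(flow).runWith(Sink.seq), 3.seconds)

    (elements, factoryCalled.get)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("lazy-flow-empty")
    try {
      val (elements, called) = run()
      println(s"elements empty: ${elements.isEmpty}, factory called: $called")
      // prints: elements empty: true, factory called: false
    } finally system.terminate()
  }
}
```

The stream still completes normally downstream; only the nested flow is skipped.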

Note that asynchronous boundaries and many other operators in the stream may prefetch or trigger demand, causing an element to come through the stream early and the inner flow to be created earlier than you would expect.

The materialized value of the Flow is a Future (Scala) or CompletionStage (Java) that is completed with the materialized value of the nested flow once it has been constructed.
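As a small sketch of how the materialized value can be observed (the object name LazyFlowMatValue, the run() helper and the "nested-mat" string are assumptions made for illustration), the nested flow below is given a String materialized value, and viaMat with Keep.right exposes the resulting Future:

```scala
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Flow, Keep, Sink, Source }

// Hypothetical example object; the names are illustrative only.
object LazyFlowMatValue {

  def run()(implicit system: ActorSystem): String = {
    // The nested flow materializes the String "nested-mat";
    // the lazy flow wraps that value in a Future.
    val flow: Flow[Int, Int, Future[String]] =
      Flow.lazyFlow { () =>
        Flow[Int].map(_ + 1).mapMaterializedValue(_ => "nested-mat")
      }

    // Keep.right selects the lazy flow's materialized value.
    val matValue: Future[String] =
      Source(1 to 3).viaMat(flow)(Keep.right).to(Sink.ignore).run()

    // Completes once the first element has triggered creation of the nested flow.
    Await.result(matValue, 3.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("lazy-flow-mat")
    try println(run()) // prints: nested-mat
    finally system.terminate()
  }
}
```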

Examples

In this example we produce a short sequence of numbers, using side effects that write to standard output so we can see the order in which things happen. Note that the Source produces its first value before the flow is created:

Scala
val numbers = Source
  .unfold(0) { n =>
    val next = n + 1
    println(s"Source producing $next")
    Some((next, next))
  }
  .take(3)

val flow = Flow.lazyFlow { () =>
  println("Creating the actual flow")
  Flow[Int].map { element =>
    println(s"Actual flow mapped $element")
    element
  }
}

numbers.via(flow).run()
// prints:
// Source producing 1
// Creating the actual flow
// Actual flow mapped 1
// Source producing 2
// Actual flow mapped 2
Java
Source<Integer, NotUsed> numbers =
    Source.unfold(
            0,
            n -> {
              int next = n + 1;
              System.out.println("Source producing " + next);
              return Optional.of(Pair.create(next, next));
            })
        .take(3);

Flow<Integer, Integer, CompletionStage<NotUsed>> flow =
    Flow.lazyFlow(
        () -> {
          System.out.println("Creating the actual flow");
          return Flow.fromFunction(
              element -> {
                System.out.println("Actual flow mapped " + element);
                return element;
              });
        });

numbers.via(flow).run(system);
// prints:
// Source producing 1
// Creating the actual flow
// Actual flow mapped 1
// Source producing 2
// Actual flow mapped 2

Since the factory is called once per stream materialization, it can safely construct a mutable object for use in the deferred Flow. In this example we fold elements into an ArrayList created inside the lazy flow factory:

Scala
val mutableFold = Flow.lazyFlow { () =>
  val zero = new util.ArrayList[Int]()
  Flow[Int].fold(zero) { (list, element) =>
    list.add(element)
    list
  }
}
val stream =
  Source(1 to 3).via(mutableFold).to(Sink.foreach(println))

stream.run()
stream.run()
stream.run()
// prints:
// [1, 2, 3]
// [1, 2, 3]
// [1, 2, 3]
Java
Flow<Integer, List<Integer>, CompletionStage<NotUsed>> mutableFold =
    Flow.lazyFlow(
        () -> {
          List<Integer> zero = new ArrayList<>();

          return Flow.of(Integer.class)
              .fold(
                  zero,
                  (list, element) -> {
                    list.add(element);
                    return list;
                  });
        });

RunnableGraph<NotUsed> stream =
    Source.range(1, 3).via(mutableFold).to(Sink.foreach(System.out::println));

stream.run(system);
stream.run(system);
stream.run(system);
// prints:
// [1, 2, 3]
// [1, 2, 3]
// [1, 2, 3]

If we had instead used fold directly with an ArrayList, the same list would have been shared across all materializations and, even worse, accessed unsafely across threads.
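To make the pitfall concrete, here is a minimal sketch of the shared-list variant. The object name SharedFoldPitfall and its helpers are hypothetical. The two runs are deliberately executed one after the other, each awaited before the next starts, so only the sharing is demonstrated; running them concurrently would additionally mutate the ArrayList from several threads without synchronization.

```scala
import java.util
import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Flow, Sink, Source }

// Hypothetical example object showing what NOT to do.
object SharedFoldPitfall {

  // Returns the list size observed after the first and the second run.
  def run()(implicit system: ActorSystem): (Int, Int) = {
    // Created once, outside any factory: every materialization reuses it.
    val shared = new util.ArrayList[Int]()

    val sharedFold = Flow[Int].fold(shared) { (list, element) =>
      list.add(element)
      list
    }

    // Runs the stream to completion and reports the size of the folded list.
    def runOnce(): Int =
      Await.result(Source(1 to 3).via(sharedFold).runWith(Sink.head), 3.seconds).size()

    (runOnce(), runOnce())
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("shared-fold")
    try println(run()) // prints: (3,6)
    finally system.terminate()
  }
}
```

The second run sees the three elements left behind by the first, which is exactly the leakage the lazy flow factory avoids.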

Reactive Streams semantics

emits when the internal flow is successfully created and it emits

backpressures when the internal flow is successfully created and it backpressures

completes when upstream completes and all elements have been emitted from the internal flow

completes when upstream completes and all futures have been completed and all elements have been emitted

cancels when downstream cancels. By default, if downstream cancels before the nested flow has been materialized (future completion), the operator cancels immediately. This behavior can be controlled by setting the org.apache.pekko.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested attribute, which delays downstream cancellation until the nested flow has been materialized; the nested flow is then immediately cancelled (with the original cancellation cause).
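A minimal sketch of setting that attribute follows. It assumes that adding the attribute to the lazy flow itself is enough for the operator to pick it up; the object and value names are hypothetical.

```scala
import org.apache.pekko.stream.Attributes
import org.apache.pekko.stream.Attributes.NestedMaterializationCancellationPolicy
import org.apache.pekko.stream.scaladsl.Flow

// Hypothetical example object; the names are illustrative only.
object LazyFlowCancellationPolicy {

  // Lazy flow whose cancellation is propagated to the nested flow
  // instead of cancelling eagerly (the default policy).
  val propagating =
    Flow
      .lazyFlow(() => Flow[Int].map(_ * 2))
      .addAttributes(Attributes(NestedMaterializationCancellationPolicy.PropagateToNested))

  def main(args: Array[String]): Unit = {
    // Look the policy up again to confirm it is attached to the flow.
    val policy = propagating.getAttributes.get[NestedMaterializationCancellationPolicy]
    println(policy.contains(NestedMaterializationCancellationPolicy.PropagateToNested))
    // prints: true
  }
}
```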