Source.lazySource

Defers creation and materialization of a Source until there is demand.

Source operators

Signature

Source.lazySource

Description

Defers creation and materialization of a Source until there is demand, then emits the elements from the source downstream just as if it had been created up front. If the stream fails or cancels before there is demand, the factory will not be invoked.

Note that asynchronous boundaries and many other operators in the stream may do pre-fetching or trigger demand earlier than you would expect.

The materialized value of the lazy source is a Future (Scala) or CompletionStage (Java) that is completed with the materialized value of the nested source once that is constructed.

Example

You might expect this example not to construct the expensive source until pull is called. However, Sink.queue has a buffer and signals demand to fill it immediately on materialization, so the expensive source is created shortly after the stream has been materialized:

Scala
val source = Source.lazySource { () =>
  println("Creating the actual source")
  createExpensiveSource()
}

val queue = source.runWith(Sink.queue())

// ... time passes ...
// at some point in time we pull the first time
// but the source creation may already have been triggered
queue.pull()
Java
Source<String, CompletionStage<NotUsed>> source =
    Source.lazySource(
        () -> {
          System.out.println("Creating the actual source");
          return createExpensiveSource();
        });

SinkQueueWithCancel<String> queue = source.runWith(Sink.queue(), system);

// ... time passes ...
// at some point in time we pull the first time
// but the source creation may already have been triggered
queue.pull();

Instead, the most useful aspect of the operator is that the factory is called once per stream materialization, which means that it can be used to safely construct a mutable object for use with the actual deferred source.

In this example we make use of that by unfolding a mutable object that works like an iterator: one method reports whether there are more elements, and another returns the next element and advances past it.
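The IteratorLikeThing class used in the examples is not defined on this page. As a rough sketch only, a hypothetical version matching the description might look like the following. The method names thereAreMore and extractNext are taken from the examples; the int element type, the fixed range of three elements, and the internals are assumptions made purely for illustration.

```java
import java.util.NoSuchElementException;

// Hypothetical stand-in for the IteratorLikeThing used in the examples.
// The real class is not shown in this document; everything here is an assumption.
final class IteratorLikeThing {
  // Mutable cursor: this is the state that must not be shared between streams.
  private int next = 1;
  // Assumed upper bound, chosen only to keep the sketch small.
  private final int last = 3;

  // Reports whether there are more elements to extract.
  boolean thereAreMore() {
    return next <= last;
  }

  // Returns the current element and moves on to the next one.
  int extractNext() {
    if (!thereAreMore()) {
      throw new NoSuchElementException("no more elements");
    }
    return next++;
  }
}
```

Because the cursor is a plain mutable field with no synchronization, two streams reading from the same instance would interfere with each other, which is why each materialization needs an instance of its own.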

If the IteratorLikeThing were used directly in a Source.unfold, the same instance would end up being unsafely shared across all three materializations of the stream, but wrapping it with Source.lazySource ensures we create a separate instance for each of the started streams:

Scala
val stream = Source
  .lazySource { () =>
    val iteratorLike = new IteratorLikeThing
    Source.unfold(iteratorLike) { iteratorLike =>
      if (iteratorLike.thereAreMore) Some((iteratorLike, iteratorLike.extractNext))
      else None
    }
  }
  .to(Sink.foreach(println))

// each of the three materializations will have its own instance of IteratorLikeThing
stream.run()
stream.run()
stream.run()
Java
RunnableGraph<CompletionStage<NotUsed>> stream =
    Source.lazySource(
            () -> {
              IteratorLikeThing instance = new IteratorLikeThing();
              return Source.unfold(
                  instance,
                  sameInstance -> {
                    if (sameInstance.thereAreMore())
                      return Optional.of(Pair.create(sameInstance, sameInstance.extractNext()));
                    else return Optional.empty();
                  });
            })
        .to(Sink.foreach(System.out::println));

// each of the three materializations will have its own instance of IteratorLikeThing
stream.run(system);
stream.run(system);
stream.run(system);

Note, though, that you can often achieve the same result using unfoldResource. If you have an actual Iterator, prefer fromIterator.

Reactive Streams semantics

emits: depends on the wrapped Source

completes: depends on the wrapped Source