Source.maybe

Create a source that emits once the materialized Promise (Scala) or CompletableFuture (Java) is completed with a value.

Signature

Source.maybe

Description

Create a source with a materialized Promise[Option[T]] (Scala) or CompletableFuture<Optional<T>> (Java) that controls which element, if any, the source emits. This makes it possible to inject a value into a stream after it has been materialized.

  • If the materialized promise is completed with a Some (Scala) or a non-empty Optional (Java), that value is produced downstream, followed by completion.
  • If the materialized promise is completed with None (Scala) or an empty Optional (Java), no value is produced downstream and completion is signalled immediately.
  • If the materialized promise is completed with a failure, the source fails with that error.
  • If the downstream of this source cancels or fails before the promise has been completed, the promise is completed with None (Scala) or an empty Optional (Java).
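The first three outcomes above can be observed by collecting the stream's output with Sink.seq. The following is a minimal Java sketch, not part of the original documentation: the class name MaybeOutcomes, its helper methods, the actor system name, and the 3-second timeout are illustrative assumptions. It assumes Pekko Streams is on the classpath and Java 11 or later.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

// Hypothetical helper class, for illustration only.
public class MaybeOutcomes {

  // Runs Source.maybe into Sink.seq and keeps both materialized values:
  // the CompletableFuture controlling the source and the collected elements.
  static Pair<CompletableFuture<Optional<Integer>>, CompletionStage<List<Integer>>> start(
      ActorSystem system) {
    return Source.<Integer>maybe().toMat(Sink.<Integer>seq(), Keep.both()).run(system);
  }

  // Completes the source with the given Optional and returns what was emitted.
  static List<Integer> completeWith(ActorSystem system, Optional<Integer> value)
      throws Exception {
    var run = start(system);
    run.first().complete(value);
    return run.second().toCompletableFuture().get(3, TimeUnit.SECONDS);
  }

  // Completes the source with a failure and reports whether the stream failed.
  static boolean failsWhenCompletedExceptionally(ActorSystem system) throws Exception {
    var run = start(system);
    run.first().completeExceptionally(new IllegalStateException("boom"));
    try {
      run.second().toCompletableFuture().get(3, TimeUnit.SECONDS);
      return false; // the stream completed normally, which is not expected here
    } catch (ExecutionException e) {
      return true; // the stream failed
    }
  }

  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("maybe-outcomes");
    try {
      // non-empty Optional: a single element, then completion
      System.out.println(completeWith(system, Optional.of(1)));
      // empty Optional: no elements, immediate completion
      System.out.println(completeWith(system, Optional.empty()));
      // failure: the stream fails
      System.out.println(failsWhenCompletedExceptionally(system));
    } finally {
      system.terminate();
    }
  }
}
```

The downstream-cancellation case is left out of this sketch because observing it depends on timing.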

Source.maybe has some similarities with Source.fromFuture (Scala) and Source.fromCompletionStage (Java). One difference is that a new Promise (Scala) or CompletableFuture (Java) is materialized from Source.maybe each time the stream is run, while the Future or CompletionStage given to Source.fromFuture or Source.fromCompletionStage can only be completed once.
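The "completed once" constraint comes from the future itself, independent of any stream. The plain-JDK sketch below (no Pekko involved; the class and method names are illustrative assumptions) shows that a second attempt to complete the same CompletableFuture is ignored, which is why a fresh one per run is needed to inject a different value each time.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper class, for illustration only.
public class CompleteOnce {

  // Attempts to complete the same future twice and reports what happened.
  static String tryCompleteTwice() {
    CompletableFuture<Optional<Integer>> future = new CompletableFuture<>();
    boolean first = future.complete(Optional.of(1));  // transitions to completed
    boolean second = future.complete(Optional.of(2)); // ignored: already completed
    return first + " " + second + " " + future.join();
  }

  public static void main(String[] args) {
    System.out.println(tryCompleteTwice()); // prints: true false Optional[1]
  }
}
```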

Source.queue is an alternative for emitting more than one element.

Example

Scala
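import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl._
import scala.concurrent.Promise

// an implicit ActorSystem is needed to run the stream; the name is arbitrary
implicit val system: ActorSystem = ActorSystem("maybe-example")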

val source = Source.maybe[Int].to(Sink.foreach(elem => println(elem)))

val promise1: Promise[Option[Int]] = source.run()
promise1.success(Some(1)) // prints 1

// a new Promise is returned when the stream is materialized
val promise2 = source.run()
promise2.success(Some(2)) // prints 2
Java
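import org.apache.pekko.stream.javadsl.RunnableGraph;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// `system` is assumed to be an ActorSystem created elsewhere
Source<Integer, CompletableFuture<Optional<Integer>>> source = Source.<Integer>maybe();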
RunnableGraph<CompletableFuture<Optional<Integer>>> runnable =
    source.to(Sink.foreach(System.out::println));

CompletableFuture<Optional<Integer>> completable1 = runnable.run(system);
completable1.complete(Optional.of(1)); // prints 1

CompletableFuture<Optional<Integer>> completable2 = runnable.run(system);
completable2.complete(Optional.of(2)); // prints 2

Source.maybe[Int] (Scala) materializes a Promise[Option[Int]], and Source.<Integer>maybe() (Java) materializes a CompletableFuture<Optional<Integer>>. That materialized value can be completed later, and a new one is returned each time the stream is run.

Reactive Streams semantics

emits when the returned promise is completed with some value

completes after emitting some value, or directly if the promise is completed with no value