Source.maybe
Create a source that emits once the materialized Promise CompletableFuture is completed with a value.
Signature
Description
Create a source with a materialized Promise[Option[T]] CompletableFuture<Optional<T>> which controls what element will be emitted by the Source. This makes it possible to inject a value into a stream after creation.
- If the materialized promise is completed with a
Somenon-emptyOptional, that value will be produced downstream, followed by completion. - If the materialized promise is completed with a
NoneemptyOptional, no value will be produced downstream and completion will be signalled immediately. - If the materialized promise is completed with a failure, then the source will fail with that error.
- If the downstream of this source cancels or fails before the promise has been completed, then the promise will be completed with
NoneemptyOptional.
Source.maybe has some similarities with Source.fromFutureSource.fromCompletionStage. One difference is that a new PromiseCompletableFuture is materialized from Source.maybe each time the stream is run while the FutureCompletionStage given to Source.fromFutureSource.fromCompletionStage can only be completed once.
Source.queue is an alternative for emitting more than one element.
Example
- Scala
-
source
import org.apache.pekko.stream.scaladsl._ import scala.concurrent.Promise val source = Source.maybe[Int].to(Sink.foreach(elem => println(elem))) val promise1: Promise[Option[Int]] = source.run() promise1.success(Some(1)) // prints 1 // a new Promise is returned when the stream is materialized val promise2 = source.run() promise2.success(Some(2)) // prints 2 - Java
-
source
import org.apache.pekko.stream.javadsl.RunnableGraph; import java.util.concurrent.CompletableFuture; Source<Integer, CompletableFuture<Optional<Integer>>> source = Source.<Integer>maybe(); RunnableGraph<CompletableFuture<Optional<Integer>>> runnable = source.to(Sink.foreach(System.out::println)); CompletableFuture<Optional<Integer>> completable1 = runnable.run(system); completable1.complete(Optional.of(1)); // prints 1 CompletableFuture<Optional<Integer>> completable2 = runnable.run(system); completable2.complete(Optional.of(2)); // prints 2
The Source.maybe[Int] will return a Promise[Option[Int]]CompletableFuture<Optional<Integer>> materialized value. That PromiseCompletableFuture can be completed later. Each time the stream is run a new PromiseCompletableFuture is returned.
Reactive Streams semantics
emits when the returned promise is completed with some value
completes after emitting some value, or directly if the promise is completed with no value