Source.maybe
Create a source that emits once the materialized Promise
is completed with a value.
Signature¶
Description¶
Create a source with a materialized Promise[Option[T]]
which controls what element will be emitted by the Source. This makes it possible to inject a value into a stream after creation.
- If the materialized promise is completed with a
Some
, that value will be produced downstream, followed by completion. - If the materialized promise is completed with a
None
, no value will be produced downstream and completion will be signalled immediately. - If the materialized promise is completed with a failure, then the source will fail with that error.
- If the downstream of this source cancels or fails before the promise has been completed, then the promise will be completed with
None
.
Source.maybe
has some similarities with Source.fromFuture
. One difference is that a new Promise
is materialized from Source.maybe
each time the stream is run while the Future
given to Source.fromFuture
can only be completed once.
Source.queue
is an alternative for emitting more than one element.
Example¶
sourceimport org.apache.pekko.stream.scaladsl._
import scala.concurrent.Promise
val source = Source.maybe[Int].to(Sink.foreach(elem => println(elem)))
val promise1: Promise[Option[Int]] = source.run()
promise1.success(Some(1)) // prints 1
// a new Promise is returned when the stream is materialized
val promise2 = source.run()
promise2.success(Some(2)) // prints 2
sourceimport org.apache.pekko.stream.javadsl.RunnableGraph;
import java.util.concurrent.CompletableFuture;
Source<Integer, CompletableFuture<Optional<Integer>>> source = Source.<Integer>maybe();
RunnableGraph<CompletableFuture<Optional<Integer>>> runnable =
source.to(Sink.foreach(System.out::println));
CompletableFuture<Optional<Integer>> completable1 = runnable.run(system);
completable1.complete(Optional.of(1)); // prints 1
CompletableFuture<Optional<Integer>> completable2 = runnable.run(system);
completable2.complete(Optional.of(2)); // prints 2
The Source.maybe[Int]
will return a Promise[Option[Int]]
materialized value. That Promise
can be completed later. Each time the stream is run a new Promise
is returned.
Reactive Streams semantics¶
emits when the returned promise is completed with some value
completes after emitting some value, or directly if the promise is completed with no value