watchTermination
Materializes to a Future
CompletionStage
that will be completed with Done or failed depending whether the upstream of the operators has been completed or failed.
Signature
Source.watchTermination
Source.watchTermination
Flow.watchTermination
Flow.watchTermination
Description
Materializes to a Future
CompletionStage
that will be completed with Done or failed depending whether the upstream of the operators has been completed or failed. The operators otherwise passes through elements unchanged.
Examples
- Scala
-
source
Source(1 to 5) .watchTermination()((prevMatValue, future) => // this function will be run when the stream terminates // the Future provided as a second parameter indicates whether the stream completed successfully or failed future.onComplete { case Failure(exception) => println(exception.getMessage) case Success(_) => println(s"The stream materialized $prevMatValue") }) .runForeach(println) /* Prints: 1 2 3 4 5 The stream materialized NotUsed */ Source(1 to 5) .watchTermination()((prevMatValue, future) => future.onComplete { case Failure(exception) => println(exception.getMessage) case Success(_) => println(s"The stream materialized $prevMatValue") }) .runForeach(e => if (e == 3) throw new Exception("Boom") else println(e)) /* Prints: 1 2 Boom */
- Java
-
source
Source.range(1, 5) .watchTermination( (prevMatValue, completionStage) -> { completionStage.whenComplete( (done, exc) -> { if (done != null) System.out.println("The stream materialized " + prevMatValue.toString()); else System.out.println(exc.getMessage()); }); return prevMatValue; }) .runForeach(System.out::println, system); /* Prints: 1 2 3 4 5 The stream materialized NotUsed */ Source.range(1, 5) .watchTermination( (prevMatValue, completionStage) -> { // this function will be run when the stream terminates // the CompletionStage provided as a second parameter indicates whether // the stream completed successfully or failed completionStage.whenComplete( (done, exc) -> { if (done != null) System.out.println("The stream materialized " + prevMatValue.toString()); else System.out.println(exc.getMessage()); }); return prevMatValue; }) .runForeach( element -> { if (element == 3) throw new Exception("Boom"); else System.out.println(element); }, system); /* Prints: 1 2 Boom */
You can also use the lambda function expected by watchTermination
to map the materialized value of the stream. Additionally, the completion of the Future
CompletionStage
provided as a second parameter of the lambda can be used to perform cleanup operations of the resources used by the stream itself.
Reactive Streams semantics
emits when input has an element available
backpressures when output backpressures
completes when upstream completes