watchTermination
Materializes to a Future that will be completed with Done or failed, depending on whether the upstream of the operator has completed or failed.
Signature
Source.watchTermination
Flow.watchTermination
Description
Materializes to a Future that will be completed with Done or failed, depending on whether the upstream of the operator has completed or failed. The operator otherwise passes elements through unchanged.
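The Signature section lists Flow.watchTermination, so the following is a minimal sketch of the pass-through behaviour on a Flow. It assumes Akka Streams 2.6 or later with the `akka.*` packages (with Apache Pekko the prefix would be `org.apache.pekko`). The object name `WatchTerminationFlow` and its `run` helper are hypothetical, chosen only for illustration.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Flow, Keep, Sink, Source }

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

// Hypothetical wrapper object, used only to make the sketch runnable.
object WatchTerminationFlow {
  def run(): (Seq[Int], Done) = {
    // Assumption: Akka 2.6+, where an implicit ActorSystem also provides the materializer.
    implicit val system: ActorSystem = ActorSystem("watch-termination-flow")

    // Keep.right discards the Flow's NotUsed and keeps the termination Future.
    val watched: Flow[Int, Int, Future[Done]] =
      Flow[Int].watchTermination()(Keep.right)

    // Materialize both the termination Future and the collected elements.
    val (terminated, elements) =
      Source(1 to 3)
        .viaMat(watched)(Keep.right)
        .toMat(Sink.seq[Int])(Keep.both)
        .run()

    try {
      (Await.result(elements, 3.seconds), Await.result(terminated, 3.seconds))
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (elements, done) = run()
    println(s"elements = $elements, termination = $done")
  }
}
```

The collected elements are the same 1, 2, 3 that the source emitted, and the termination Future completes with Done once the upstream has finished.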
Examples
import scala.util.{ Failure, Success }

// assumes an implicit ActorSystem and ExecutionContext are in scope
Source(1 to 5)
  .watchTermination()((prevMatValue, future) =>
    // this function runs when the stream is materialized; the callback registered on the
    // Future (the second parameter) runs once the stream has completed or failed
    future.onComplete {
      case Failure(exception) => println(exception.getMessage)
      case Success(_)         => println(s"The stream materialized $prevMatValue")
    })
  .runForeach(println)
/*
Prints:
1
2
3
4
5
The stream materialized NotUsed
*/
Source(1 to 5)
  .watchTermination()((prevMatValue, future) =>
    future.onComplete {
      case Failure(exception) => println(exception.getMessage)
      case Success(_)         => println(s"The stream materialized $prevMatValue")
    })
  .runForeach(e => if (e == 3) throw new Exception("Boom") else println(e))
/*
Prints:
1
2
Boom
*/
Source.range(1, 5)
    .watchTermination(
        (prevMatValue, completionStage) -> {
          completionStage.whenComplete(
              (done, exc) -> {
                if (done != null)
                  System.out.println("The stream materialized " + prevMatValue.toString());
                else System.out.println(exc.getMessage());
              });
          return prevMatValue;
        })
    .runForeach(System.out::println, system);
/*
Prints:
1
2
3
4
5
The stream materialized NotUsed
*/
Source.range(1, 5)
    .watchTermination(
        (prevMatValue, completionStage) -> {
          // this function runs when the stream is materialized; the callback registered
          // on the CompletionStage (the second parameter) runs once the stream has
          // completed successfully or failed
          completionStage.whenComplete(
              (done, exc) -> {
                if (done != null)
                  System.out.println("The stream materialized " + prevMatValue.toString());
                else System.out.println(exc.getMessage());
              });
          return prevMatValue;
        })
    .runForeach(
        element -> {
          if (element == 3) throw new Exception("Boom");
          else System.out.println(element);
        },
        system);
/*
Prints:
1
2
Boom
*/
You can also use the function passed to watchTermination to map the materialized value of the stream. Additionally, the completion of the Future provided as the second parameter of that function can be used to clean up resources used by the stream itself.
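As a minimal sketch of both uses, the example below replaces the upstream NotUsed with a Future that completes only after a cleanup step has run. It assumes Akka Streams 2.6 or later with the `akka.*` packages (with Apache Pekko the prefix would be `org.apache.pekko`). `Resource`, `WatchTerminationCleanup` and `run` are hypothetical names introduced for illustration and are not part of the library.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Keep, Sink, Source }

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

// Hypothetical wrapper object, used only to make the sketch runnable.
object WatchTerminationCleanup {
  // Hypothetical resource standing in for a connection, file handle, etc.
  final class Resource {
    @volatile var closed: Boolean = false
    def close(): Unit = closed = true
  }

  def run(): (Int, Boolean) = {
    // Assumption: Akka 2.6+, where an implicit ActorSystem also provides the materializer.
    implicit val system: ActorSystem = ActorSystem("watch-termination-cleanup")
    import system.dispatcher // ExecutionContext for the Future callback

    val resource = new Resource

    // The function's return value becomes the new materialized value.
    // andThen runs the cleanup on success and on failure, and the returned
    // Future completes only after the cleanup has run.
    val source: Source[Int, Future[Done]] =
      Source(1 to 3).watchTermination() { (_, termination) =>
        termination.andThen { case _ => resource.close() }
      }

    val (cleanedUp, sum) =
      source.toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.both).run()

    try {
      val total = Await.result(sum, 3.seconds)
      Await.result(cleanedUp, 3.seconds) // wait until the cleanup callback has finished
      (total, resource.closed)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Returning the result of `andThen` rather than the original Future is a design choice in this sketch: callers that wait on the materialized value then also wait for the cleanup, instead of racing with it.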
Reactive Streams semantics
emits when input has an element available
backpressures when output backpressures
completes when upstream completes