watchTermination

Materializes to a Future (Scala) or CompletionStage (Java) that is completed with Done or failed, depending on whether the upstream of the operator completed or failed.

Watching status operators

Signature

Source.watchTermination Flow.watchTermination

Description

Materializes to a Future (Scala) or CompletionStage (Java) that is completed with Done or failed, depending on whether the upstream of the operator completed or failed. The operator otherwise passes elements through unchanged.

Examples

Scala
Source(1 to 5)
  .watchTermination()((prevMatValue, future) =>
    // this function is run once, when the stream is materialized
    // the Future provided as the second parameter completes when the stream terminates,
    // indicating whether it completed successfully or failed
    future.onComplete {
      case Failure(exception) => println(exception.getMessage)
      case Success(_)         => println(s"The stream materialized $prevMatValue")
    })
  .runForeach(println)
/*
Prints:
1
2
3
4
5
The stream materialized NotUsed
 */

Source(1 to 5)
  .watchTermination()((prevMatValue, future) =>
    future.onComplete {
      case Failure(exception) => println(exception.getMessage)
      case Success(_)         => println(s"The stream materialized $prevMatValue")
    })
  .runForeach(e => if (e == 3) throw new Exception("Boom") else println(e))
/*
Prints:
1
2
Boom
 */
Java
Source.range(1, 5)
    .watchTermination(
        (prevMatValue, completionStage) -> {
          completionStage.whenComplete(
              (done, exc) -> {
                if (done != null)
                  System.out.println("The stream materialized " + prevMatValue.toString());
                else System.out.println(exc.getMessage());
              });
          return prevMatValue;
        })
    .runForeach(System.out::println, system);

/*
Prints:
1
2
3
4
5
The stream materialized NotUsed
 */

Source.range(1, 5)
    .watchTermination(
        (prevMatValue, completionStage) -> {
          // this function is run once, when the stream is materialized
          // the CompletionStage provided as the second parameter completes when the
          // stream terminates, indicating whether it completed successfully or failed
          completionStage.whenComplete(
              (done, exc) -> {
                if (done != null)
                  System.out.println("The stream materialized " + prevMatValue.toString());
                else System.out.println(exc.getMessage());
              });
          return prevMatValue;
        })
    .runForeach(
        element -> {
          if (element == 3) throw new Exception("Boom");
          else System.out.println(element);
        },
        system);
/*
Prints:
1
2
Boom
 */

You can also use the lambda function expected by watchTermination to map the materialized value of the stream. Additionally, the completion of the Future (Scala) or CompletionStage (Java) provided as the second parameter of the lambda can be used to clean up resources used by the stream itself.
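As a minimal sketch of both uses, the Scala snippet below replaces the stream's materialized value with the termination Future and runs a cleanup step when that Future completes. It assumes Akka 2.6 or later, where an implicit ActorSystem also provides the materializer; for Apache Pekko, replace the `akka` package prefix with `org.apache.pekko`. The object name, the system name, and the choice of shutting down the actor system as the cleanup step are illustrative assumptions, not part of the operator's API.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Keep, Sink, Source }

import scala.concurrent.Future

object WatchTerminationCleanup extends App {
  // Hypothetical system name; assumed to supply the materializer implicitly (Akka 2.6+)
  implicit val system: ActorSystem = ActorSystem("watch-termination-example")
  import system.dispatcher // ExecutionContext for the Future callback

  // Keep.right drops the previous materialized value (NotUsed) and keeps
  // the Future[Done] that watchTermination provides
  val terminated: Future[Done] =
    Source(1 to 5)
      .watchTermination()(Keep.right)
      .to(Sink.foreach(println)) // `to` keeps the left-hand materialized value
      .run()

  // Cleanup runs once the stream has completed or failed
  terminated.onComplete { result =>
    println(s"Stream terminated with $result")
    system.terminate() // stands in for releasing any resource the stream used
  }
}
```

Using `Keep.right` here is a design choice: it exposes the termination signal to the caller instead of handling it inside the lambda, so cleanup logic can live next to the code that owns the resource.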

Reactive Streams semantics

emits when input has an element available

backpressures when output backpressures

completes when upstream completes