monitor
Materializes to a FlowMonitor that monitors messages flowing through, or completion of, the stream.
Signature
Source.monitor Flow.monitor
Description
Materializes to a FlowMonitor that monitors messages flowing through or completion of the stream. Elements pass through unchanged. Note that the FlowMonitor inserts a memory barrier every time it processes an event, and may therefore affect performance. The provided FlowMonitor contains a state field you can use to peek and get information about the stream.
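To make the memory-barrier remark concrete, here is a minimal plain-Java model of a pass-through stage that publishes its latest event. It is a sketch under stated assumptions and not the library's implementation: `MonitorModel`, `Kind`, `State`, `observe` and `complete` are hypothetical names, and holding the state in an `AtomicReference` is an assumption made for illustration. Every `set` on the reference is a volatile write, which is the kind of per-event cost the note above warns about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model for illustration only; this is NOT the library's FlowMonitor.
final class MonitorModel<T> {
    // Stand-ins for the Initialized / Received / Finished states.
    enum Kind { INITIALIZED, RECEIVED, FINISHED }

    static final class State<T> {
        final Kind kind;
        final T last; // null unless kind == RECEIVED

        State(Kind kind, T last) {
            this.kind = kind;
            this.last = last;
        }
    }

    // Assumption: the state is shared with other threads through an atomic reference.
    private final AtomicReference<State<T>> state =
        new AtomicReference<>(new State<>(Kind.INITIALIZED, null));

    // Peek at the latest published state from any thread.
    State<T> state() {
        return state.get();
    }

    // Record the element (one volatile write per event) and pass it through unchanged.
    T observe(T element) {
        state.set(new State<>(Kind.RECEIVED, element));
        return element;
    }

    // Record that the stream has completed.
    void complete() {
        state.set(new State<>(Kind.FINISHED, null));
    }

    public static void main(String[] args) {
        MonitorModel<Integer> monitor = new MonitorModel<>();
        System.out.println(monitor.state().kind); // INITIALIZED

        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            out.add(monitor.observe(i));
        }
        System.out.println(out + " last=" + monitor.state().last); // [0, 1, 2] last=2

        monitor.complete();
        System.out.println(monitor.state().kind); // FINISHED
    }
}
```

The model shows the two properties the description relies on: `observe` returns its argument untouched, and a reader calling `state()` only ever sees the most recent event, never a history of them.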
Example
The example below uses the monitorMat variant of monitor. The only difference between the two operators is that monitorMat takes a combine argument, which lets us decide which materialized value to keep. In the sample below we use Keep.right, so only the FlowMonitor[Int] is returned.
- Scala
// Assumes an implicit ActorSystem and scala.concurrent.duration._ are in scope.
val source: Source[Int, NotUsed] = Source.fromIterator(() => Iterator.from(0))

def printMonitorState(flowMonitor: FlowMonitor[Int]) = flowMonitor.state match {
  case FlowMonitorState.Initialized =>
    println("Stream is initialized but hasn't processed any element")
  case FlowMonitorState.Received(msg) =>
    println(s"Last element processed: $msg")
  case FlowMonitorState.Failed(cause) =>
    println(s"Stream failed with cause $cause")
  case FlowMonitorState.Finished =>
    println("Stream completed already")
}

val monitoredSource: Source[Int, FlowMonitor[Int]] =
  source.take(6).throttle(5, 1.second).monitorMat(Keep.right)

val (flowMonitor, futureDone) =
  monitoredSource.toMat(Sink.foreach(println))(Keep.both).run()

// If we peek in the monitor too early, it's possible it was not initialized yet.
printMonitorState(flowMonitor)

// Periodically check the monitor
Source.tick(200.millis, 400.millis, "").runForeach(_ => printMonitorState(flowMonitor))
- Java
// Assumes actorSystem is an ActorSystem created elsewhere.
private static <T> void printMonitorState(FlowMonitorState.StreamState<T> state) {
  if (state == FlowMonitorState.initialized()) {
    System.out.println("Stream is initialized but hasn't processed any element");
  } else if (state instanceof FlowMonitorState.Received) {
    FlowMonitorState.Received<T> msg = (FlowMonitorState.Received<T>) state;
    System.out.println("Last message received: " + msg.msg());
  } else if (state instanceof FlowMonitorState.Failed) {
    Throwable cause = ((FlowMonitorState.Failed) state).cause();
    System.out.println("Stream failed with cause: " + cause.getMessage());
  } else {
    // The only remaining state is Finished.
    System.out.println("Stream completed already");
  }
}

Source<Integer, FlowMonitor<Integer>> monitoredSource =
    Source.fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator())
        .throttle(5, Duration.ofSeconds(1))
        .monitorMat(Keep.right());

Pair<FlowMonitor<Integer>, CompletionStage<Done>> run =
    monitoredSource.toMat(Sink.foreach(System.out::println), Keep.both()).run(actorSystem);

FlowMonitor<Integer> monitor = run.first();

// If we peek in the monitor too early, it's possible it was not initialized yet.
printMonitorState(monitor.state());

// Periodically check the monitor
Source.tick(Duration.ofMillis(200), Duration.ofMillis(400), "")
    .runForeach(__ -> printMonitorState(monitor.state()), actorSystem);
When run, the sample code will produce something similar to:
Stream is initialized but hasn't processed any element
0
1
2
Last element processed: 2
3
4
5
Stream completed already
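The combine argument of monitorMat used in the sample can be pictured as a two-argument function over the upstream materialized value and the FlowMonitor. The plain-Java sketch below models that choice with `BiFunction`. It is illustrative only: `CombineModel`, `keepLeft`, `keepRight`, `keepBoth` and `materialize` are hypothetical stand-ins for the library's Keep.left, Keep.right and Keep.both, and the strings stand in for real materialized values.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical model of a "combine" function; not the library's Keep implementation.
final class CombineModel {
    // Keep only the upstream (left) materialized value.
    static <L, R> BiFunction<L, R, L> keepLeft() {
        return (l, r) -> l;
    }

    // Keep only the monitor (right) materialized value.
    static <L, R> BiFunction<L, R, R> keepRight() {
        return (l, r) -> r;
    }

    // Keep both values as a pair.
    static <L, R> BiFunction<L, R, Map.Entry<L, R>> keepBoth() {
        return (l, r) -> new SimpleEntry<>(l, r);
    }

    // Apply the chosen combine function to the two materialized values.
    static <L, R, M> M materialize(L upstream, R monitor, BiFunction<L, R, M> combine) {
        return combine.apply(upstream, monitor);
    }

    public static void main(String[] args) {
        String upstreamMat = "NotUsed";     // stand-in for the source's materialized value
        String monitorMat = "FlowMonitor";  // stand-in for the monitor

        String right = materialize(upstreamMat, monitorMat, CombineModel.<String, String>keepRight());
        System.out.println(right); // FlowMonitor

        Map.Entry<String, String> both =
            materialize(upstreamMat, monitorMat, CombineModel.<String, String>keepBoth());
        System.out.println(both.getKey() + " and " + both.getValue()); // NotUsed and FlowMonitor
    }
}
```

In the sample, the "keep right" choice is why the materialized type is `FlowMonitor` alone rather than a pair that also carries the source's `NotUsed`.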
Reactive Streams semantics
emits when upstream emits an element
backpressures when downstream backpressures
completes when upstream completes