monitor
Materializes to a FlowMonitor
that monitors messages flowing through or completion of the operators.
Signature¶
Description¶
Materializes to a FlowMonitor
that monitors messages flowing through or completion of the stream. Elements pass through unchanged. Note that the FlowMonitor
inserts a memory barrier every time it processes an event, and may therefore affect performance. The provided FlowMonitor
contains a state
field you can use to peek and get information about the stream.
Example¶
The example below uses the monitorMat
variant of monitor
. The only difference between the two operators is that monitorMat
has a combine
argument so we can decide which materialization value to keep. In the sample below be Keep.right
so only the FlowMonitor[Int]
is returned.
sourceval source: Source[Int, NotUsed] =
Source.fromIterator(() => Iterator.from(0))
def printMonitorState(flowMonitor: FlowMonitor[Int]) =
flowMonitor.state match {
case FlowMonitorState.Initialized =>
println("Stream is initialized but hasn't processed any element")
case FlowMonitorState.Received(msg) =>
println(s"Last element processed: $msg")
case FlowMonitorState.Failed(cause) =>
println(s"Stream failed with cause $cause")
case FlowMonitorState.Finished => println(s"Stream completed already")
}
val monitoredSource: Source[Int, FlowMonitor[Int]] = source.take(6).throttle(5, 1.second).monitorMat(Keep.right)
val (flowMonitor, futureDone) =
monitoredSource.toMat(Sink.foreach(println))(Keep.both).run()
// If we peek in the monitor too early, it's possible it was not initialized yet.
printMonitorState(flowMonitor)
// Periodically check the monitor
Source.tick(200.millis, 400.millis, "").runForeach(_ => printMonitorState(flowMonitor))
sourceprivate static <T> void printMonitorState(FlowMonitorState.StreamState<T> state) {
if (state == FlowMonitorState.finished()) {
System.out.println("Stream is initialized but hasn't processed any element");
} else if (state instanceof FlowMonitorState.Received) {
FlowMonitorState.Received msg = (FlowMonitorState.Received) state;
System.out.println("Last message received: " + msg.msg());
} else if (state instanceof FlowMonitorState.Failed) {
Throwable cause = ((FlowMonitorState.Failed) state).cause();
System.out.println("Stream failed with cause: " + cause.getMessage());
} else {
System.out.println("Stream completed already");
}
}
Source<Integer, FlowMonitor<Integer>> monitoredSource =
Source.fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator())
.throttle(5, Duration.ofSeconds(1))
.monitorMat(Keep.right());
Pair<FlowMonitor<Integer>, CompletionStage<Done>> run =
monitoredSource.toMat(Sink.foreach(System.out::println), Keep.both()).run(actorSystem);
FlowMonitor<Integer> monitor = run.first();
// If we peek in the monitor too early, it's possible it was not initialized yet.
printMonitorState(monitor.state());
// Periodically check the monitor
Source.tick(Duration.ofMillis(200), Duration.ofMillis(400), "")
.runForeach(__ -> printMonitorState(monitor.state()), actorSystem);
When run, the sample code will produce something similar to:
Stream is initialized but hasn't processed any element
0
1
2
Last element processed: 2
3
4
5
Stream completed already
Reactive Streams semantics¶
emits when upstream emits an element
backpressures when downstream backpressures
completes when upstream completes