Error Handling in Streams
Dependency
To use Pekko Streams, add the module to your project:
- sbt
    val PekkoVersion = "1.0.3"
    libraryDependencies += "org.apache.pekko" %% "pekko-stream" % PekkoVersion
- Maven
    <properties>
      <scala.binary.version>2.13</scala.binary.version>
    </properties>
    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>org.apache.pekko</groupId>
          <artifactId>pekko-bom_${scala.binary.version}</artifactId>
          <version>1.0.3</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-stream_${scala.binary.version}</artifactId>
      </dependency>
    </dependencies>
- Gradle
    def versions = [
      ScalaBinary: "2.13"
    ]
    dependencies {
      implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.0.3")
      implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}"
    }
Introduction
When an operator in a stream fails, this will normally lead to the entire stream being torn down. Each of the operators downstream gets informed about the failure and each upstream operator sees a cancellation.
In many cases you may want to avoid complete stream failure. This can be done in a few different ways:
- recover to emit a final element then complete the stream normally on upstream failure
- recoverWithRetries to create a new upstream and start consuming from that on failure
- Restarting sections of the stream after a backoff
- Using a supervision strategy for operators that support it
In addition to these built-in tools for error handling, a common pattern is to wrap the stream inside an actor, and have the actor restart the entire stream on failure.
Logging errors
log() enables logging of a stream, which is typically useful for error logging. The stream below fails with ArithmeticException when the element 0 goes through the map operator:
- Scala
    Source(-5 to 5)
      .map(1 / _) // throwing ArithmeticException: / by zero
      .log("error logging")
      .runWith(Sink.ignore)
- Java
    Source.from(Arrays.asList(-1, 0, 1))
        .map(x -> 1 / x) // throwing ArithmeticException: / by zero
        .log("error logging")
        .runWith(Sink.ignore(), system);
Error messages like the following will then be logged:
[error logging] Upstream failed.
java.lang.ArithmeticException: / by zero
If you want to control logging levels on each element, completion, and failure, you can find more details in Logging in streams.
Recover
recover allows you to emit a final element and then complete the stream on an upstream failure. Deciding which exceptions should be recovered is done through a PartialFunction. If an exception does not have a matching case, the stream is failed.
Recovering can be useful if you want to gracefully complete a stream on failure while letting downstream know that there was a failure.
An exception thrown inside recover is logged at ERROR level automatically.
More details can be found in the recover operator reference.
- Scala
    Source(0 to 6)
      .map(n =>
        // assuming `4` and `5` are unexpected values that could throw exception
        if (List(4, 5).contains(n)) throw new RuntimeException(s"Boom! Bad value found: $n")
        else n.toString)
      .recover {
        case e: RuntimeException => e.getMessage
      }
      .runForeach(println)
- Java
    Source.from(Arrays.asList(0, 1, 2, 3, 4, 5, 6))
        .map(
            n -> {
              // assuming `4` and `5` are unexpected values that could throw exception
              if (Arrays.asList(4, 5).contains(n))
                throw new RuntimeException(String.format("Boom! Bad value found: %s", n));
              else return n.toString();
            })
        .recover(
            new PFBuilder<Throwable, String>()
                .match(RuntimeException.class, Throwable::getMessage)
                .build())
        .runForeach(System.out::println, system);
This will output (identical for Scala and Java):
    0
    1
    2
    3                         // last element before failure
    Boom! Bad value found: 4  // first element on failure
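To make these semantics concrete without a running actor system, the following is a minimal plain-Java sketch that only imitates what recover does to a sequence of elements. It is not Pekko's implementation: the class RecoverSketch, the method runWithRecover and the use of Optional in place of a PartialFunction are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical illustration only: imitates the semantics of recover with plain collections.
final class RecoverSketch {

  // Applies `stage` to each input in order. On the first exception, `handler` plays the
  // role of the PartialFunction: a present value is emitted as the final element and
  // processing ends normally; an empty Optional means "no matching case".
  static <A, B> List<B> runWithRecover(
      List<A> inputs, Function<A, B> stage, Function<RuntimeException, Optional<B>> handler) {
    List<B> emitted = new ArrayList<>();
    for (A input : inputs) {
      try {
        emitted.add(stage.apply(input));
      } catch (RuntimeException e) {
        Optional<B> finalElement = handler.apply(e);
        if (!finalElement.isPresent()) throw e; // no matching case: the failure propagates
        emitted.add(finalElement.get()); // emit one final element...
        return emitted; // ...then complete; remaining inputs are never processed
      }
    }
    return emitted;
  }

  // Mirrors the example above: 4 and 5 are treated as bad values.
  static List<String> example() {
    return RecoverSketch.<Integer, String>runWithRecover(
        Arrays.asList(0, 1, 2, 3, 4, 5, 6),
        n -> {
          if (n == 4 || n == 5) throw new RuntimeException("Boom! Bad value found: " + n);
          return n.toString();
        },
        e -> Optional.of(e.getMessage()));
  }
}
```

Note that the elements 5 and 6 never appear: once the failure has been turned into a final element, the stream is complete.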
Recover with retries
recoverWithRetries allows you to put a new upstream in place of the failed one, recovering stream failures up to a specified maximum number of times.
Deciding which exceptions should be recovered is done through a PartialFunction. If an exception does not have a matching case, the stream is failed.
- Scala
    val planB = Source(List("five", "six", "seven", "eight"))

    Source(0 to 10)
      .map(n =>
        if (n < 5) n.toString
        else throw new RuntimeException("Boom!"))
      .recoverWithRetries(attempts = 1, {
        case _: RuntimeException => planB
      })
      .runForeach(println)
- Java
    Source<String, NotUsed> planB = Source.from(Arrays.asList("five", "six", "seven", "eight"));

    Source.from(Arrays.asList(0, 1, 2, 3, 4, 5, 6))
        .map(
            n -> {
              if (n < 5) return n.toString();
              else throw new RuntimeException("Boom!");
            })
        .recoverWithRetries(
            1, // max attempts
            new PFBuilder<Throwable, Source<String, NotUsed>>()
                .match(RuntimeException.class, ex -> planB)
                .build())
        .runForeach(System.out::println, system);
This will output:
    0
    1
    2
    3
    4
    five
    six
    seven
    eight
Delayed restarts with a backoff operator
Pekko Streams provides RestartSource, RestartSink and RestartFlow for implementing the so-called exponential backoff supervision strategy, starting an operator again when it fails or completes, each time with a growing time delay between restarts.
This pattern is useful when the operator fails or completes because some external resource is not available and we need to give it some time to start up again. A prime example is a WebSocket connection that fails because the HTTP server it is running on goes down, perhaps because it is overloaded. Using an exponential backoff avoids a tight reconnect loop, which both gives the HTTP server some time to recover and avoids wasting resources on the client side.
The restart shapes mentioned above all expect a RestartSettings, which configures the restart behavior. Configurable parameters are:
- minBackoff is the initial duration until the underlying stream is restarted
- maxBackoff caps the exponential backoff
- randomFactor allows addition of a random delay following backoff calculation
- maxRestarts caps the total number of restarts
- maxRestartsWithin sets a timeframe during which restarts are counted towards the same total for maxRestarts
The following snippet shows how to create a backoff supervisor using RestartSource which will supervise the given Source. The Source in this case is a stream of Server Sent Events, produced by pekko-http. If the stream fails or completes at any point, the request will be made again, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds (at which point it will remain capped due to the maxBackoff parameter):
- Scala
    val settings = RestartSettings(
      minBackoff = 3.seconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
    ).withMaxRestarts(20, 5.minutes) // limits the amount of restarts to 20 within 5 minutes

    val restartSource = RestartSource.withBackoff(settings) { () =>
      // Create a source from a future of a source
      Source.futureSource {
        // Make a single request with pekko-http
        Http()
          .singleRequest(HttpRequest(uri = "http://example.com/eventstream"))
          // Unmarshall it as a source of server sent events
          .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
      }
    }
- Java
    RestartSettings settings =
        RestartSettings.create(
                Duration.ofSeconds(3), // min backoff
                Duration.ofSeconds(30), // max backoff
                0.2 // adds 20% "noise" to vary the intervals slightly
                )
            .withMaxRestarts(
                20, Duration.ofMinutes(5)); // limits the amount of restarts to 20 within 5 minutes

    Source<ServerSentEvent, NotUsed> eventStream =
        RestartSource.withBackoff(
            settings,
            () ->
                // Create a source from a future of a source
                Source.completionStageSource(
                    // Issue a GET request on the event stream
                    Http.get(system)
                        .singleRequest(HttpRequest.create("http://example.com/eventstream"))
                        .thenCompose(
                            response ->
                                // Unmarshall it to a stream of ServerSentEvents
                                EventStreamUnmarshalling.fromEventStream()
                                    .unmarshall(response, materializer))));
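The sequence of intervals quoted above (3, 6, 12, 24, then 30 seconds) follows from doubling minBackoff on every restart and capping the result at maxBackoff. The plain-Java sketch below reproduces that arithmetic with the random factor left out. The class BackoffSchedule and its methods are hypothetical helpers written for this illustration, not part of the Pekko API.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Pekko: reproduces the capped doubling described above.
final class BackoffSchedule {

  // Delay before restart number `restartCount` (0-based), ignoring any random factor:
  // minBackoff * 2^restartCount, capped at maxBackoff.
  static Duration delay(Duration minBackoff, Duration maxBackoff, int restartCount) {
    // Bound the exponent so the shift below stays well inside the range of a long.
    int exponent = Math.min(restartCount, 30);
    long factor = 1L << exponent;
    long minMillis = minBackoff.toMillis();
    long maxMillis = maxBackoff.toMillis();
    // Compare before multiplying so the product can never overflow.
    if (minMillis > maxMillis / factor) return maxBackoff;
    return Duration.ofMillis(minMillis * factor);
  }

  // The first `count` delays, expressed in whole seconds.
  static List<Long> firstDelaysInSeconds(Duration minBackoff, Duration maxBackoff, int count) {
    List<Long> delays = new ArrayList<>();
    for (int restart = 0; restart < count; restart++) {
      delays.add(delay(minBackoff, maxBackoff, restart).getSeconds());
    }
    return delays;
  }
}
```

With minBackoff of 3 seconds and maxBackoff of 30 seconds, the fifth restart would nominally wait 48 seconds, so it is the first one to be capped at 30.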
Using a randomFactor to add a little additional variance to the backoff intervals is highly recommended, in order to avoid multiple streams restarting at exactly the same point in time, for example because they were all stopped when a shared resource such as the same server went down and then restart after the same configured interval. With additional randomness in the restart intervals, the streams start at slightly different points in time, which avoids large spikes of traffic hitting the recovering server or other resource that they all need to contact.
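As a rough illustration of how a random factor spreads restarts out, the sketch below scales a base delay by a random multiplier. It assumes that the jitter multiplies the delay by a value drawn uniformly between 1.0 and 1.0 + randomFactor; that formula is an assumption made for this example and should be checked against the RestartSettings documentation. JitterSketch and jitteredMillis are hypothetical names.

```java
import java.util.Random;

// Hypothetical illustration: the exact jitter formula used by Pekko is assumed, not quoted.
final class JitterSketch {

  // Assumption: the base delay is scaled by a factor in [1.0, 1.0 + randomFactor).
  static long jitteredMillis(long baseMillis, double randomFactor, Random random) {
    double factor = 1.0 + random.nextDouble() * randomFactor;
    return (long) (baseMillis * factor);
  }

  public static void main(String[] args) {
    // Three streams that failed together pick different restart delays around 3 seconds.
    Random random = new Random();
    for (int stream = 1; stream <= 3; stream++) {
      System.out.println("stream " + stream + " restarts after "
          + jitteredMillis(3000, 0.2, random) + " ms");
    }
  }
}
```

Under that assumption, a 3 second base delay with randomFactor = 0.2 yields a delay between 3000 and 3600 milliseconds, so streams that failed together are unlikely to reconnect in the same instant.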
The above RestartSource will never terminate unless the Sink it’s fed into cancels. It will often be handy to use it in combination with a KillSwitch, so that you can terminate it when needed:
- Scala
    val killSwitch = restartSource
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.foreach(event => println(s"Got event: $event")))(Keep.left)
      .run()

    doSomethingElse()

    killSwitch.shutdown()
- Java
    KillSwitch killSwitch =
        eventStream
            .viaMat(KillSwitches.single(), Keep.right())
            .toMat(Sink.foreach(event -> System.out.println("Got event: " + event)), Keep.left())
            .run(materializer);

    doSomethingElse();

    killSwitch.shutdown();
Sinks and flows can also be supervised, using RestartSink and RestartFlow. The RestartSink is restarted when it cancels, while the RestartFlow is restarted when either the in port cancels, the out port completes, or the out port sends an error.
Care should be taken when using GraphStages that conditionally propagate termination signals inside a RestartSource, RestartSink or RestartFlow.
An example is a Broadcast operator with the default eagerCancel = false where some of the outlets are for side-effecting branches (that do not re-join e.g. via a Merge). A failure on a side branch will not terminate the supervised stream, which will not be restarted. Conversely, a failure on the main branch can trigger a restart but leave behind old running instances of side branches.
In this example eagerCancel should probably be set to true, or, when only a single side branch is used, alsoTo or divertTo should be considered as alternatives.
Supervision Strategies
The operators that support supervision strategies are explicitly documented to do so. If nothing in the documentation of an operator says that it adheres to the supervision strategy, the operator fails rather than applying supervision.
The error handling strategies are inspired by actor supervision strategies, but the semantics have been adapted to the domain of stream processing. The most important difference is that supervision is not automatically applied to stream operators but instead something that each operator has to implement explicitly.
For many operators it may not even make sense to implement support for supervision strategies. This is especially true for operators connecting to external technologies, where for example a failed connection will likely still fail if a new connection is tried immediately (see Delayed restarts with a backoff operator for such scenarios).
For operators that do implement supervision, the strategies for how to handle exceptions from processing stream elements can be selected when materializing the stream through use of an attribute.
There are three ways to handle exceptions from application code:
- Stop (Scala) or Supervision.stop() (Java): The stream is completed with failure.
- Resume (Scala) or Supervision.resume() (Java): The element is dropped and the stream continues.
- Restart (Scala) or Supervision.restart() (Java): The element is dropped and the stream continues after restarting the operator. Restarting an operator means that any accumulated state is cleared. This is typically performed by creating a new instance of the operator.
By default the stopping strategy is used for all exceptions, i.e. the stream will be completed with failure when an exception is thrown.
- Scala
    val source = Source(0 to 5).map(100 / _)
    val result = source.runWith(Sink.fold(0)(_ + _))
    // division by zero will fail the stream and the
    // result here will be a Future completed with Failure(ArithmeticException)
- Java
    final Source<Integer, NotUsed> source =
        Source.from(Arrays.asList(0, 1, 2, 3, 4, 5)).map(elem -> 100 / elem);
    final Sink<Integer, CompletionStage<Integer>> fold =
        Sink.<Integer, Integer>fold(0, (acc, elem) -> acc + elem);
    final CompletionStage<Integer> result = source.runWith(fold, system);
    // division by zero will fail the stream and the
    // result here will be a CompletionStage failed with ArithmeticException
The default supervision strategy for a stream can be defined on the complete RunnableGraph.
- Scala
    val decider: Supervision.Decider = {
      case _: ArithmeticException => Supervision.Resume
      case _                      => Supervision.Stop
    }
    val source = Source(0 to 5).map(100 / _)
    val runnableGraph =
      source.toMat(Sink.fold(0)(_ + _))(Keep.right)

    val withCustomSupervision = runnableGraph.withAttributes(ActorAttributes.supervisionStrategy(decider))

    val result = withCustomSupervision.run()
    // the element causing division by zero will be dropped
    // result here will be a Future completed with Success(228)
- Java
    final Function<Throwable, Supervision.Directive> decider =
        exc -> {
          if (exc instanceof ArithmeticException) return Supervision.resume();
          else return Supervision.stop();
        };
    final Source<Integer, NotUsed> source =
        Source.from(Arrays.asList(0, 1, 2, 3, 4, 5))
            .map(elem -> 100 / elem)
            .withAttributes(ActorAttributes.withSupervisionStrategy(decider));
    final Sink<Integer, CompletionStage<Integer>> fold = Sink.fold(0, (acc, elem) -> acc + elem);

    final RunnableGraph<CompletionStage<Integer>> runnableGraph = source.toMat(fold, Keep.right());

    final RunnableGraph<CompletionStage<Integer>> withCustomSupervision =
        runnableGraph.withAttributes(ActorAttributes.withSupervisionStrategy(decider));

    final CompletionStage<Integer> result = withCustomSupervision.run(system);
    // the element causing division by zero will be dropped
    // result here will be a CompletionStage completed with 228
Here you can see that every ArithmeticException resumes the processing, i.e. the elements that cause the division by zero are effectively dropped.
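The result of 228 quoted in the example can be verified by hand: the element 0 is dropped, and integer division gives 100 + 50 + 33 + 25 + 20 for the elements 1 to 5. The plain-Java sketch below imitates a decider that chooses between stopping and resuming. It only models the semantics described here; ResumeSketch, Directive and foldWithDecider are hypothetical names and not Pekko types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.IntUnaryOperator;

// Hypothetical illustration of Stop versus Resume; not Pekko's implementation.
final class ResumeSketch {

  enum Directive { STOP, RESUME }

  // Sums stage(input) over all inputs. When the stage throws, the decider chooses:
  // STOP rethrows (the stream fails), RESUME drops the element and carries on.
  static int foldWithDecider(
      List<Integer> inputs, IntUnaryOperator stage, Function<Throwable, Directive> decider) {
    int sum = 0;
    for (int input : inputs) {
      try {
        sum += stage.applyAsInt(input);
      } catch (RuntimeException e) {
        if (decider.apply(e) == Directive.STOP) throw e;
        // RESUME: the failing element contributes nothing to the sum
      }
    }
    return sum;
  }

  // Mirrors the example above: resume on ArithmeticException, stop on anything else.
  static int example() {
    return foldWithDecider(
        Arrays.asList(0, 1, 2, 3, 4, 5),
        elem -> 100 / elem,
        exc -> exc instanceof ArithmeticException ? Directive.RESUME : Directive.STOP);
  }
}
```

With a decider that always returns STOP, the same call would throw the ArithmeticException instead, which corresponds to the default stopping strategy.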
Be aware that dropping elements may result in deadlocks in graphs with cycles, as explained in Graph cycles, liveness and deadlocks.
The supervision strategy can also be defined for all operators of a flow.
- Scala
    val decider: Supervision.Decider = {
      case _: ArithmeticException => Supervision.Resume
      case _                      => Supervision.Stop
    }
    val flow = Flow[Int]
      .filter(100 / _ < 50)
      .map(elem => 100 / (5 - elem))
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
    val source = Source(0 to 5).via(flow)

    val result = source.runWith(Sink.fold(0)(_ + _))
    // the elements causing division by zero will be dropped
    // result here will be a Future completed with Success(150)
- Java
    final Function<Throwable, Supervision.Directive> decider =
        exc -> {
          if (exc instanceof ArithmeticException) return Supervision.resume();
          else return Supervision.stop();
        };
    final Flow<Integer, Integer, NotUsed> flow =
        Flow.of(Integer.class)
            .filter(elem -> 100 / elem < 50)
            .map(elem -> 100 / (5 - elem))
            .withAttributes(ActorAttributes.withSupervisionStrategy(decider));
    final Source<Integer, NotUsed> source = Source.from(Arrays.asList(0, 1, 2, 3, 4, 5)).via(flow);
    final Sink<Integer, CompletionStage<Integer>> fold =
        Sink.<Integer, Integer>fold(0, (acc, elem) -> acc + elem);
    final CompletionStage<Integer> result = source.runWith(fold, system);
    // the elements causing division by zero will be dropped
    // result here will be a CompletionStage completed with 150
Restart (Scala) or Supervision.restart() (Java) works in a similar way as Resume (Scala) or Supervision.resume() (Java), with the addition that accumulated state, if any, of the failing processing operator will be reset.
- Scala
    val decider: Supervision.Decider = {
      case _: IllegalArgumentException => Supervision.Restart
      case _                           => Supervision.Stop
    }
    val flow = Flow[Int]
      .scan(0) { (acc, elem) =>
        if (elem < 0) throw new IllegalArgumentException("negative not allowed")
        else acc + elem
      }
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
    val source = Source(List(1, 3, -1, 5, 7)).via(flow)
    val result = source.limit(1000).runWith(Sink.seq)
    // the negative element causes the scan stage to be restarted,
    // i.e. start from 0 again
    // result here will be a Future completed with Success(Vector(0, 1, 4, 0, 5, 12))
- Java
    final Function<Throwable, Supervision.Directive> decider =
        exc -> {
          if (exc instanceof IllegalArgumentException) return Supervision.restart();
          else return Supervision.stop();
        };
    final Flow<Integer, Integer, NotUsed> flow =
        Flow.of(Integer.class)
            .scan(
                0,
                (acc, elem) -> {
                  if (elem < 0) throw new IllegalArgumentException("negative not allowed");
                  else return acc + elem;
                })
            .withAttributes(ActorAttributes.withSupervisionStrategy(decider));
    final Source<Integer, NotUsed> source = Source.from(Arrays.asList(1, 3, -1, 5, 7)).via(flow);
    final CompletionStage<List<Integer>> result =
        source.grouped(1000).runWith(Sink.<List<Integer>>head(), system);
    // the negative element causes the scan stage to be restarted,
    // i.e. start from 0 again
    // result here will be a CompletionStage completed with List(0, 1, 4, 0, 5, 12)
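The difference between resuming and restarting only shows up when the operator carries state, as scan does. The following plain-Java sketch imitates the running sum from the example and lets the caller choose whether the accumulated state is reset on a failing element. It is a simplified model under the semantics described in this section, not Pekko code; RestartScanSketch and runningSum are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of Resume versus Restart for a stateful running sum.
final class RestartScanSketch {

  // Emits the initial value 0, then the running sum after each valid element.
  // A negative element stands in for the IllegalArgumentException of the example:
  // it is always dropped, and with resetStateOnFailure = true the sum also starts
  // again from 0 and that initial value is emitted once more.
  static List<Integer> runningSum(List<Integer> inputs, boolean resetStateOnFailure) {
    List<Integer> emitted = new ArrayList<>();
    int acc = 0;
    emitted.add(acc); // scan emits its initial value before any element arrives
    for (int elem : inputs) {
      if (elem < 0) {
        if (resetStateOnFailure) {
          acc = 0; // Restart: accumulated state is cleared
          emitted.add(acc);
        }
        continue; // Resume and Restart both drop the failing element
      }
      acc += elem;
      emitted.add(acc);
    }
    return emitted;
  }
}
```

For the input 1, 3, -1, 5, 7 the restarting variant yields 0, 1, 4, 0, 5, 12, matching the result stated in the example, while the resuming variant keeps the sum and yields 0, 1, 4, 9, 16.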
Errors from mapAsync
Stream supervision can also be applied to the futures of mapAsync and mapAsyncUnordered even if such failures happen in the future rather than inside the operator itself.
Let’s say that we use an external service to look up email addresses and we would like to discard those that cannot be found.
We start with the tweet stream of authors:
- Scala
    val authors: Source[Author, NotUsed] =
      tweets.filter(_.hashtags.contains(pekkoTag)).map(_.author)
- Java
    final Source<Author, NotUsed> authors =
        tweets.filter(t -> t.hashtags().contains(PEKKO)).map(t -> t.author);
Assume that we can look up their email address using:
- Scala
    def lookupEmail(handle: String): Future[String] =
- Java
    public CompletionStage<String> lookupEmail(String handle)
The Future (Scala) or CompletionStage (Java) is completed with a failure if the email is not found.
Transforming the stream of authors to a stream of email addresses by using the lookupEmail service can be done with mapAsync, and we use Supervision.resumingDecider (Scala) or Supervision.getResumingDecider() (Java) to drop unknown email addresses:
- Scala
    import ActorAttributes.supervisionStrategy
    import Supervision.resumingDecider

    val emailAddresses: Source[String, NotUsed] =
      authors.via(
        Flow[Author]
          .mapAsync(4)(author => addressSystem.lookupEmail(author.handle))
          .withAttributes(supervisionStrategy(resumingDecider)))
- Java
    final Attributes resumeAttrib =
        ActorAttributes.withSupervisionStrategy(Supervision.getResumingDecider());
    final Flow<Author, String, NotUsed> lookupEmail =
        Flow.of(Author.class)
            .mapAsync(4, author -> addressSystem.lookupEmail(author.handle))
            .withAttributes(resumeAttrib);
    final Source<String, NotUsed> emailAddresses = authors.via(lookupEmail);
If we did not use Resume (Scala) or Supervision.resume() (Java), the default stopping strategy would complete the stream with failure on the first Future (Scala) or CompletionStage (Java) that completed with a failure.
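To see what dropping failed lookups means in practice, the plain-Java sketch below imitates the resuming behavior with CompletableFuture and no stream at all. The lookupEmail implementation, the "unknown" prefix convention and the example.com addresses are invented for this illustration; in a real application the lookup would call the external service, and mapAsync would run the lookups concurrently rather than one at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

// Hypothetical illustration: resuming on failed CompletionStages, without Pekko.
final class ResumeOnFailedStageSketch {

  // Invented stand-in for the external service: handles starting with "unknown" fail.
  static CompletionStage<String> lookupEmail(String handle) {
    CompletableFuture<String> result = new CompletableFuture<>();
    if (handle.startsWith("unknown")) {
      result.completeExceptionally(new NoSuchElementException("no email for " + handle));
    } else {
      result.complete(handle + "@example.com");
    }
    return result;
  }

  // Collects the addresses of all handles whose lookup succeeded, in input order.
  static List<String> collectResuming(List<String> handles) {
    List<String> addresses = new ArrayList<>();
    for (String handle : handles) {
      try {
        addresses.add(lookupEmail(handle).toCompletableFuture().join());
      } catch (CompletionException e) {
        // Resume: the failed lookup is dropped and processing continues
      }
    }
    return addresses;
  }
}
```

Without the catch block, the first failed lookup would abort the loop, which is the analogue of the default stopping strategy.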