RestartSource.onFailuresWithBackoff
Wrap the given Source with a Source that will restart it when it fails, using an exponential backoff. Note that this Source will not restart on completion of the wrapped source.
Signature
RestartSource.onFailuresWithBackoff
Description
Wraps the given Source with a Source that will restart it when it fails using exponential backoff. The backoff resets back to minBackoff if there hasn't been a restart within maxRestartsWithin (which defaults to minBackoff).
This Source will not emit a failure as long as maxRestarts is not reached. The failure of the wrapped Source is handled by restarting it. However, the wrapped Source can be cancelled by cancelling this Source. When that happens, the wrapped Source, if currently running, will be cancelled and not restarted. This can be triggered by the downstream cancelling, or externally by introducing a KillSwitch right after this Source in the graph.
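The interplay between maxRestarts and the maxRestartsWithin reset window can be modelled with a small counter. The following is only an illustrative sketch in plain Java: the class `RestartBudgetSketch` and its methods are hypothetical names invented for this example, it does not use or reproduce Pekko's implementation, and it assumes the window is measured from the most recent (re)start.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative model only: names and structure are assumptions, not Pekko's code.
final class RestartBudgetSketch {
  private final int maxRestarts;
  private final Duration maxRestartsWithin;
  private int restartCount = 0;
  private Instant resetDeadline;

  RestartBudgetSketch(int maxRestarts, Duration maxRestartsWithin, Instant startedAt) {
    this.maxRestarts = maxRestarts;
    this.maxRestartsWithin = maxRestartsWithin;
    this.resetDeadline = startedAt.plus(maxRestartsWithin);
  }

  // Called when the wrapped source fails; returns true if a restart is allowed.
  boolean onFailure(Instant now) {
    if (now.isAfter(resetDeadline)) {
      // No restart within the window: the count (and so the backoff) starts over.
      restartCount = 0;
    }
    if (restartCount == maxRestarts) {
      return false; // budget exhausted: no further restart
    }
    restartCount++;
    return true;
  }

  // Called when the wrapped source has been started again.
  void onRestarted(Instant now) {
    resetDeadline = now.plus(maxRestartsWithin);
  }

  int restartCount() {
    return restartCount;
  }

  public static void main(String[] args) {
    Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
    RestartBudgetSketch budget = new RestartBudgetSketch(2, Duration.ofSeconds(30), t0);

    System.out.println(budget.onFailure(t0.plusSeconds(1))); // true
    budget.onRestarted(t0.plusSeconds(2));
    System.out.println(budget.onFailure(t0.plusSeconds(3))); // true
    budget.onRestarted(t0.plusSeconds(5));
    System.out.println(budget.onFailure(t0.plusSeconds(6))); // false
  }
}
```

In this model, had the third failure arrived more than 30 seconds after the last restart, the count would have been reset and the restart allowed again.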
This uses the same exponential backoff algorithm as BackoffOpts.
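As a rough illustration of what an exponential backoff with a random factor looks like, the sketch below doubles the delay on every restart, caps it at maxBackoff and then adds jitter. It is a minimal sketch under stated assumptions, not the library's code: the class `BackoffDelaySketch`, the `delay` method and the explicit `jitter` parameter (a value in [0.0, 1.0) that a real implementation would draw at random) are hypothetical names chosen for this example.

```java
import java.time.Duration;

// Illustrative sketch only: not the Pekko source code.
final class BackoffDelaySketch {

  // jitter is expected in [0.0, 1.0); passing it in keeps the example deterministic.
  static Duration delay(
      int restartCount,
      Duration minBackoff,
      Duration maxBackoff,
      double randomFactor,
      double jitter) {
    // Double the minimum backoff once per previous restart.
    double exponential = minBackoff.toMillis() * Math.pow(2, restartCount);
    // Cap the exponential part at maxBackoff.
    double capped = Math.min(exponential, maxBackoff.toMillis());
    // Assumption: jitter is applied after the cap, stretching the delay by up to randomFactor.
    double withJitter = capped * (1.0 + jitter * randomFactor);
    return Duration.ofMillis(Math.round(withJitter));
  }

  public static void main(String[] args) {
    Duration min = Duration.ofSeconds(1);
    Duration max = Duration.ofSeconds(10);
    for (int restartCount = 0; restartCount < 6; restartCount++) {
      // jitter = 0.0 shows the base delays without the random component
      System.out.println(restartCount + " -> " + delay(restartCount, min, max, 0.1, 0.0));
    }
  }
}
```

With the settings used in the examples on this page (a 1 second minBackoff, a 10 second maxBackoff) and the jitter fixed at 0.0, this sketch yields base delays of 1 s, 2 s, 4 s, 8 s and then 10 s for every later restart.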
See also:
- RestartSource.withBackoff
- RestartFlow.onFailuresWithBackoff
- RestartFlow.withBackoff
- RestartSink.withBackoff
Examples
This shows that a Source is not restarted if it completes, only if it fails. "tick" is printed only three times because take(3) makes the inner source complete successfully after emitting its first 3 elements.
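// assumes an implicit ActorSystem is in scope
import scala.concurrent.duration._
import org.apache.pekko.stream.RestartSettings
import org.apache.pekko.stream.scaladsl.{ RestartSource, Sink, Source }

val finiteSource = Source.tick(1.second, 1.second, "tick").take(3)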
val forever = RestartSource.onFailuresWithBackoff(RestartSettings(1.second, 10.seconds, 0.1))(() => finiteSource)
forever.runWith(Sink.foreach(println))
// prints
// tick
// tick
// tick
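// assumes an ActorSystem named system is available
import java.time.Duration;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.Cancellable;
import org.apache.pekko.stream.RestartSettings;
import org.apache.pekko.stream.javadsl.RestartSource;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

Source<String, Cancellable> finiteSource =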
Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(1), "tick").take(3);
Source<String, NotUsed> forever =
RestartSource.onFailuresWithBackoff(
RestartSettings.create(Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1),
() -> finiteSource);
forever.runWith(Sink.foreach(System.out::println), system);
// prints
// tick
// tick
// tick
If the inner source instead fails, it will be restarted with an increasing backoff. The source emits 1, 2, 3, and then throws an exception. The first time the exception is thrown the source is restarted after 1s, then after 2s, and so on, up to the maxBackoff of 10s.
// could throw if for example it used a database connection to get rows
val flakySource: Source[() => Int, NotUsed] =
Source(List(() => 1, () => 2, () => 3, () => throw CantConnectToDatabase("darn")))
val forever =
RestartSource.onFailuresWithBackoff(
RestartSettings(minBackoff = 1.second, maxBackoff = 10.seconds, randomFactor = 0.1))(() => flakySource)
forever.runWith(Sink.foreach(nr => system.log.info("{}", nr())))
// logs
// [INFO] [12/10/2019 13:51:58.300] [default-pekko.test.stream-dispatcher-7] [pekko.actor.ActorSystemImpl(default)] 1
// [INFO] [12/10/2019 13:51:58.301] [default-pekko.test.stream-dispatcher-7] [pekko.actor.ActorSystemImpl(default)] 2
// [INFO] [12/10/2019 13:51:58.302] [default-pekko.test.stream-dispatcher-7] [pekko.actor.ActorSystemImpl(default)] 3
// [WARN] [12/10/2019 13:51:58.310] [default-pekko.test.stream-dispatcher-7] [RestartWithBackoffSource(pekko://default)] Restarting graph due to failure. stack_trace: (docs.stream.operators.source.Restart$CantConnectToDatabase: darn)
// --> 1 second gap
// [INFO] [12/10/2019 13:51:59.379] [default-pekko.test.stream-dispatcher-8] [pekko.actor.ActorSystemImpl(default)] 1
// [INFO] [12/10/2019 13:51:59.382] [default-pekko.test.stream-dispatcher-8] [pekko.actor.ActorSystemImpl(default)] 2
// [INFO] [12/10/2019 13:51:59.383] [default-pekko.test.stream-dispatcher-8] [pekko.actor.ActorSystemImpl(default)] 3
// [WARN] [12/10/2019 13:51:59.386] [default-pekko.test.stream-dispatcher-8] [RestartWithBackoffSource(pekko://default)] Restarting graph due to failure. stack_trace: (docs.stream.operators.source.Restart$CantConnectToDatabase: darn)
// --> 2 second gap
// [INFO] [12/10/2019 13:52:01.594] [default-pekko.test.stream-dispatcher-8] [pekko.actor.ActorSystemImpl(default)] 1
// [INFO] [12/10/2019 13:52:01.595] [default-pekko.test.stream-dispatcher-8] [pekko.actor.ActorSystemImpl(default)] 2
// [INFO] [12/10/2019 13:52:01.595] [default-pekko.test.stream-dispatcher-8] [pekko.actor.ActorSystemImpl(default)] 3
// [WARN] [12/10/2019 13:52:01.596] [default-pekko.test.stream-dispatcher-8] [RestartWithBackoffSource(pekko://default)] Restarting graph due to failure. stack_trace: (docs.stream.operators.source.Restart$CantConnectToDatabase: darn)
// could throw if for example it used a database connection to get rows
Source<Creator<Integer>, NotUsed> flakySource =
Source.from(
Arrays.<Creator<Integer>>asList(
() -> 1,
() -> 2,
() -> 3,
() -> {
throw new RuntimeException("darn");
}));
Source<Creator<Integer>, NotUsed> forever =
RestartSource.onFailuresWithBackoff(
RestartSettings.create(Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1),
() -> flakySource);
forever.runWith(
Sink.foreach((Creator<Integer> nr) -> system.log().info("{}", nr.create())), system);
// logs
// [INFO] [12/10/2019 13:51:58.300] [default-pekko.test.stream-dispatcher-7]
// [pekko.actor.ActorSystemImpl(default)] 1
// [INFO] [12/10/2019 13:51:58.301] [default-pekko.test.stream-dispatcher-7]
// [pekko.actor.ActorSystemImpl(default)] 2
// [INFO] [12/10/2019 13:51:58.302] [default-pekko.test.stream-dispatcher-7]
// [pekko.actor.ActorSystemImpl(default)] 3
// [WARN] [12/10/2019 13:51:58.310] [default-pekko.test.stream-dispatcher-7]
// [RestartWithBackoffSource(pekko://default)] Restarting graph due to failure. stack_trace:
// (RuntimeException: darn)
// --> 1 second gap
// [INFO] [12/10/2019 13:51:59.379] [default-pekko.test.stream-dispatcher-8]
// [pekko.actor.ActorSystemImpl(default)] 1
// [INFO] [12/10/2019 13:51:59.382] [default-pekko.test.stream-dispatcher-8]
// [pekko.actor.ActorSystemImpl(default)] 2
// [INFO] [12/10/2019 13:51:59.383] [default-pekko.test.stream-dispatcher-8]
// [pekko.actor.ActorSystemImpl(default)] 3
// [WARN] [12/10/2019 13:51:59.386] [default-pekko.test.stream-dispatcher-8]
// [RestartWithBackoffSource(pekko://default)] Restarting graph due to failure. stack_trace:
// (RuntimeException: darn)
// --> 2 second gap
// [INFO] [12/10/2019 13:52:01.594] [default-pekko.test.stream-dispatcher-8]
// [pekko.actor.ActorSystemImpl(default)] 1
// [INFO] [12/10/2019 13:52:01.595] [default-pekko.test.stream-dispatcher-8]
// [pekko.actor.ActorSystemImpl(default)] 2
// [INFO] [12/10/2019 13:52:01.595] [default-pekko.test.stream-dispatcher-8]
// [pekko.actor.ActorSystemImpl(default)] 3
// [WARN] [12/10/2019 13:52:01.596] [default-pekko.test.stream-dispatcher-8]
// [RestartWithBackoffSource(pekko://default)] Restarting graph due to failure. stack_trace:
// (RuntimeException: darn)
Finally, a kill switch can be used to stop the restarting. The kill switch is inserted right after the restart source. The inner source is the same as above, so it emits 3 elements and then fails:
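import org.apache.pekko.stream.{ KillSwitches, UniqueKillSwitch }
import org.apache.pekko.stream.scaladsl.Keep

val flakySource: Source[() => Int, NotUsed] =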
Source(List(() => 1, () => 2, () => 3, () => throw CantConnectToDatabase("darn")))
val stopRestarting: UniqueKillSwitch =
RestartSource
.onFailuresWithBackoff(RestartSettings(1.second, 10.seconds, 0.1))(() => flakySource)
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.foreach(nr => println(s"Nr ${nr()}")))(Keep.left)
.run()
// ... from somewhere else
// stop the source from restarting
stopRestarting.shutdown()
Source<Creator<Integer>, NotUsed> flakySource =
Source.from(
Arrays.<Creator<Integer>>asList(
() -> 1,
() -> 2,
() -> 3,
() -> {
throw new RuntimeException("darn");
}));
UniqueKillSwitch stopRestarting =
RestartSource.onFailuresWithBackoff(
RestartSettings.create(Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1),
() -> flakySource)
.viaMat(KillSwitches.single(), Keep.right())
.toMat(Sink.foreach(nr -> System.out.println("nr " + nr.create())), Keep.left())
.run(system);
// ... from somewhere else
// stop the source from restarting
stopRestarting.shutdown();
Reactive Streams semantics
emits when the wrapped source emits
backpressures during backoff and when downstream backpressures
completes when the wrapped source completes or maxRestarts are reached within the given time limit
cancels when downstream cancels