RetryFlow.withBackoffAndContext
Wrap the given FlowWithContextFlowWithContext and retry individual elements in that stream with an exponential backoff. A decider function tests every emitted element and can return a new element to be sent to the wrapped flow for another try.
Signature
RetryFlow.withBackoffAndContextRetryFlow.withBackoffAndContext
Description
When an element is emitted by the wrapped flow it is passed to the decideRetry function, which may return an element to retry in the flow.
The retry backoff is controlled by the minBackoff, maxBackoff and randomFactor parameters. At most maxRetries will be made after the initial try.
The wrapped flow must have one-in one-out semantics. It may not filter, nor duplicate elements. The RetryFlow will fail if two elements are emitted from the flow, it will be stuck “forever” if nothing is emitted. Just one element will be emitted into the flow at any time. The flow needs to emit an element before the next will be emitted to it.
Elements are retried as long as maxRetries is not reached and the decideRetry function returns a new element to be sent to flow. The decideRetry function gets passed in the original element sent to the flow and the element emitted by it together with their contexts as tuplesorg.apache.pekko.japi.Pairs. When decideRetry returns NoneOptional.empty, no retries will be issued, and the response will be emitted downstream.
This API may be changed in further patch releases.
This example wraps a flow handling IntsIntegers with SomeContext in context, and retries elements unless the result is 0 or negative, or maxRetries is hit.
- Scala
-
source
val flow: FlowWithContext[Int, SomeContext, Int, SomeContext, NotUsed] = // ??? val retryFlow: FlowWithContext[Int, SomeContext, Int, SomeContext, NotUsed] = RetryFlow.withBackoffAndContext( minBackoff = 10.millis, maxBackoff = 5.seconds, randomFactor = 0d, maxRetries = 3, flow)(decideRetry = { case ((_, _), (result, ctx)) if result > 0 => Some(result -> ctx) case _ => None }) - Java
-
source
FlowWithContext<Integer, SomeContext, Integer, SomeContext, NotUsed> flow = // ... // the wrapped flow FlowWithContext<Integer, SomeContext, Integer, SomeContext, NotUsed> retryFlow = RetryFlow.withBackoffAndContext( minBackoff, maxBackoff, randomFactor, maxRetries, flow, (in, out) -> { Integer value = out.first(); SomeContext context = out.second(); if (value > 0) { return Optional.of(Pair.create(value, context)); } else { return Optional.empty(); } });
Reactive Streams semantics
emits when the wrapped flow emits, and either maxRetries is reached or decideRetry returns NoneOptional.empty
backpressures during backoff, when the wrapped flow backpressures, or when downstream backpressures
completes when upstream or the wrapped flow completes
cancels when downstream or the wrapped flow cancels