RetryFlow.withBackoffAndContext
Wrap the given FlowWithContext and retry individual elements in that stream with an exponential backoff. A decider function tests every emitted element and can return a new element to be sent to the wrapped flow for another try.
Signature
RetryFlow.withBackoffAndContext
Description
When an element is emitted by the wrapped flow, it is passed to the decideRetry function, which may return an element to retry in the flow.
The retry backoff is controlled by the minBackoff, maxBackoff and randomFactor parameters. At most maxRetries retries will be made after the initial try.
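To make the interplay of the three backoff parameters concrete, the following is a minimal sketch of a typical exponential backoff calculation. It assumes the delay doubles with every retry, is capped at maxBackoff, and is then stretched by a random jitter of up to randomFactor. The class BackoffSketch and the helper delayFor are hypothetical names used for illustration only and are not part of the Pekko API; the exact formula used by RetryFlow may differ.

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

public class BackoffSketch {

    // Hypothetical helper, not part of Pekko.
    // retry is 0-based: 0 is the first retry after the initial try.
    // rnd is a random number in [0, 1) supplied by the caller so the
    // calculation stays deterministic and easy to test.
    public static Duration delayFor(int retry, Duration minBackoff, Duration maxBackoff,
                                    double randomFactor, double rnd) {
        // Assumption: the base delay doubles with every retry.
        double exponential = minBackoff.toMillis() * Math.pow(2, retry);
        // Assumption: the base delay never exceeds maxBackoff.
        double capped = Math.min(exponential, maxBackoff.toMillis());
        // Assumption: jitter stretches the delay by up to randomFactor.
        double jittered = capped * (1.0 + rnd * randomFactor);
        return Duration.ofMillis((long) jittered);
    }

    public static void main(String[] args) {
        Duration min = Duration.ofMillis(10);
        Duration max = Duration.ofSeconds(5);
        int maxRetries = 3;
        for (int retry = 0; retry < maxRetries; retry++) {
            double rnd = ThreadLocalRandom.current().nextDouble();
            System.out.println("retry " + (retry + 1) + ": "
                + delayFor(retry, min, max, 0.0, rnd).toMillis() + " ms");
        }
    }
}
```

Under these assumptions, minBackoff = 10 ms with randomFactor = 0 gives delays of 10 ms, 20 ms and 40 ms for the three retries. A non-zero randomFactor spreads retries out so that many failing elements do not all retry at the same instant.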
The wrapped flow must have one-in one-out semantics. It may neither filter nor duplicate elements. The RetryFlow will fail if two elements are emitted from the flow for a single input, and it will be stuck “forever” if nothing is emitted. Only one element is in the flow at any time: the flow needs to emit an element before the next one is sent to it.
Elements are retried as long as maxRetries is not reached and the decideRetry function returns a new element to be sent to the flow. The decideRetry function is passed the original element sent to the flow and the element emitted by it, together with their contexts, as tuples (Scala) or org.apache.pekko.japi.Pair instances (Java). When decideRetry returns None (Scala) or Optional.empty (Java), no retries will be issued, and the response will be emitted downstream.
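The retry rules described above can be modelled with plain Java, without any streams. This is a conceptual sketch only, not how RetryFlow is implemented: the class RetryLoopSketch and the method runWithRetries are hypothetical, the wrapped flow is replaced by an ordinary function, and backoff delays and contexts are left out.

```java
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

public class RetryLoopSketch {

    // Conceptual model only: applies `flow` once, then up to maxRetries more
    // times for as long as decideRetry returns a new input element.
    public static <In, Out> Out runWithRetries(
            In first,
            Function<In, Out> flow,
            BiFunction<In, Out, Optional<In>> decideRetry,
            int maxRetries) {
        In in = first;
        Out out = flow.apply(in); // the initial try
        for (int retryNo = 0; retryNo < maxRetries; retryNo++) {
            Optional<In> next = decideRetry.apply(in, out);
            if (!next.isPresent()) {
                return out; // no retry requested: emit the result downstream
            }
            in = next.get();
            out = flow.apply(in); // another try with the new element
        }
        return out; // maxRetries reached: emit the last result as is
    }

    public static void main(String[] args) {
        // Stand-in for the wrapped flow: halves its input.
        Function<Integer, Integer> halve = n -> n / 2;
        // Retry while the result is positive, feeding the result back in.
        BiFunction<Integer, Integer, Optional<Integer>> decide =
            (in, out) -> out > 0 ? Optional.of(out) : Optional.empty();

        // 5 -> 2 -> 1 -> 0: the decider stops after two retries, prints 0
        System.out.println(runWithRetries(5, halve, decide, 3));
        // 100 -> 50 -> 25 -> 12 -> 6: stopped by maxRetries = 3, prints 6
        System.out.println(runWithRetries(100, halve, decide, 3));
    }
}
```

Note that when maxRetries is reached the last result is emitted without consulting decideRetry again, so downstream may still receive an element the decider would have rejected.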
This API may be changed in further patch releases.
This example wraps a flow handling Int values (Scala) or Integer values (Java) with SomeContext in context, and retries elements unless the result is 0 or negative, or maxRetries is hit.
- Scala
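  import scala.concurrent.duration._
  import org.apache.pekko.NotUsed
  import org.apache.pekko.stream.scaladsl.{ FlowWithContext, RetryFlow }

  // the wrapped flow
  val flow: FlowWithContext[Int, SomeContext, Int, SomeContext, NotUsed] = // ???

  val retryFlow: FlowWithContext[Int, SomeContext, Int, SomeContext, NotUsed] =
    RetryFlow.withBackoffAndContext(
      minBackoff = 10.millis,
      maxBackoff = 5.seconds,
      randomFactor = 0d,
      maxRetries = 3,
      flow)(decideRetry = {
      // retry with the result as the new input while the result is positive
      case ((_, _), (result, ctx)) if result > 0 => Some(result -> ctx)
      // otherwise emit the result downstream
      case _ => None
    })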
- Java
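  import java.util.Optional;
  import org.apache.pekko.NotUsed;
  import org.apache.pekko.japi.Pair;
  import org.apache.pekko.stream.javadsl.FlowWithContext;
  import org.apache.pekko.stream.javadsl.RetryFlow;

  FlowWithContext<Integer, SomeContext, Integer, SomeContext, NotUsed> flow = // ...
  // the wrapped flow

  FlowWithContext<Integer, SomeContext, Integer, SomeContext, NotUsed> retryFlow =
      RetryFlow.withBackoffAndContext(
          minBackoff,
          maxBackoff,
          randomFactor,
          maxRetries,
          flow,
          (in, out) -> {
            Integer value = out.first();
            SomeContext context = out.second();
            if (value > 0) {
              // retry with the result as the new input
              return Optional.of(Pair.create(value, context));
            } else {
              // emit the result downstream
              return Optional.empty();
            }
          });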
Reactive Streams semantics
emits when the wrapped flow emits, and either maxRetries is reached or decideRetry returns None (Scala) / Optional.empty (Java)
backpressures during backoff, when the wrapped flow backpressures, or when downstream backpressures
completes when upstream or the wrapped flow completes
cancels when downstream or the wrapped flow cancels