RetryFlow.withBackoffAndContext

Wrap the given FlowWithContext and retry individual elements in that stream with an exponential backoff. A decider function tests every emitted element and can return a new element to be sent to the wrapped flow for another try.

Error handling

Signature

RetryFlow.withBackoffAndContext

Description

When an element is emitted by the wrapped flow it is passed to the decideRetry function, which may return an element to retry in the flow.

The retry backoff is controlled by the minBackoff, maxBackoff and randomFactor parameters. At most maxRetries retries will be made after the initial try.
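To make the interplay of these three parameters concrete, here is a minimal plain-Java sketch of an exponential backoff schedule. It is not Pekko's internal code: the class BackoffSketch and the method delayMillis are hypothetical, and the sketch assumes that the delay doubles with every retry, is capped at maxBackoff, and is then stretched by a random share of up to randomFactor.

```java
import java.util.concurrent.ThreadLocalRandom;

// Plain-Java model of an exponential backoff schedule (an assumption, not Pekko's code).
final class BackoffSketch {

  // Delay before retry number `retry` (0-based), assuming the delay doubles each time,
  // is capped at maxBackoffMillis, and is then stretched by up to randomFactor.
  static long delayMillis(
      long minBackoffMillis, long maxBackoffMillis, double randomFactor, int retry) {
    double jitter = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor;
    double exponential = minBackoffMillis * Math.pow(2, retry);
    double capped = Math.min(maxBackoffMillis, exponential);
    return (long) (capped * jitter);
  }

  public static void main(String[] args) {
    // 10 ms minimum, 5 s maximum, no jitter, 3 retries.
    for (int retry = 0; retry < 3; retry++) {
      System.out.println(
          "retry " + (retry + 1) + " after " + delayMillis(10, 5000, 0.0, retry) + " ms");
    }
  }
}
```

Under these assumptions, and with randomFactor set to 0, the three retries wait 10 ms, 20 ms and 40 ms. A non-zero randomFactor spreads retries out so that many failing elements do not all hit the wrapped flow at the same instant.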

The wrapped flow must have one-in one-out semantics: it may neither filter nor duplicate elements. The RetryFlow fails if the wrapped flow emits two elements for one input, and it gets stuck “forever” if nothing is emitted. Only one element is in the wrapped flow at any time: the flow must emit an element before the next one is sent to it.

Elements are retried as long as maxRetries is not reached and the decideRetry function returns a new element to be sent to flow. The decideRetry function is passed the original element sent to the flow and the element emitted by it, together with their contexts, as tuples (Scala) or org.apache.pekko.japi.Pairs (Java). When decideRetry returns None (Scala) or Optional.empty (Java), no retries will be issued, and the response will be emitted downstream.
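The retry decision described above can be modelled for a single element in plain Java. This is a simplified sketch, not the operator itself: RetryLoopSketch and run are hypothetical names, the wrapped flow is reduced to a synchronous Function, and contexts, backoff delays and backpressure are left out.

```java
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of the retry decision loop; it ignores contexts, backoff and streaming.
final class RetryLoopSketch {

  // Sends `first` through `flow`, then keeps retrying with whatever element
  // decideRetry returns until it returns Optional.empty() or maxRetries is used up.
  static <In, Out> Out run(
      In first,
      Function<In, Out> flow,
      BiFunction<In, Out, Optional<In>> decideRetry,
      int maxRetries) {
    In in = first;
    Out out = flow.apply(in); // the initial try
    for (int retries = 0; retries < maxRetries; retries++) {
      Optional<In> next = decideRetry.apply(in, out);
      if (!next.isPresent()) {
        return out; // no retry requested: emit downstream
      }
      in = next.get();
      out = flow.apply(in); // another try with the new element
    }
    return out; // maxRetries reached: emit the last result
  }

  public static void main(String[] args) {
    // Hypothetical wrapped flow: subtracts 2 from each element.
    Function<Integer, Integer> flow = n -> n - 2;
    // Retry with the result as the new element while the result is positive.
    BiFunction<Integer, Integer, Optional<Integer>> decideRetry =
        (in, out) -> out > 0 ? Optional.of(out) : Optional.<Integer>empty();

    System.out.println(run(3, flow, decideRetry, 3)); // tries 3, then 1; emits -1
    System.out.println(run(10, flow, decideRetry, 3)); // tries 10, 8, 6, 4; emits 2
  }
}
```

In the second call the result is still positive after the third retry, but it is emitted anyway because maxRetries is reached.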

Note

This API may be changed in further patch releases.

This example wraps a flow handling Ints (Scala) or Integers (Java) with SomeContext in context, and retries elements unless the result is 0 or negative, or maxRetries is hit.

Scala
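import scala.concurrent.duration._
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.{ FlowWithContext, RetryFlow }

val flow: FlowWithContext[Int, SomeContext, Int, SomeContext, NotUsed] = // ???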

val retryFlow: FlowWithContext[Int, SomeContext, Int, SomeContext, NotUsed] =
  RetryFlow.withBackoffAndContext(
    minBackoff = 10.millis,
    maxBackoff = 5.seconds,
    randomFactor = 0d,
    maxRetries = 3,
    flow)(decideRetry = {
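    // Retry with the result as the new element while it is positive.
    case ((_, _), (result, ctx)) if result > 0 => Some(result -> ctx)
    // Otherwise stop retrying and emit the result downstream.
    case _                                     => None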
  })
Java
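import java.util.Optional;
import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.javadsl.FlowWithContext;
import org.apache.pekko.stream.javadsl.RetryFlow;

FlowWithContext<Integer, SomeContext, Integer, SomeContext, NotUsed> flow = // ...
    // the wrapped flow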

FlowWithContext<Integer, SomeContext, Integer, SomeContext, NotUsed> retryFlow =
    RetryFlow.withBackoffAndContext(
        minBackoff,
        maxBackoff,
        randomFactor,
        maxRetries,
        flow,
        (in, out) -> {
          Integer value = out.first();
          SomeContext context = out.second();
          if (value > 0) {
            return Optional.of(Pair.create(value, context));
          } else {
            return Optional.empty();
          }
        });

Reactive Streams semantics

emits when the wrapped flow emits, and either maxRetries is reached or decideRetry returns None (Scala) or Optional.empty (Java)

backpressures during backoff, when the wrapped flow backpressures, or when downstream backpressures

completes when upstream or the wrapped flow completes

cancels when downstream or the wrapped flow cancels