onCompleteWithBreaker

Signature

def onCompleteWithBreaker[T](breaker: CircuitBreaker)(future: => Future[T]): Directive1[Try[T]]

Description

Evaluates its parameter of type Future[T] (Scala) or CompletionStage<T> (Java), protecting it with the specified CircuitBreaker. Refer to Circuit Breaker for a detailed description of this pattern.

If the CircuitBreaker is open, the request is rejected with a CircuitBreakerOpenRejection. Note that in this case the request’s entity databytes stream is cancelled, and the connection is closed as a consequence.

Otherwise, the same behaviour provided by onComplete is to be expected.

Example

Scala
/** Divides `a` by `b` asynchronously.
  *
  * The returned future fails when the division throws, e.g. with an
  * `ArithmeticException` ("/ by zero") when `b` is 0.
  */
def divide(a: Int, b: Int): Future[Int] = Future {
  a / b
}

// The tests below wait slightly longer than this before expecting the
// breaker to accept calls again.
val resetTimeout = 1.second

// maxFailures = 1: the single failing call in the tests is enough to open the breaker.
val breaker =
  new CircuitBreaker(
    scheduler = system.scheduler,
    maxFailures = 1,
    callTimeout = 5.seconds,
    resetTimeout = resetTimeout)

// Route for /divide/<a>/<b>: runs the division behind the circuit breaker and
// turns the outcome of the future into an HTTP response.
val route =
  path("divide" / IntNumber / IntNumber) { (dividend, divisor) =>
    onCompleteWithBreaker(breaker)(divide(dividend, divisor)) {
      case Failure(cause) =>
        complete(InternalServerError, s"An error occurred: ${cause.getMessage}")
      case Success(quotient) =>
        complete(s"The result was $quotient")
    }
  }

// tests:
// The breaker starts out accepting calls, so a valid division is answered normally.
Get("/divide/10/2") ~> route ~> check {
  responseAs[String] shouldEqual "The result was 5"
}

// Dividing by zero fails the future; the route's Failure case turns it into a 500 response.
Get("/divide/10/0") ~> Route.seal(route) ~> check {
  status shouldEqual InternalServerError
  responseAs[String] shouldEqual "An error occurred: / by zero"
} // opens the circuit breaker

// With the breaker open, even a valid division is rejected.
Get("/divide/10/2") ~> route ~> check {
  rejection shouldBe a[CircuitBreakerOpenRejection]
}

// Wait past resetTimeout (plus a 200 ms margin) so the breaker accepts calls again.
Thread.sleep(resetTimeout.toMillis + 200)

// After the wait, the same valid request succeeds again.
Get("/divide/10/2") ~> route ~> check {
  responseAs[String] shouldEqual "The result was 5"
}
Java
import static org.apache.pekko.http.javadsl.server.Directives.onCompleteWithBreaker;
import static org.apache.pekko.http.javadsl.server.Directives.path;

// import static scala.compat.java8.JFunction.func;
// import static org.apache.pekko.http.javadsl.server.PathMatchers.*;

// Breaker settings. The tests below sleep for resetTimeout before expecting
// calls to be accepted again.
final FiniteDuration resetTimeout = FiniteDuration.create(1, TimeUnit.SECONDS);
final FiniteDuration callTimeout = FiniteDuration.create(5, TimeUnit.SECONDS);
final int maxFailures = 1;
final CircuitBreaker breaker =
    CircuitBreaker.create(
        system().scheduler(),
        maxFailures,
        callTimeout,
        resetTimeout);

// Route for /divide/<a>/<b>: runs the division asynchronously behind the circuit breaker
// and maps the outcome of the computation to a response.
final Route route =
    path(
        segment("divide").slash(integerSegment()).slash(integerSegment()),
        (a, b) ->
            onCompleteWithBreaker(
                breaker,
                // the supplier creates the CompletionStage that the breaker guards
                () -> CompletableFuture.supplyAsync(() -> a / b),
                maybeResult ->
                    maybeResult
                        // success: report the quotient
                        .map(result -> complete("The result was " + result))
                        // failure: any Throwable becomes a 500 carrying ex.toString()
                        .recover(
                            new PFBuilder<Throwable, Route>()
                                .matchAny(
                                    ex ->
                                        complete(
                                            StatusCodes.InternalServerError(),
                                            "An error occurred: " + ex.toString()))
                                .build())
                        // after recover, both outcomes carry a Route; extract it
                        .get()));

// A valid division is answered normally.
testRoute(route).run(HttpRequest.GET("/divide/10/2")).assertEntity("The result was 5");

// Division by zero fails the computation; the route reports it as a 500 with ex.toString().
testRoute(route)
    .run(HttpRequest.GET("/divide/10/0"))
    .assertStatusCode(StatusCodes.InternalServerError())
    .assertEntity("An error occurred: java.lang.ArithmeticException: / by zero");

// The circuit-breaker will eventually be opened
new TestKit(system()) {
  {
    // Retry until the request is answered with 503 Service Unavailable.
    // NOTE(review): 500 seconds is a very generous upper bound — confirm it is intended.
    awaitAssert(
        Duration.ofSeconds(500),
        () -> {
          testRoute(route)
              .run(HttpRequest.GET("/divide/10/0"))
              .assertEntity(
                  "The server is currently unavailable (because it is overloaded or down for maintenance).")
              .assertStatusCode(StatusCodes.ServiceUnavailable());
          return null;
        });

    // Give the breaker its reset timeout before probing it again.
    Thread.sleep(resetTimeout.toMillis());

    // circuit breaker resets after this time, but observing it
    // is timing sensitive so retry a few times within a timeout
    awaitAssert(
        Duration.ofSeconds(500),
        () -> {
          testRoute(route).run(HttpRequest.GET("/divide/8/2")).assertEntity("The result was 4");
          return null;
        });
  }
};