onCompleteWithBreaker
Signature¶
def onCompleteWithBreaker[T](breaker: CircuitBreaker)(future: => Future[T]): Directive1[Try[T]]
Description¶
Evaluates its parameter of type `Future[T]`, protecting it with the specified `CircuitBreaker`. Refer to Circuit Breaker for a detailed description of this pattern.
If the `CircuitBreaker` is open, the request is rejected with a `CircuitBreakerOpenRejection`. Note that in this case the request’s entity data bytes stream is cancelled, and the connection is closed as a consequence.
Otherwise, the same behaviour provided by onComplete is to be expected.
Example¶
// Asynchronous integer division. A zero divisor fails the Future; the tests
// below observe that failure as the message "/ by zero".
def divide(a: Int, b: Int): Future[Int] = Future {
  a / b
}

// Kept in a val because the tests reuse it to wait for the breaker to reset.
val resetTimeout = 1.second

// maxFailures = 1: in the tests below a single failed call is enough to open
// the breaker.
val breaker = new CircuitBreaker(
  system.scheduler,
  maxFailures = 1,
  callTimeout = 5.seconds,
  resetTimeout)

// GET /divide/<a>/<b>: runs the division under the breaker and renders either
// the result or the failure message.
val route =
  path("divide" / IntNumber / IntNumber) { (a, b) =>
    onCompleteWithBreaker(breaker)(divide(a, b)) {
      case Success(value) => complete(s"The result was $value")
      case Failure(ex)    => complete(InternalServerError, s"An error occurred: ${ex.getMessage}")
    }
  }

// tests:

// Breaker closed: the call goes through and succeeds.
Get("/divide/10/2") ~> route ~> check {
  responseAs[String] shouldEqual "The result was 5"
}

// The failing division is handled by the Failure case above.
Get("/divide/10/0") ~> Route.seal(route) ~> check {
  status shouldEqual InternalServerError
  responseAs[String] shouldEqual "An error occurred: / by zero"
} // opens the circuit breaker

// Breaker open: even a valid request is rejected. The route is deliberately
// not sealed here so the rejection itself can be inspected.
Get("/divide/10/2") ~> route ~> check {
  rejection shouldBe a[CircuitBreakerOpenRejection]
}

// Wait slightly longer than resetTimeout so the breaker accepts calls again.
Thread.sleep(resetTimeout.toMillis + 200)

Get("/divide/10/2") ~> route ~> check {
  responseAs[String] shouldEqual "The result was 5"
}
import static org.apache.pekko.http.javadsl.server.Directives.onCompleteWithBreaker;
import static org.apache.pekko.http.javadsl.server.Directives.path;

// import static scala.compat.java8.JFunction.func;
// import static org.apache.pekko.http.javadsl.server.PathMatchers.*;

// Breaker settings. With maxFailures = 1 the failing /divide/10/0 request
// below is what eventually opens the breaker.
final int maxFailures = 1;
final FiniteDuration callTimeout = FiniteDuration.create(5, TimeUnit.SECONDS);
final FiniteDuration resetTimeout = FiniteDuration.create(1, TimeUnit.SECONDS);
final CircuitBreaker breaker =
    CircuitBreaker.create(system().scheduler(), maxFailures, callTimeout, resetTimeout);

// GET /divide/<a>/<b>: runs the division asynchronously under the breaker and
// renders either the result or a description of the failure.
final Route route =
    path(
        segment("divide").slash(integerSegment()).slash(integerSegment()),
        (a, b) ->
            onCompleteWithBreaker(
                breaker,
                () -> CompletableFuture.supplyAsync(() -> a / b),
                maybeResult ->
                    maybeResult
                        .map(result -> complete("The result was " + result))
                        .recover(
                            // Any failure becomes a 500 response; ex.toString() is used, so
                            // the body includes the exception class name (see assertion below).
                            new PFBuilder<Throwable, Route>()
                                .matchAny(
                                    ex ->
                                        complete(
                                            StatusCodes.InternalServerError(),
                                            "An error occurred: " + ex.toString()))
                                .build())
                        .get()));

// Breaker closed: the call goes through and succeeds.
testRoute(route).run(HttpRequest.GET("/divide/10/2")).assertEntity("The result was 5");

// Division by zero fails the future and is rendered by the recover branch.
testRoute(route)
    .run(HttpRequest.GET("/divide/10/0"))
    .assertStatusCode(StatusCodes.InternalServerError())
    .assertEntity("An error occurred: java.lang.ArithmeticException: / by zero");

// The circuit-breaker will eventually be opened
new TestKit(system()) {
  {
    // Retry until the breaker is observed open. NOTE(review): the 503 body is
    // presumably produced by the test kit's default rejection handling for the
    // breaker-open rejection; confirm against the test kit.
    awaitAssert(
        Duration.ofSeconds(500),
        () -> {
          testRoute(route)
              .run(HttpRequest.GET("/divide/10/0"))
              .assertEntity(
                  "The server is currently unavailable (because it is overloaded or down for maintenance).")
              .assertStatusCode(StatusCodes.ServiceUnavailable());
          return null;
        });

    Thread.sleep(resetTimeout.toMillis());

    // circuit breaker resets after this time, but observing it
    // is timing sensitive so retry a few times within a timeout
    awaitAssert(
        Duration.ofSeconds(500),
        () -> {
          testRoute(route).run(HttpRequest.GET("/divide/8/2")).assertEntity("The result was 4");
          return null;
        });
  }
};
1.0.1