Pekko HTTP interop
Pekko gRPC is built on top of Pekko HTTP. This means you can use the Pekko HTTP APIs to create more complicated services, for example serving non-gRPC endpoints next to gRPC endpoints or adding behavior around your gRPC routes.
Example: authentication/authorization
One use case could be adding cross-cutting concerns such as authentication/authorization. Suppose you have an API that you want to secure using a token. You already have a regular HTTP API that users can use to obtain a token, and want to secure your gRPC routes to only accept calls that include this token.
Pekko HTTP authentication route
This route could be any arbitrary Pekko HTTP route. For this example we just provide a hint in the response body:
// A Route to authenticate with
val authenticationRoute: Route = path("login") {
  get {
    complete("Psst, please use token XYZ!")
  }
}
// A Route to authenticate with
Route authentication = path("login", () -> get(() -> complete("Psst, please use token XYZ!")));
Pekko gRPC route
We create the Pekko gRPC service implementation, and convert it to a Route as well:
// Create service handlers
val handler: HttpRequest => Future[HttpResponse] =
  GreeterServiceHandler(new GreeterServiceImpl())
// As a Route
val handlerRoute: Route = handle(handler)
// Instantiate implementation
GreeterService impl = new GreeterServiceImpl(mat);
Function<HttpRequest, CompletionStage<HttpResponse>> handler =
    GreeterServiceHandlerFactory.create(impl, sys);
// As a Route
Route handlerRoute = handle(handler);
Securing the Pekko gRPC route
We can wrap the gRPC route just like any Route, applying the authorization:
// A directive to authorize calls
val authorizationDirective: Directive0 =
  headerValueByName("token").flatMap { token =>
    if (token == "XYZ") pass
    else reject
  }
// Protect the handler route
Route protectedHandler =
    headerValueByName(
        "token",
        token -> {
          if ("XYZ".equals(token)) {
            return handlerRoute;
          } else {
            return complete(StatusCodes.UNAUTHORIZED);
          }
        });
Tying it all together
Finally we can combine the routes and serve them. Binding through newServerAt(...).bind(...) takes the place of the older bindAndHandleAsync; as before, the server must have HTTP/2 enabled to serve gRPC:
val route = concat(
  authenticationRoute,
  authorizationDirective {
    handlerRoute
  })
// Bind service handler servers to localhost:8082
val binding = Http().newServerAt("127.0.0.1", 8082).bind(route)
Route finalRoute = concat(authentication, protectedHandler);
return Http.get(sys).newServerAt("127.0.0.1", 8090).bind(finalRoute);
Example: logging, error handling, and passing request context
This example shows how we can wrap a gRPC service implementation into a Route to get the following common server features:
- Log the HTTP request and response corresponding to each RPC.
- Pass the RequestContext into the RPC handler.
- If the RPC fails, apply a custom error handler and log the error.
Implementation
We start with an implementation of the sayHello RPC that does some primitive validation on the name.
If the name starts with a lowercase letter, we fail the call with an IllegalArgumentException. Otherwise, we return a standard response.
However, there is also a slight bug in the implementation. If the name is empty, the attempt to read the first character in the first if statement throws an exception.
private final class Impl(mat: Materializer) extends GreeterServiceImpl()(mat) {
  override def sayHello(in: HelloRequest): Future[HelloReply] =
    if (in.name.head.isLower) {
      Future.failed(new IllegalArgumentException("Name must be capitalized"))
    } else {
      Future.successful(HelloReply(s"Hello, ${in.name}"))
    }
}
private static class Impl extends GreeterServiceImpl {
  public Impl(Materializer mat) {
    super(mat);
  }

  @Override
  public CompletionStage<HelloReply> sayHello(HelloRequest in) {
    if (Character.isLowerCase(in.getName().charAt(0))) {
      CompletableFuture<HelloReply> reply = new CompletableFuture<>();
      reply.completeExceptionally(new IllegalArgumentException("Name must be capitalized"));
      return reply;
    } else {
      HelloReply reply = HelloReply.newBuilder().setMessage("Hello, " + in.getName()).build();
      return CompletableFuture.completedFuture(reply);
    }
  }
}
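The implementations above leave the empty-name case unguarded on purpose, so that the later sections have an unhandled failure to show. Outside of this demonstration, one way to avoid the bug is to check for an empty name before reading its first character. The following is a minimal sketch of that guard using only the Java standard library: the class NameValidationSketch and its greet method are hypothetical, and plain strings stand in for HelloRequest and HelloReply.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical stand-in for the service implementation; not part of Pekko gRPC.
final class NameValidationSketch {

  // Builds an already-failed stage (works on Java 8, which lacks failedFuture).
  private static <T> CompletionStage<T> failed(Throwable cause) {
    CompletableFuture<T> future = new CompletableFuture<>();
    future.completeExceptionally(cause);
    return future;
  }

  // Validates the name before touching its first character.
  static CompletionStage<String> greet(String name) {
    if (name == null || name.isEmpty()) {
      return failed(new IllegalArgumentException("Name must not be empty"));
    }
    if (Character.isLowerCase(name.charAt(0))) {
      return failed(new IllegalArgumentException("Name must be capitalized"));
    }
    return CompletableFuture.completedFuture("Hello, " + name);
  }
}
```

With this guard, an empty name fails with the same exception type as a lowercase name, so an error mapping for IllegalArgumentException would cover both cases.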
Method to log, handle, and recover each RPC
We define a method that returns a Route implementing our logging and error recovery features. The method takes three parameters:
- A function that takes a RequestContext and returns a service implementation. This gives us the opportunity to use the context in the implementation. If we don’t need it, we can just ignore the context and return a fixed implementation.
- A function that takes an ActorSystem and returns a partial function from Throwable to gRPC Trailers.
- A function that takes the service implementation and an error handler and returns a request handler (a function from HttpRequest to a CompletionStage of HttpResponse).
The method first uses an existing directive to log requests and results. Then it wraps the given error handler into an error handler that also logs the error. Finally, it calls the given functions to handle incoming requests.
private type ErrorHandler = ActorSystem => PartialFunction[Throwable, Trailers]

private def loggingErrorHandlingGrpcRoute[ServiceImpl](
    buildImpl: RequestContext => ServiceImpl,
    errorHandler: ErrorHandler,
    buildHandler: (ServiceImpl, ErrorHandler) => HttpRequest => Future[HttpResponse]): Route =
  DebuggingDirectives.logRequestResult(("loggingErrorHandlingGrpcRoute", Logging.InfoLevel)) {
    extractRequestContext { ctx =>
      val loggingErrorHandler: ErrorHandler = (sys: ActorSystem) => {
        case NonFatal(t) =>
          val pf = errorHandler(sys)
          if (pf.isDefinedAt(t)) {
            val trailers: Trailers = pf(t)
            ctx.log.error(t, s"Grpc failure handled and mapped to $trailers")
            trailers
          } else {
            val trailers = Trailers(Status.INTERNAL)
            ctx.log.error(t, s"Grpc failure UNHANDLED and mapped to $trailers")
            trailers
          }
      }
      val impl = buildImpl(ctx)
      val handler = buildHandler(impl, loggingErrorHandler)
      handle(handler)
    }
  }
private static <ServiceImpl> Route loggingErrorHandlingGrpcRoute(
    Function<RequestContext, ServiceImpl> buildImpl,
    Function<ActorSystem, Function<Throwable, Trailers>> errorHandler,
    BiFunction<
            ServiceImpl,
            Function<ActorSystem, Function<Throwable, Trailers>>,
            org.apache.pekko.japi.function.Function<HttpRequest, CompletionStage<HttpResponse>>>
        buildHandler) {
  return logRequest(
      "loggingErrorHandlingGrpcRoute",
      Logging.InfoLevel(),
      () ->
          extractRequestContext(
              ctx -> {
                Function<ActorSystem, Function<Throwable, Trailers>> loggingErrorHandler =
                    (actorSystem) ->
                        (throwable) -> {
                          Function<Throwable, Trailers> function =
                              errorHandler.apply(actorSystem);
                          Trailers trailers = function.apply(throwable);
                          if (trailers != null) {
                            ctx.getLog()
                                .error(
                                    throwable, "Grpc failure handled and mapped to " + trailers);
                            return trailers;
                          } else {
                            Trailers internal = new Trailers(Status.INTERNAL);
                            ctx.getLog()
                                .error(
                                    throwable,
                                    "Grpc failure UNHANDLED and mapped to " + internal);
                            return internal;
                          }
                        };
                try {
                  ServiceImpl impl = buildImpl.apply(ctx);
                  org.apache.pekko.japi.function.Function<
                          HttpRequest, CompletionStage<HttpResponse>>
                      handler = buildHandler.apply(impl, loggingErrorHandler);
                  return handle(handler);
                } catch (Exception e) {
                  return failWith(e);
                }
              }));
}
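The core of the method above is the wrapping step: a caller-supplied error mapping is turned into one that always produces a result and logs which branch was taken. The sketch below isolates that step using only the Java standard library. It is an illustration under stated assumptions, not the Pekko gRPC API: the class LoggingFallbackSketch is hypothetical, plain strings stand in for Trailers, and an in-memory list stands in for the request context's logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of the wrapping pattern; not part of Pekko gRPC.
final class LoggingFallbackSketch {

  // Collected log lines; a real server would write to its logger instead.
  static final List<String> LOG = new ArrayList<>();

  // Wraps a mapping that returns null for exceptions it does not handle, so that
  // every failure is logged and unhandled ones fall back to the given status.
  static Function<Throwable, String> withLoggingAndFallback(
      Function<Throwable, String> mapping, String fallbackStatus) {
    return throwable -> {
      String mapped = mapping.apply(throwable);
      if (mapped != null) {
        LOG.add("failure handled and mapped to " + mapped + ": " + throwable);
        return mapped;
      }
      LOG.add("failure UNHANDLED and mapped to " + fallbackStatus + ": " + throwable);
      return fallbackStatus;
    };
  }
}
```

Because the wrapper never returns null, the code that consumes it needs no null check of its own; only the wrapper has to know that null means "not handled".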
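Custom error mapping
We define a partial function that maps the IllegalArgumentException raised by the implementation above to an INVALID_ARGUMENT status. In the Java version, this is a function that returns null for exceptions it does not handle.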
private val customErrorMapping: PartialFunction[Throwable, Trailers] = {
  case ex: IllegalArgumentException => Trailers(Status.INVALID_ARGUMENT.withDescription(ex.getMessage))
}
private static final Function<Throwable, Trailers> customErrorMapping =
    (throwable) -> {
      if (throwable instanceof IllegalArgumentException) {
        return new Trailers(Status.INVALID_ARGUMENT);
      } else {
        return null;
      }
    };
Tying it all together
Finally, we invoke the new method and bind the resulting Route.
val route = loggingErrorHandlingGrpcRoute[GreeterService](
  buildImpl = rc => new Impl(rc.materializer),
  buildHandler = (impl, eHandler) =>
    ServiceHandler.concatOrNotFound(
      GreeterServiceHandler.partial(impl, eHandler = eHandler),
      ServerReflection.partial(List(GreeterService))),
  errorHandler = _ => customErrorMapping)
// Bind service handler servers to localhost:8082
val binding = Http().newServerAt("127.0.0.1", 8082).bind(route)
Route route =
    loggingErrorHandlingGrpcRoute(
        (rc) -> new Impl(rc.getMaterializer()),
        (actorSystem) -> customErrorMapping,
        (impl, eHandler) ->
            GreeterServiceHandlerFactory.partial(
                impl, GreeterService.name, mat, eHandler, sys));
return Http.get(sys).newServerAt("127.0.0.1", 8082).bind(route);
Results
We make three calls: one with a valid name, one with a lowercase name, and one with an empty name.
For the valid name, the RPC succeeds and the server prints only the response log:
[INFO] [05/15/2022 09:24:36.850] [Server-pekko.actor.default-dispatcher-8] [org.apache.pekko.actor.ActorSystemImpl(Server)] loggingErrorHandlingGrpcRoute: Response for
Request : HttpRequest(HttpMethod(POST),http://127.0.0.1/helloworld.GreeterService/SayHello,Vector(TE: trailers, User-Agent: grpc-java-netty/1.45.1, grpc-accept-encoding: gzip),HttpEntity.Chunked(application/grpc),HttpProtocol(HTTP/2.0))
Response: Complete(HttpResponse(200 OK,List(grpc-encoding: gzip),HttpEntity.Strict(application/grpc+proto,42 bytes total),HttpProtocol(HTTP/1.1)))
For the lowercase name, the server prints the error log and the response log. Note that the server still returns status code 200, even though the RPC failed. This is because gRPC encodes a failure as a successful HTTP response and reports the error status in the response trailers.
[ERROR] [05/15/2022 09:24:36.902] [Server-pekko.actor.default-dispatcher-5] [org.apache.pekko.actor.ActorSystemImpl(Server)] Grpc failure handled and mapped to org.apache.pekko.grpc.Trailers@4ab49ff7
java.lang.IllegalArgumentException: Name must be capitalized
at example.myapp.helloworld.LoggingErrorHandlingGreeterServer$Impl$1.sayHello(LoggingErrorHandlingGreeterServer.scala:43)
at example.myapp.helloworld.grpc.GreeterServiceHandler$.$anonfun$partial$2(GreeterServiceHandler.scala:118)
at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307)
at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:56)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:93)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:93)
at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:48)
at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
[INFO] [05/15/2022 09:24:36.905] [Server-pekko.actor.default-dispatcher-5] [org.apache.pekko.actor.ActorSystemImpl(Server)] loggingErrorHandlingGrpcRoute: Response for
Request : HttpRequest(HttpMethod(POST),http://127.0.0.1/helloworld.GreeterService/SayHello,Vector(TE: trailers, User-Agent: grpc-java-netty/1.45.1, grpc-accept-encoding: gzip),HttpEntity.Chunked(application/grpc),HttpProtocol(HTTP/2.0))
Response: Complete(HttpResponse(200 OK,List(grpc-encoding: gzip),HttpEntity.Chunked(application/grpc+proto),HttpProtocol(HTTP/1.1)))
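For the empty name, the server prints a slightly different error log and the response log. The exception is not covered by the custom error mapping, so it is logged as UNHANDLED and mapped to Status.INTERNAL: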
[ERROR] [05/15/2022 09:24:36.914] [Server-pekko.actor.default-dispatcher-5] [org.apache.pekko.actor.ActorSystemImpl(Server)] Grpc failure UNHANDLED and mapped to org.apache.pekko.grpc.Trailers@5e1d9001
java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:41)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.IterableLike.head(IterableLike.scala:109)
at scala.collection.IterableLike.head$(IterableLike.scala:108)
at scala.collection.immutable.StringOps.scala$collection$IndexedSeqOptimized$$super$head(StringOps.scala:33)
at scala.collection.IndexedSeqOptimized.head(IndexedSeqOptimized.scala:129)
at scala.collection.IndexedSeqOptimized.head$(IndexedSeqOptimized.scala:129)
at scala.collection.immutable.StringOps.head(StringOps.scala:33)
at example.myapp.helloworld.LoggingErrorHandlingGreeterServer$Impl$1.sayHello(LoggingErrorHandlingGreeterServer.scala:42)
at example.myapp.helloworld.grpc.GreeterServiceHandler$.$anonfun$partial$2(GreeterServiceHandler.scala:118)
at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307)
at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:56)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:93)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:93)
at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:48)
at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
[INFO] [05/15/2022 09:24:36.914] [Server-pekko.actor.default-dispatcher-5] [org.apache.pekko.actor.ActorSystemImpl(Server)] loggingErrorHandlingGrpcRoute: Response for
Request : HttpRequest(HttpMethod(POST),http://127.0.0.1/helloworld.GreeterService/SayHello,Vector(TE: trailers, User-Agent: grpc-java-netty/1.45.1, grpc-accept-encoding: gzip),HttpEntity.Chunked(application/grpc),HttpProtocol(HTTP/2.0))
Response: Complete(HttpResponse(200 OK,List(grpc-encoding: gzip),HttpEntity.Chunked(application/grpc+proto),HttpProtocol(HTTP/1.1)))
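All three calls above produced an HTTP 200 response, so the HTTP status alone does not tell a client whether an RPC succeeded. gRPC reports the outcome in the grpc-status trailer, where 0 means OK, 3 means INVALID_ARGUMENT, and 13 means INTERNAL. The sketch below shows that check in plain Java. The class GrpcOutcomeSketch and its method are hypothetical, and a simple map stands in for the trailers that a real gRPC client library would parse for you.

```java
import java.util.Map;

// Hypothetical illustration of how a gRPC outcome is read; not part of Pekko gRPC.
final class GrpcOutcomeSketch {

  // Status codes as defined by the gRPC specification.
  static final int OK = 0;
  static final int INVALID_ARGUMENT = 3;
  static final int INTERNAL = 13;

  // An RPC succeeded only if the HTTP status is 200 AND grpc-status is 0.
  // A missing or unparseable grpc-status is treated as a failure here.
  static boolean rpcSucceeded(int httpStatus, Map<String, String> trailers) {
    if (httpStatus != 200) {
      return false;
    }
    String grpcStatus = trailers.get("grpc-status");
    if (grpcStatus == null) {
      return false;
    }
    try {
      return Integer.parseInt(grpcStatus) == OK;
    } catch (NumberFormatException e) {
      return false;
    }
  }
}
```

Under this reading, the valid-name call corresponds to grpc-status 0, the lowercase-name call to 3, and the empty-name call to 13, all carried by HTTP 200 responses.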
Future work
For in-depth pekko-grpc/pekko-http integration, we currently need to pass information from the Pekko HTTP route into the service implementation constructor, and construct a new Handler for each request. This pattern is shown in the logging and error handling example above.
In the future we plan to provide a nicer API for this. For example, we could pass the Pekko HTTP attributes (introduced in Akka HTTP 10.2.0) as Metadata when using the PowerApi.