Pekko HTTP interop
Pekko gRPC is built on top of Pekko HTTP. This means it is possible to leverage the Pekko HTTP API’s to create more complicated services, for example serving non-gRPC endpoints next to gRPC endpoints or adding additional behavior around your gRPC routes.
Example: authentication/authorization
One use case could be adding cross-cutting concerns such as authentication/authorization. Suppose you have an API that you want to secure using a token. You already have a regular HTTP API that users can use to obtain a token, and want to secure your gRPC routes to only accept calls that include this token.
Pekko HTTP authentication route
This route could be any arbitrary Pekko HTTP route. For this example we just provide a hint in the response body:
- Scala
-
source
// A Route to authenticate with val authenticationRoute: Route = path("login") { get { complete("Psst, please use token XYZ!") } }
- Java
-
source
// A Route to authenticate with Route authentication = path("login", () -> get(() -> complete("Psst, please use token XYZ!")));
Pekko gRPC route
We create the Pekko gRPC service implementation, and convert it to a Route
Route
as well:
- Scala
-
source
// Create service handlers val handler: HttpRequest => Future[HttpResponse] = GreeterServiceHandler(new GreeterServiceImpl()) // As a Route val handlerRoute: Route = handle(handler)
- Java
-
source
// Instantiate implementation GreeterService impl = new GreeterServiceImpl(mat); Function<HttpRequest, CompletionStage<HttpResponse>> handler = GreeterServiceHandlerFactory.create(impl, sys); // As a Route Route handlerRoute = handle(handler);
Securing the Pekko gRPC route
We can wrap the gRPC route just like any Route
Route
, applying the authorization:
- Scala
-
source
// A directive to authorize calls val authorizationDirective: Directive0 = headerValueByName("token").flatMap { token => if (token == "XYZ") pass else reject }
- Java
-
source
// Protect the handler route Route protectedHandler = headerValueByName( "token", token -> { if ("XYZ".equals(token)) { return handlerRoute; } else { return complete(StatusCodes.UNAUTHORIZED); } });
Tying it all together
Finally we can combine the routes and serve them. Remember we need to use bindAndHandleAsync
to enable HTTP/2 support:
- Scala
-
source
val route = concat( authenticationRoute, authorizationDirective { handlerRoute }) // Bind service handler servers to localhost:8082 val binding = Http().newServerAt("127.0.0.1", 8082).bind(route)
- Java
-
source
Route finalRoute = concat(authentication, protectedHandler); return Http.get(sys).newServerAt("127.0.0.1", 8090).bind(finalRoute);
Example: logging, error handling, and passing request context
This example shows how we can wrap a gRPC service implementation into a Route
Route
to get the following common server features:
- Log the HTTP request and response corresponding to each RPC.
- Pass the
RequestContext
RequestContext
into the RPC handler. - If the RPC fails, apply a custom error handler and log the error.
Implementation
We start with an implementation of the sayHello RPC that does some primitive validation on the name.
If the name starts with a lowercase later, we return an IllegalArgumentException
. Otherwise, we return a standard response.
However, there’s also a slight bug in the implementation. If the name is empty, the call to get the first character in the first if statement will throw an Exception.
- Scala
-
source
private final class Impl(mat: Materializer) extends GreeterServiceImpl()(mat) { override def sayHello(in: HelloRequest): Future[HelloReply] = if (in.name.head.isLower) { Future.failed(new IllegalArgumentException("Name must be capitalized")) } else { Future.successful(HelloReply(s"Hello, ${in.name}")) } }
- Java
-
source
private static class Impl extends GreeterServiceImpl { public Impl(Materializer mat) { super(mat); } @Override public CompletionStage<HelloReply> sayHello(HelloRequest in) { if (Character.isLowerCase(in.getName().charAt(0))) { CompletableFuture<HelloReply> reply = new CompletableFuture<>(); reply.completeExceptionally(new IllegalArgumentException("Name must be capitalized")); return reply; } else { HelloReply reply = HelloReply.newBuilder().setMessage("Hello, " + in.getName()).build(); return CompletableFuture.completedFuture(reply); } } }
Method to log, handle, and recover each RPC
We define a method that returns a Route
Route
implementing our logging and error recovery features. The method takes three parameters.
- A function that takes a
RequestContext
RequestContext
and returns a service implementation. This gives us the opportunity to use the context in the implementation. If we don’t need it, we can just ignore the context and return a fixed implementation. - A function that takes an
ActorSystem
ActorSystem
and returns a partial function from Throwable to gRPCTrailers
Trailers
. - A function that takes the service implementation and an error handler and returns a request handler (a function from
HttpRequest
HttpRequest
to aFuture
CompletionStage
ofHttpResponse
HttpResponse
).
The method first uses an existing directive to log requests and results. Then it wraps the given error handler into an error handler that also logs the error. Finally, it calls the given functions to handle incoming requests.
- Scala
-
source
private type ErrorHandler = ActorSystem => PartialFunction[Throwable, Trailers] private def loggingErrorHandlingGrpcRoute[ServiceImpl]( buildImpl: RequestContext => ServiceImpl, errorHandler: ErrorHandler, buildHandler: (ServiceImpl, ErrorHandler) => HttpRequest => Future[HttpResponse]): Route = DebuggingDirectives.logRequestResult(("loggingErrorHandlingGrpcRoute", Logging.InfoLevel)) { extractRequestContext { ctx => val loggingErrorHandler: ErrorHandler = (sys: ActorSystem) => { case NonFatal(t) => val pf = errorHandler(sys) if (pf.isDefinedAt(t)) { val trailers: Trailers = pf(t) ctx.log.error(t, s"Grpc failure handled and mapped to $trailers") trailers } else { val trailers = Trailers(Status.INTERNAL) ctx.log.error(t, s"Grpc failure UNHANDLED and mapped to $trailers") trailers } } val impl = buildImpl(ctx) val handler = buildHandler(impl, loggingErrorHandler) handle(handler) } }
- Java
-
source
private static <ServiceImpl> Route loggingErrorHandlingGrpcRoute( Function<RequestContext, ServiceImpl> buildImpl, Function<ActorSystem, Function<Throwable, Trailers>> errorHandler, BiFunction< ServiceImpl, Function<ActorSystem, Function<Throwable, Trailers>>, org.apache.pekko.japi.function.Function<HttpRequest, CompletionStage<HttpResponse>>> buildHandler) { return logRequest( "loggingErrorHandlingGrpcRoute", Logging.InfoLevel(), () -> extractRequestContext( ctx -> { Function<ActorSystem, Function<Throwable, Trailers>> loggingErrorHandler = (actorSystem) -> (throwable) -> { Function<Throwable, Trailers> function = errorHandler.apply(actorSystem); Trailers trailers = function.apply(throwable); if (trailers != null) { ctx.getLog() .error( throwable, "Grpc failure handled and mapped to " + trailers); return trailers; } else { Trailers internal = new Trailers(Status.INTERNAL); ctx.getLog() .error( throwable, "Grpc failure UNHANDLED and mapped to " + internal); return internal; } }; try { ServiceImpl impl = buildImpl.apply(ctx); org.apache.pekko.japi.function.Function< HttpRequest, CompletionStage<HttpResponse>> handler = buildHandler.apply(impl, loggingErrorHandler); return handle(handler); } catch (Exception e) { return failWith(e); } })); }
Custom error mapping
We define a partial function to handle the custom exception we defined above.
- Scala
-
source
private val customErrorMapping: PartialFunction[Throwable, Trailers] = { case ex: IllegalArgumentException => Trailers(Status.INVALID_ARGUMENT.withDescription(ex.getMessage)) }
- Java
-
source
private static final Function<Throwable, Trailers> customErrorMapping = (throwable) -> { if (throwable instanceof IllegalArgumentException) { return new Trailers(Status.INVALID_ARGUMENT); } else { return null; } };
Tying it all together
Finally, we invoke the new method and bind the resulting Route
Route
.
- Scala
-
source
val route = loggingErrorHandlingGrpcRoute[GreeterService]( buildImpl = rc => new Impl(rc.materializer), buildHandler = (impl, eHandler) => ServiceHandler.concatOrNotFound( GreeterServiceHandler.partial(impl, eHandler = eHandler), ServerReflection.partial(List(GreeterService))), errorHandler = _ => customErrorMapping) // Bind service handler servers to localhost:8082 val binding = Http().newServerAt("127.0.0.1", 8082).bind(route)
- Java
-
source
Route route = loggingErrorHandlingGrpcRoute( (rc) -> new Impl(rc.getMaterializer()), (actorSystem) -> customErrorMapping, (impl, eHandler) -> GreeterServiceHandlerFactory.partial( impl, GreeterService.name, mat, eHandler, sys)); return Http.get(sys).newServerAt("127.0.0.1", 8082).bind(route);
Results
We make three calls: one with a valid name, one with a lowercase name, and one with an empty name.
For the valid name, the RPC succeeds and the server prints only the response log:
[INFO] [05/15/2022 09:24:36.850] [Server-pekko.actor.default-dispatcher-8] [org.apache.pekko.actor.ActorSystemImpl(Server)] loggingErrorHandlingGrpcRoute: Response for
Request : HttpRequest(HttpMethod(POST),http://127.0.0.1/helloworld.GreeterService/SayHello,Vector(TE: trailers, User-Agent: grpc-java-netty/1.45.1, grpc-accept-encoding: gzip),HttpEntity.Chunked(application/grpc),HttpProtocol(HTTP/2.0))
Response: Complete(HttpResponse(200 OK,List(grpc-encoding: gzip),HttpEntity.Strict(application/grpc+proto,42 bytes total),HttpProtocol(HTTP/1.1)))
For the lowercase name, the server prints the error log and the response log. Note that the server still returns a status code 200, even though the RPC failed. This is because gRPC encodes a failure as a successful HTTP response containing the error in the body.
[ERROR] [05/15/2022 09:24:36.902] [Server-pekko.actor.default-dispatcher-5] [org.apache.pekko.actor.ActorSystemImpl(Server)] Grpc failure handled and mapped to org.apache.pekko.grpc.Trailers@4ab49ff7
java.lang.IllegalArgumentException: Name must be capitalized
at example.myapp.helloworld.LoggingErrorHandlingGreeterServer$Impl$1.sayHello(LoggingErrorHandlingGreeterServer.scala:43)
at example.myapp.helloworld.grpc.GreeterServiceHandler$.$anonfun$partial$2(GreeterServiceHandler.scala:118)
at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307)
at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:56)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:93)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:93)
at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:48)
at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
[INFO] [05/15/2022 09:24:36.905] [Server-pekko.actor.default-dispatcher-5] [org.apache.pekko.actor.ActorSystemImpl(Server)] loggingErrorHandlingGrpcRoute: Response for
Request : HttpRequest(HttpMethod(POST),http://127.0.0.1/helloworld.GreeterService/SayHello,Vector(TE: trailers, User-Agent: grpc-java-netty/1.45.1, grpc-accept-encoding: gzip),HttpEntity.Chunked(application/grpc),HttpProtocol(HTTP/2.0))
Response: Complete(HttpResponse(200 OK,List(grpc-encoding: gzip),HttpEntity.Chunked(application/grpc+proto),HttpProtocol(HTTP/1.1)))
For the empty name, the server prints a slightly different error log and the response log,
[ERROR] [05/15/2022 09:24:36.914] [Server-pekko.actor.default-dispatcher-5] [org.apache.pekko.actor.ActorSystemImpl(Server)] Grpc failure UNHANDLED and mapped to org.apache.pekko.grpc.Trailers@5e1d9001
java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:41)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.IterableLike.head(IterableLike.scala:109)
at scala.collection.IterableLike.head$(IterableLike.scala:108)
at scala.collection.immutable.StringOps.scala$collection$IndexedSeqOptimized$$super$head(StringOps.scala:33)
at scala.collection.IndexedSeqOptimized.head(IndexedSeqOptimized.scala:129)
at scala.collection.IndexedSeqOptimized.head$(IndexedSeqOptimized.scala:129)
at scala.collection.immutable.StringOps.head(StringOps.scala:33)
at example.myapp.helloworld.LoggingErrorHandlingGreeterServer$Impl$1.sayHello(LoggingErrorHandlingGreeterServer.scala:42)
at example.myapp.helloworld.grpc.GreeterServiceHandler$.$anonfun$partial$2(GreeterServiceHandler.scala:118)
at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307)
at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:56)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:93)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:93)
at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:48)
at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
[INFO] [05/15/2022 09:24:36.914] [Server-pekko.actor.default-dispatcher-5] [org.apache.pekko.actor.ActorSystemImpl(Server)] loggingErrorHandlingGrpcRoute: Response for
Request : HttpRequest(HttpMethod(POST),http://127.0.0.1/helloworld.GreeterService/SayHello,Vector(TE: trailers, User-Agent: grpc-java-netty/1.45.1, grpc-accept-encoding: gzip),HttpEntity.Chunked(application/grpc),HttpProtocol(HTTP/2.0))
Response: Complete(HttpResponse(200 OK,List(grpc-encoding: gzip),HttpEntity.Chunked(application/grpc+proto),HttpProtocol(HTTP/1.1)))
Future work
For in-depth pekko-grpc/pekko-http integration, we currently need to pass information from the Pekko HTTP route into the service implementation constructor, and construct a new Handler for each request. This pattern is shown in an example above.
In the future we plan to provide a nicer API for this, for example we could pass the Pekko HTTP attributes (introduced in Akka HTTP 10.2.0) as Metadata when using the PowerApi.