Pekko HTTP interop

Pekko gRPC is built on top of Pekko HTTP. This means it is possible to leverage the Pekko HTTP API’s to create more complicated services, for example serving non-gRPC endpoints next to gRPC endpoints or adding additional behavior around your gRPC routes.

Example: authentication/authorization

One use case could be adding cross-cutting concerns such as authentication/authorization. Suppose you have an API that you want to secure using a token. You already have a regular HTTP API that users can use to obtain a token, and want to secure your gRPC routes to only accept calls that include this token.

Pekko HTTP authentication route

This route could be any arbitrary Pekko HTTP route. For this example we just provide a hint in the response body:

Scala
source// A Route to authenticate with
val authenticationRoute: Route = path("login") {
  get {
    complete("Psst, please use token XYZ!")
  }
}
Java
source// A Route to authenticate with
Route authentication = path("login", () ->
  get(() ->
    complete("Psst, please use token XYZ!")
  )
);

Pekko gRPC route

We create the Pekko gRPC service implementation, and convert it to a RouteRoute as well:

Scala
source// Create service handlers
val handler: HttpRequest => Future[HttpResponse] =
  GreeterServiceHandler(new GreeterServiceImpl())

// As a Route
val handlerRoute: Route = handle(handler)
Java
source// Instantiate implementation
GreeterService impl = new GreeterServiceImpl(mat);
Function<HttpRequest, CompletionStage<HttpResponse>> handler = GreeterServiceHandlerFactory.create(impl, sys);

// As a Route
Route handlerRoute = handle(handler);

Securing the Pekko gRPC route

We can wrap the gRPC route just like any RouteRoute, applying the authorization:

Scala
source// A directive to authorize calls
val authorizationDirective: Directive0 =
  headerValueByName("token").flatMap { token =>
    if (token == "XYZ") pass
    else reject
  }
Java
source// Protect the handler route
Route protectedHandler =
  headerValueByName("token", token -> {
    if ("XYZ".equals(token)) {
      return handlerRoute;
    } else {
      return complete(StatusCodes.UNAUTHORIZED);
    }
  });

Tying it all together

Finally we can combine the routes and serve them. Remember we need to use bindAndHandleAsync to enable HTTP/2 support:

Scala
sourceval route = concat(
  authenticationRoute,
  authorizationDirective {
    handlerRoute
  })

// Bind service handler servers to localhost:8082
val binding = Http().newServerAt("127.0.0.1", 8082).bind(route)
Java
sourceRoute finalRoute = concat(
  authentication,
  protectedHandler
);

return Http.get(sys)
  .newServerAt("127.0.0.1", 8090)
  .bind(finalRoute);

Example: logging, error handling, and passing request context

This example shows how we can wrap a gRPC service implementation into a RouteRoute to get the following common server features:

  1. Log the HTTP request and response corresponding to each RPC.
  2. Pass the RequestContextRequestContext into the RPC handler.
  3. If the RPC fails, apply a custom error handler and log the error.

Implementation

We start with an implementation of the sayHello RPC that does some primitive validation on the name.
If the name starts with a lowercase later, we return an IllegalArgumentException. Otherwise, we return a standard response.

However, there’s also a slight bug in the implementation. If the name is empty, the call to get the first character in the first if statement will throw an Exception.

Scala
sourceprivate final class Impl(mat: Materializer) extends GreeterServiceImpl()(mat) {
  override def sayHello(in: HelloRequest): Future[HelloReply] =
    if (in.name.head.isLower) {
      Future.failed(new IllegalArgumentException("Name must be capitalized"))
    } else {
      Future.successful(HelloReply(s"Hello, ${in.name}"))
    }
}
Java
sourceprivate static class Impl extends GreeterServiceImpl {
  public Impl(Materializer mat) {
    super(mat);
  }
  @Override
  public CompletionStage<HelloReply> sayHello(HelloRequest in) {
    if (Character.isLowerCase(in.getName().charAt(0))) {
      CompletableFuture<HelloReply> reply = new CompletableFuture<>();
      reply.completeExceptionally(new IllegalArgumentException("Name must be capitalized"));
      return reply;
    } else {
      HelloReply reply = HelloReply.newBuilder()
        .setMessage("Hello, " + in.getName())
        .build();
      return CompletableFuture.completedFuture(reply);
    }
  }
}

Method to log, handle, and recover each RPC

We define a method that returns a RouteRoute implementing our logging and error recovery features. The method takes three parameters.

  1. A function that takes a RequestContextRequestContext and returns a service implementation. This gives us the opportunity to use the context in the implementation. If we don’t need it, we can just ignore the context and return a fixed implementation.
  2. A function that takes an ActorSystemActorSystem and returns a partial function from Throwable to gRPC TrailersTrailers.
  3. A function that takes the service implementation and an error handler and returns a request handler (a function from HttpRequestHttpRequest to a FutureCompletionStage of HttpResponseHttpResponse).

The method first uses an existing directive to log requests and results. Then it wraps the given error handler into an error handler that also logs the error. Finally, it calls the given functions to handle incoming requests.

Scala
sourceprivate type ErrorHandler = ActorSystem => PartialFunction[Throwable, Trailers]

private def loggingErrorHandlingGrpcRoute[ServiceImpl](
    buildImpl: RequestContext => ServiceImpl,
    errorHandler: ErrorHandler,
    buildHandler: (ServiceImpl, ErrorHandler) => HttpRequest => Future[HttpResponse]): Route =
  DebuggingDirectives.logRequestResult(("loggingErrorHandlingGrpcRoute", Logging.InfoLevel)) {
    extractRequestContext { ctx =>
      val loggingErrorHandler: ErrorHandler = (sys: ActorSystem) => {
        case NonFatal(t) =>
          val pf = errorHandler(sys)
          if (pf.isDefinedAt(t)) {
            val trailers: Trailers = pf(t)
            ctx.log.error(t, s"Grpc failure handled and mapped to $trailers")
            trailers
          } else {
            val trailers = Trailers(Status.INTERNAL)
            ctx.log.error(t, s"Grpc failure UNHANDLED and mapped to $trailers")
            trailers
          }
      }
      val impl = buildImpl(ctx)
      val handler = buildHandler(impl, loggingErrorHandler)
      handle(handler)
    }
  }
Java
sourceprivate static <ServiceImpl> Route loggingErrorHandlingGrpcRoute(
  Function<RequestContext, ServiceImpl> buildImpl,
  Function<ActorSystem, Function<Throwable, Trailers>> errorHandler,
  BiFunction<ServiceImpl, Function<ActorSystem, Function<Throwable, Trailers>>, org.apache.pekko.japi.function.Function<HttpRequest, CompletionStage<HttpResponse>>> buildHandler
) {
  return logRequest("loggingErrorHandlingGrpcRoute", Logging.InfoLevel(), () -> extractRequestContext(ctx -> {
    Function<ActorSystem, Function<Throwable, Trailers>> loggingErrorHandler = (actorSystem) -> (throwable) -> {
      Function<Throwable, Trailers> function = errorHandler.apply(actorSystem);
      Trailers trailers = function.apply(throwable);
      if (trailers != null) {
        ctx.getLog().error(throwable, "Grpc failure handled and mapped to " + trailers);
        return trailers;
      } else {
        Trailers internal = new Trailers(Status.INTERNAL);
        ctx.getLog().error(throwable, "Grpc failure UNHANDLED and mapped to " + internal);
        return internal;
      }
    };
    try {
      ServiceImpl impl = buildImpl.apply(ctx);
      org.apache.pekko.japi.function.Function<HttpRequest, CompletionStage<HttpResponse>> handler = buildHandler.apply(impl, loggingErrorHandler);
      return handle(handler);
    } catch (Exception e) {
      return failWith(e);
    }
  }));
}

Custom error mapping

We define a partial function to handle the custom exception we defined above.

Scala
sourceprivate val customErrorMapping: PartialFunction[Throwable, Trailers] = {
  case ex: IllegalArgumentException => Trailers(Status.INVALID_ARGUMENT.withDescription(ex.getMessage))
}
Java
sourceprivate final static Function<Throwable, Trailers> customErrorMapping = (throwable) -> {
  if (throwable instanceof IllegalArgumentException) {
    return new Trailers(Status.INVALID_ARGUMENT);
  } else {
    return null;
  }
};

Tying it all together

Finally, we invoke the new method and bind the resulting RouteRoute.

Scala
sourceval route = loggingErrorHandlingGrpcRoute[GreeterService](
  buildImpl = rc => new Impl(rc.materializer),
  buildHandler = (impl, eHandler) =>
    ServiceHandler.concatOrNotFound(
      GreeterServiceHandler.partial(impl, eHandler = eHandler),
      ServerReflection.partial(List(GreeterService))),
  errorHandler = _ => customErrorMapping)

// Bind service handler servers to localhost:8082
val binding = Http().newServerAt("127.0.0.1", 8082).bind(route)
Java
sourceRoute route = loggingErrorHandlingGrpcRoute(
  (rc) -> new Impl(rc.getMaterializer()),
  (actorSystem) -> customErrorMapping,
  (impl, eHandler) -> GreeterServiceHandlerFactory.partial(impl, GreeterService.name, mat, eHandler, sys)
);
return Http.get(sys)
  .newServerAt("127.0.0.1", 8082)
  .bind(route);

Results

We make three calls: one with a valid name, one with a lowercase name, and one with an empty name.

For the valid name, the RPC succeeds and the server prints only the response log:

[INFO] [05/15/2022 09:24:36.850] [Server-pekko.actor.default-dispatcher-8] [org.apache.pekko.actor.ActorSystemImpl(Server)] loggingErrorHandlingGrpcRoute: Response for
  Request : HttpRequest(HttpMethod(POST),http://127.0.0.1/helloworld.GreeterService/SayHello,Vector(TE: trailers, User-Agent: grpc-java-netty/1.45.1, grpc-accept-encoding: gzip),HttpEntity.Chunked(application/grpc),HttpProtocol(HTTP/2.0))
  Response: Complete(HttpResponse(200 OK,List(grpc-encoding: gzip),HttpEntity.Strict(application/grpc+proto,42 bytes total),HttpProtocol(HTTP/1.1)))

For the lowercase name, the server prints the error log and the response log. Note that the server still returns a status code 200, even though the RPC failed. This is because gRPC encodes a failure as a successful HTTP response containing the error in the body.

[ERROR] [05/15/2022 09:24:36.902] [Server-pekko.actor.default-dispatcher-5] [org.apache.pekko.actor.ActorSystemImpl(Server)] Grpc failure handled and mapped to org.apache.pekko.grpc.Trailers@4ab49ff7
java.lang.IllegalArgumentException: Name must be capitalized
	at example.myapp.helloworld.LoggingErrorHandlingGreeterServer$Impl$1.sayHello(LoggingErrorHandlingGreeterServer.scala:43)
	at example.myapp.helloworld.grpc.GreeterServiceHandler$.$anonfun$partial$2(GreeterServiceHandler.scala:118)
	at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307)
	at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:56)
	at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:93)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
	at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:93)
	at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:48)
	at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

[INFO] [05/15/2022 09:24:36.905] [Server-pekko.actor.default-dispatcher-5] [org.apache.pekko.actor.ActorSystemImpl(Server)] loggingErrorHandlingGrpcRoute: Response for
  Request : HttpRequest(HttpMethod(POST),http://127.0.0.1/helloworld.GreeterService/SayHello,Vector(TE: trailers, User-Agent: grpc-java-netty/1.45.1, grpc-accept-encoding: gzip),HttpEntity.Chunked(application/grpc),HttpProtocol(HTTP/2.0))
  Response: Complete(HttpResponse(200 OK,List(grpc-encoding: gzip),HttpEntity.Chunked(application/grpc+proto),HttpProtocol(HTTP/1.1)))

For the empty name, the server prints a slightly different error log and the response log,

[ERROR] [05/15/2022 09:24:36.914] [Server-pekko.actor.default-dispatcher-5] [org.apache.pekko.actor.ActorSystemImpl(Server)] Grpc failure UNHANDLED and mapped to org.apache.pekko.grpc.Trailers@5e1d9001
java.util.NoSuchElementException: next on empty iterator
	at scala.collection.Iterator$$anon$2.next(Iterator.scala:41)
	at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
	at scala.collection.IterableLike.head(IterableLike.scala:109)
	at scala.collection.IterableLike.head$(IterableLike.scala:108)
	at scala.collection.immutable.StringOps.scala$collection$IndexedSeqOptimized$$super$head(StringOps.scala:33)
	at scala.collection.IndexedSeqOptimized.head(IndexedSeqOptimized.scala:129)
	at scala.collection.IndexedSeqOptimized.head$(IndexedSeqOptimized.scala:129)
	at scala.collection.immutable.StringOps.head(StringOps.scala:33)
	at example.myapp.helloworld.LoggingErrorHandlingGreeterServer$Impl$1.sayHello(LoggingErrorHandlingGreeterServer.scala:42)
	at example.myapp.helloworld.grpc.GreeterServiceHandler$.$anonfun$partial$2(GreeterServiceHandler.scala:118)
	at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307)
	at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:56)
	at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:93)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
	at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:93)
	at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:48)
	at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

[INFO] [05/15/2022 09:24:36.914] [Server-pekko.actor.default-dispatcher-5] [org.apache.pekko.actor.ActorSystemImpl(Server)] loggingErrorHandlingGrpcRoute: Response for
  Request : HttpRequest(HttpMethod(POST),http://127.0.0.1/helloworld.GreeterService/SayHello,Vector(TE: trailers, User-Agent: grpc-java-netty/1.45.1, grpc-accept-encoding: gzip),HttpEntity.Chunked(application/grpc),HttpProtocol(HTTP/2.0))
  Response: Complete(HttpResponse(200 OK,List(grpc-encoding: gzip),HttpEntity.Chunked(application/grpc+proto),HttpProtocol(HTTP/1.1)))

Future work

For in-depth pekko-grpc/pekko-http integration we currently need to pass information from the Pekko HTTP route into the service implementation constructor, and construct a new Handler for each request. This pattern is shown in an example above.

In the future we plan to provide a nicer API for this, for example we could pass the Pekko HTTP attributes (introduced in 10.2.0) as Metadata when using the PowerApi.