Details
Accessing request metadata¶
By default the generated service interfaces don’t provide access to the request metadata, only to the request body (via the RPC method input parameter). If your methods require access to the request Metadata, you can configure Pekko gRPC to generate server “power APIs” that extend the base service interfaces to provide an additional request metadata parameter to each service method. See the detailed chapters on sbt, Gradle and Maven for how to set this build option. Note that this option doesn’t affect the generated client stubs.
Note: when you enable this option, you also need to use GreeterServicePowerApiHandler instead of GreeterServiceHandler.
Here’s an example implementation of these server power APIs:
package example.myapp.helloworld
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ActorSystem
import pekko.grpc.scaladsl.Metadata
import pekko.stream.scaladsl.{ Sink, Source }
import example.myapp.helloworld.grpc._
import scala.concurrent.Future
/**
 * Greeter service implemented against the generated "power API": every RPC
 * method receives the request [[Metadata]] alongside the request payload.
 *
 * Each greeted name is tagged with whether the request metadata carried an
 * `authorization` entry.
 *
 * @param system actor system whose dispatcher is imported as the execution
 *               context; it is also the implicit in scope when the inbound
 *               request stream is run
 */
class PowerGreeterServiceImpl(implicit system: ActorSystem) extends GreeterServicePowerApi {
  import system.dispatcher

  /** Unary request, unary reply: greets the auth-tagged caller once. */
  override def sayHello(in: HelloRequest, metadata: Metadata): Future[HelloReply] = {
    val taggedName = authTaggedName(in, metadata)
    println(s"sayHello to $taggedName")
    val reply = HelloReply(s"Hello, $taggedName")
    Future.successful(reply)
  }

  /** Streamed requests, unary reply: collects the whole stream, then greets everybody in one message. */
  override def itKeepsTalking(in: Source[HelloRequest, NotUsed], metadata: Metadata): Future[HelloReply] = {
    println(s"sayHello to in stream...")
    for (requests <- in.runWith(Sink.seq)) yield {
      val taggedNames = requests.map(request => authTaggedName(request, metadata))
      HelloReply(s"Hello, ${taggedNames.mkString(", ")}")
    }
  }

  /** Unary request, streamed reply: emits the greeting one character per reply message. */
  override def itKeepsReplying(in: HelloRequest, metadata: Metadata): Source[HelloReply, NotUsed] = {
    val taggedName = authTaggedName(in, metadata)
    println(s"sayHello to $taggedName with stream of chars...")
    val greeting = s"Hello, $taggedName"
    Source(greeting.toList).map(letter => HelloReply(letter.toString))
  }

  /** Streamed requests, streamed reply: answers every incoming request with its own greeting. */
  override def streamHellos(in: Source[HelloRequest, NotUsed], metadata: Metadata): Source[HelloReply, NotUsed] = {
    println(s"sayHello to stream...")
    in.map { request =>
      val taggedName = authTaggedName(request, metadata)
      HelloReply(s"Hello, $taggedName")
    }
  }

  // Bare-bones just for GRPC metadata demonstration purposes:
  // only the presence of an "authorization" text entry is checked, not its value.
  private def isAuthenticated(metadata: Metadata): Boolean =
    metadata.getText("authorization").isDefined

  /** Renders the request's name followed by "(authenticated)" or "(not authenticated)". */
  private def authTaggedName(in: HelloRequest, metadata: Metadata): String = {
    val prefix = if (isAuthenticated(metadata)) "" else "not "
    s"${in.name} (${prefix}authenticated)"
  }
}
package example.myapp.helloworld;
import example.myapp.helloworld.grpc.GreeterServicePowerApi;
import example.myapp.helloworld.grpc.HelloReply;
import example.myapp.helloworld.grpc.HelloRequest;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.apache.pekko.NotUsed;
import org.apache.pekko.grpc.javadsl.Metadata;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
/**
 * Greeter service implemented against the generated "power API": every RPC method receives the
 * request {@link Metadata} alongside the request payload.
 *
 * <p>Each greeted name is tagged with whether the request metadata carried an {@code
 * authorization} entry.
 */
public class PowerGreeterServiceImpl implements GreeterServicePowerApi {
  private final Materializer mat;

  /**
   * @param mat materializer used to run the inbound request stream in {@link #itKeepsTalking}
   */
  public PowerGreeterServiceImpl(Materializer mat) {
    this.mat = mat;
  }

  /** Unary request, unary reply: greets the auth-tagged caller once. */
  @Override
  public CompletionStage<HelloReply> sayHello(HelloRequest in, Metadata metadata) {
    String taggedName = authTaggedName(in, metadata);
    System.out.println("sayHello to " + taggedName);
    return CompletableFuture.completedFuture(
        HelloReply.newBuilder().setMessage("Hello, " + taggedName).build());
  }

  /** Streamed requests, unary reply: collects the whole stream, then greets everybody at once. */
  @Override
  public CompletionStage<HelloReply> itKeepsTalking(
      Source<HelloRequest, NotUsed> in, Metadata metadata) {
    System.out.println("sayHello to in stream...");
    CompletionStage<List<HelloRequest>> allRequests = in.runWith(Sink.<HelloRequest>seq(), mat);
    return allRequests.thenApply(
        requests -> {
          List<String> taggedNames =
              requests.stream()
                  .map(request -> authTaggedName(request, metadata))
                  .collect(Collectors.toList());
          // Concatenation uses List.toString(), so the names appear as "[a, b, c]".
          return HelloReply.newBuilder().setMessage("Hello, " + taggedNames).build();
        });
  }

  /** Unary request, streamed reply: emits the greeting one character per reply message. */
  @Override
  public Source<HelloReply, NotUsed> itKeepsReplying(HelloRequest in, Metadata metadata) {
    String taggedName = authTaggedName(in, metadata);
    System.out.println("sayHello to " + taggedName + " with stream of chars");
    String greeting = "Hello, " + taggedName;
    List<Character> letters =
        greeting.chars().mapToObj(code -> (char) code).collect(Collectors.toList());
    return Source.from(letters)
        .map(letter -> HelloReply.newBuilder().setMessage(String.valueOf(letter)).build());
  }

  /** Streamed requests, streamed reply: answers every incoming request with its own greeting. */
  @Override
  public Source<HelloReply, NotUsed> streamHellos(
      Source<HelloRequest, NotUsed> in, Metadata metadata) {
    System.out.println("sayHello to stream...");
    return in.map(
        request -> {
          String taggedName = authTaggedName(request, metadata);
          return HelloReply.newBuilder().setMessage("Hello, " + taggedName).build();
        });
  }

  // Bare-bones just for GRPC metadata demonstration purposes:
  // only the presence of an "authorization" text entry is checked, not its value.
  private boolean isAuthenticated(Metadata metadata) {
    return metadata.getText("authorization").isPresent();
  }

  /** Renders the request's name followed by "(authenticated)" or "(not authenticated)". */
  private String authTaggedName(HelloRequest in, Metadata metadata) {
    String prefix = isAuthenticated(metadata) ? "" : "not ";
    return String.format("%s (%sauthenticated)", in.getName(), prefix);
  }
}
Status codes¶
To signal an error, you can fail the Future (Scala) / CompletionStage (Java) or Source you are returning with a GrpcServiceException containing the status code you want to return.
For an overview of gRPC status codes and their meaning see statuscodes.md.
For unary responses:
import pekko.grpc.GrpcServiceException
import io.grpc.Status
// Extra metadata entries handed to the GrpcServiceException below, next to the status.
val exceptionMetadata =
  new MetadataBuilder()
    .addText("test-text", "test-text-data")
    .addBinary("test-binary-bin", ByteString("test-binary-data"))
    .build()

// ...

/**
 * Greets the caller by name.
 *
 * When the request carries an empty name, the returned Future is failed with a
 * GrpcServiceException holding the INVALID_ARGUMENT status and `exceptionMetadata`.
 */
def sayHello(in: HelloRequest): Future[HelloReply] = {
  val name = in.name
  if (name.nonEmpty)
    Future.successful(HelloReply(s"Hi $name!"))
  else {
    val status = Status.INVALID_ARGUMENT.withDescription("No name found")
    Future.failed(new GrpcServiceException(status, exceptionMetadata))
  }
}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.grpc.Status;
import org.apache.pekko.grpc.GrpcServiceException;
// ...
/**
 * Greets the caller by name.
 *
 * <p>When the request carries an empty name, the returned stage is completed exceptionally with a
 * GrpcServiceException holding the INVALID_ARGUMENT status.
 */
@Override
public CompletionStage<HelloReply> sayHello(HelloRequest in) {
  String name = in.getName();
  if (!name.isEmpty()) {
    HelloReply reply = HelloReply.newBuilder().setMessage("Hi, " + name).build();
    return CompletableFuture.completedFuture(reply);
  }

  Status status = Status.INVALID_ARGUMENT.withDescription("No name found");
  CompletableFuture<HelloReply> failed = new CompletableFuture<>();
  failed.completeExceptionally(new GrpcServiceException(status));
  return failed;
}
For streaming responses:
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Source
import pekko.grpc.GrpcServiceException
import io.grpc.Status
// Extra metadata entries handed to the GrpcServiceException below, next to the status.
val exceptionMetadata =
  new MetadataBuilder()
    .addText("test-text", "test-text-data")
    .addBinary("test-binary-bin", ByteString("test-binary-data"))
    .build()

/**
 * Streams replies for the given request.
 *
 * When the request carries an empty name, a failed Source is returned instead, carrying a
 * GrpcServiceException with the INVALID_ARGUMENT status and `exceptionMetadata`.
 */
def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
  if (in.name.nonEmpty)
    myResponseSource
  else {
    val status = Status.INVALID_ARGUMENT.withDescription("No name found")
    Source.failed(new GrpcServiceException(status, exceptionMetadata))
  }
}
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;
import io.grpc.Status;
import org.apache.pekko.grpc.GrpcServiceException;
// ...
/**
 * Streams replies for the given request.
 *
 * <p>When the request carries an empty name, a failed Source is returned instead, carrying a
 * GrpcServiceException with the INVALID_ARGUMENT status.
 */
@Override
public Source<HelloReply, NotUsed> itKeepsReplying(HelloRequest in) {
  if (!in.getName().isEmpty()) {
    return myResponseSource;
  }

  Status status = Status.INVALID_ARGUMENT.withDescription("No name found");
  return Source.failed(new GrpcServiceException(status));
}
Rich error model¶
Beyond status codes you can also use the Rich error model.
This example uses an error model taken from common protobuf, but every class that is based on scalapb.GeneratedMessage can be used. Build and return the error as a GrpcServiceException, as shown in the examples below:
/**
 * Rich error model example: always fails the returned Future.
 *
 * The failure is a GrpcServiceException built from a status code, a message and a
 * sequence of detail messages (here a single LocalizedMessage).
 */
def sayHello(in: HelloRequest): Future[HelloReply] = {
  // Detail payloads attached to the error.
  val details = Seq(new LocalizedMessage("EN", "The password!"))
  val error = GrpcServiceException(Code.INVALID_ARGUMENT, "What is wrong?", details)
  Future.failed(error)
}
source@Override
public CompletionStage<HelloReply> sayHello(HelloRequest in) {
ArrayList<scalapb.GeneratedMessage> ar = new ArrayList<>();
ar.add(LocalizedMessage.of("EN", "The password!"));
GrpcServiceException exception =
GrpcServiceException.apply(
Code.INVALID_ARGUMENT, "What is wrong?", JavaConverters.asScalaBuffer(ar).toSeq());
CompletableFuture<HelloReply> future = new CompletableFuture<>();
future.completeExceptionally(exception);
return future;
}
Please look here for how to handle this on the client.