Walkthrough
Setting up
To get started, obtain or write the .proto file(s) that describe the interface you want to implement and add them to your project’s src/main/protobuf (sbt) or src/main/proto (Gradle, Maven) directory. (See the detailed chapters on sbt, Gradle and Maven for information on taking .proto definitions from dependencies.)
Then add the Pekko gRPC plugin to your build:
- sbt
-
// in project/plugins.sbt:
addSbtPlugin("org.apache.pekko" % "pekko-grpc-sbt-plugin" % "1.1.0")
//
// in build.sbt:
enablePlugins(PekkoGrpcPlugin)
- Gradle
-
buildscript {
  repositories {
    mavenLocal()
    gradlePluginPortal()
    mavenCentral()
  }
  dependencies {
    // see https://plugins.gradle.org/plugin/org.apache.pekko.grpc.gradle
    // for the currently latest version.
    classpath 'gradle.plugin.org.apache.pekko:pekko-grpc-gradle-plugin:1.1.0'
  }
}
plugins {
  id 'java'
  id 'application'
}
apply plugin: 'org.apache.pekko.grpc.gradle'
repositories {
  mavenLocal()
  mavenCentral()
}
- Maven
-
<project>
  <modelVersion>4.0.0</modelVersion>
  <name>Project name</name>
  <groupId>com.example</groupId>
  <artifactId>my-grpc-app</artifactId>
  <version>0.1-SNAPSHOT</version>
  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <pekko.grpc.version>1.1.0</pekko.grpc.version>
    <grpc.version>1.68.0</grpc.version>
    <project.encoding>UTF-8</project.encoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-grpc-runtime_2.12</artifactId>
      <version>${pekko.grpc.version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-grpc-maven-plugin</artifactId>
        <version>${pekko.grpc.version}</version>
        <executions>
          <execution>
            <goals>
              <goal>generate</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
For a complete overview of the configuration options see the chapter for your build tool, sbt, Gradle or Maven.
Dependencies
The Pekko gRPC plugin makes your code depend on the pekko-grpc-runtime library.
The table below shows its direct dependencies, and the second tab shows all libraries it depends on transitively. Be aware that the io.grpc.grpc-api library depends on Guava.
Writing a service definition
Define the interfaces you want to implement in .proto file(s) in your project’s src/main/protobuf (sbt) or src/main/proto (Gradle, Maven) directory.
For example, this is the definition of a Hello World service:
syntax = "proto3";

import "google/protobuf/timestamp.proto";

option java_multiple_files = true;
option java_package = "example.myapp.helloworld.grpc";
option java_outer_classname = "HelloWorldProto";

package helloworld;

////////////////////////////////////// The greeting service definition.
service GreeterService {
  //////////////////////
  // Sends a greeting //
  ////////*****/////////
  //      HELLO       //
  ////////*****/////////
  rpc SayHello (HelloRequest) returns (HelloReply) {}

  // Comment spanning
  // on several lines
  rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {}

  /*
   * C style comments
   */
  rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}

  /* C style comments
   * on several lines
   * with non-empty heading/trailing line */
  rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
  google.protobuf.Timestamp timestamp = 2;
}
Generating interfaces and stubs
Start by generating code from the .proto definition with:
- sbt
-
sbt compile
- Gradle
-
./gradlew build
- Maven
-
mvn pekko-grpc:generate
From the above definition, Pekko gRPC generates interfaces that look like this:
- Scala
-
// Generated by Pekko gRPC. DO NOT EDIT.
package example.myapp.helloworld.grpc

import org.apache.pekko
import pekko.annotation.ApiMayChange
import pekko.grpc.PekkoGrpcGenerated

/**
 * #services
 * //////////////////////////////////// The greeting service definition.
 */
@PekkoGrpcGenerated
trait GreeterService {

  /**
   * ////////////////////
   * Sends a greeting //
   * //////*****/////////
   * HELLO //
   * //////*****/////////
   */
  def sayHello(in: example.myapp.helloworld.grpc.HelloRequest): scala.concurrent.Future[example.myapp.helloworld.grpc.HelloReply]

  /**
   * Comment spanning
   * on several lines
   */
  def itKeepsTalking(in: org.apache.pekko.stream.scaladsl.Source[example.myapp.helloworld.grpc.HelloRequest, org.apache.pekko.NotUsed]): scala.concurrent.Future[example.myapp.helloworld.grpc.HelloReply]

  /**
   * C style comments
   */
  def itKeepsReplying(in: example.myapp.helloworld.grpc.HelloRequest): org.apache.pekko.stream.scaladsl.Source[example.myapp.helloworld.grpc.HelloReply, org.apache.pekko.NotUsed]

  /**
   * C style comments
   * on several lines
   * with non-empty heading/trailing line
   */
  def streamHellos(in: org.apache.pekko.stream.scaladsl.Source[example.myapp.helloworld.grpc.HelloRequest, org.apache.pekko.NotUsed]): org.apache.pekko.stream.scaladsl.Source[example.myapp.helloworld.grpc.HelloReply, org.apache.pekko.NotUsed]
}

@PekkoGrpcGenerated
object GreeterService extends pekko.grpc.ServiceDescription {
  val name = "helloworld.GreeterService"

  val descriptor: com.google.protobuf.Descriptors.FileDescriptor =
    example.myapp.helloworld.grpc.HelloworldProto.javaDescriptor;

  object Serializers {
    import pekko.grpc.scaladsl.ScalapbProtobufSerializer
    val HelloRequestSerializer = new ScalapbProtobufSerializer(example.myapp.helloworld.grpc.HelloRequest.messageCompanion)
    val HelloReplySerializer = new ScalapbProtobufSerializer(example.myapp.helloworld.grpc.HelloReply.messageCompanion)
  }

  @ApiMayChange
  @PekkoGrpcGenerated
  object MethodDescriptors {
    import pekko.grpc.internal.Marshaller
    import io.grpc.MethodDescriptor
    import Serializers._

    val sayHelloDescriptor: MethodDescriptor[example.myapp.helloworld.grpc.HelloRequest, example.myapp.helloworld.grpc.HelloReply] =
      MethodDescriptor.newBuilder()
        .setType(MethodDescriptor.MethodType.UNARY)
        .setFullMethodName(MethodDescriptor.generateFullMethodName("helloworld.GreeterService", "SayHello"))
        .setRequestMarshaller(new Marshaller(HelloRequestSerializer))
        .setResponseMarshaller(new Marshaller(HelloReplySerializer))
        .setSampledToLocalTracing(true)
        .build()

    val itKeepsTalkingDescriptor: MethodDescriptor[example.myapp.helloworld.grpc.HelloRequest, example.myapp.helloworld.grpc.HelloReply] =
      MethodDescriptor.newBuilder()
        .setType(MethodDescriptor.MethodType.CLIENT_STREAMING)
        .setFullMethodName(MethodDescriptor.generateFullMethodName("helloworld.GreeterService", "ItKeepsTalking"))
        .setRequestMarshaller(new Marshaller(HelloRequestSerializer))
        .setResponseMarshaller(new Marshaller(HelloReplySerializer))
        .setSampledToLocalTracing(true)
        .build()

    val itKeepsReplyingDescriptor: MethodDescriptor[example.myapp.helloworld.grpc.HelloRequest, example.myapp.helloworld.grpc.HelloReply] =
      MethodDescriptor.newBuilder()
        .setType(MethodDescriptor.MethodType.SERVER_STREAMING)
        .setFullMethodName(MethodDescriptor.generateFullMethodName("helloworld.GreeterService", "ItKeepsReplying"))
        .setRequestMarshaller(new Marshaller(HelloRequestSerializer))
        .setResponseMarshaller(new Marshaller(HelloReplySerializer))
        .setSampledToLocalTracing(true)
        .build()

    val streamHellosDescriptor: MethodDescriptor[example.myapp.helloworld.grpc.HelloRequest, example.myapp.helloworld.grpc.HelloReply] =
      MethodDescriptor.newBuilder()
        .setType(MethodDescriptor.MethodType.BIDI_STREAMING)
        .setFullMethodName(MethodDescriptor.generateFullMethodName("helloworld.GreeterService", "StreamHellos"))
        .setRequestMarshaller(new Marshaller(HelloRequestSerializer))
        .setResponseMarshaller(new Marshaller(HelloReplySerializer))
        .setSampledToLocalTracing(true)
        .build()
  }
}
- Java
-
// Generated by Pekko gRPC. DO NOT EDIT.
package example.myapp.helloworld.grpc;

import org.apache.pekko.grpc.ProtobufSerializer;
import org.apache.pekko.grpc.javadsl.GoogleProtobufSerializer;
import org.apache.pekko.grpc.PekkoGrpcGenerated;

/**
 * //////////////////////////////////// The greeting service definition.
 */
public interface GreeterService {

  /**
   * ////////////////////
   * Sends a greeting //
   * //////*****/////////
   * HELLO //
   * //////*****/////////
   */
  java.util.concurrent.CompletionStage<example.myapp.helloworld.grpc.HelloReply> sayHello(example.myapp.helloworld.grpc.HelloRequest in);

  /**
   * Comment spanning
   * on several lines
   */
  java.util.concurrent.CompletionStage<example.myapp.helloworld.grpc.HelloReply> itKeepsTalking(org.apache.pekko.stream.javadsl.Source<example.myapp.helloworld.grpc.HelloRequest, org.apache.pekko.NotUsed> in);

  /**
   * C style comments
   */
  org.apache.pekko.stream.javadsl.Source<example.myapp.helloworld.grpc.HelloReply, org.apache.pekko.NotUsed> itKeepsReplying(example.myapp.helloworld.grpc.HelloRequest in);

  /**
   * C style comments
   * on several lines
   * with non-empty heading/trailing line
   */
  org.apache.pekko.stream.javadsl.Source<example.myapp.helloworld.grpc.HelloReply, org.apache.pekko.NotUsed> streamHellos(org.apache.pekko.stream.javadsl.Source<example.myapp.helloworld.grpc.HelloRequest, org.apache.pekko.NotUsed> in);

  static String name = "helloworld.GreeterService";
  static org.apache.pekko.grpc.ServiceDescription description = new org.apache.pekko.grpc.internal.ServiceDescriptionImpl(name, HelloWorldProto.getDescriptor());

  @PekkoGrpcGenerated
  public static class Serializers {
    public static ProtobufSerializer<example.myapp.helloworld.grpc.HelloRequest> HelloRequestSerializer = new GoogleProtobufSerializer<>(example.myapp.helloworld.grpc.HelloRequest.parser());
    public static ProtobufSerializer<example.myapp.helloworld.grpc.HelloReply> HelloReplySerializer = new GoogleProtobufSerializer<>(example.myapp.helloworld.grpc.HelloReply.parser());
  }
}
and model classes (case classes in Scala) for HelloRequest and HelloReply.
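For the four rpcs in this example, each name from the .proto file shows up in two forms in the generated code: with a lower-cased first letter as the method name on the interface, and unchanged inside the full method name, which is the fully qualified service name, a slash, then the rpc name. The following sketch reproduces both forms with plain JDK code. GrpcNames and its two methods are hypothetical helpers written for illustration; they are not part of Pekko gRPC or grpc-java.

```java
// Hypothetical helper for illustration only; not part of Pekko gRPC or grpc-java.
final class GrpcNames {

    /** Lower-cases the first character, e.g. "SayHello" becomes "sayHello". Expects a non-empty name. */
    static String generatedMethodName(String rpcName) {
        return Character.toLowerCase(rpcName.charAt(0)) + rpcName.substring(1);
    }

    /** Joins service and rpc name the way the generated descriptors above do: "<service>/<rpc>". */
    static String fullMethodName(String fullServiceName, String rpcName) {
        return fullServiceName + "/" + rpcName;
    }

    public static void main(String[] args) {
        System.out.println(generatedMethodName("ItKeepsTalking"));
        System.out.println(fullMethodName("helloworld.GreeterService", "SayHello"));
    }
}
```

The full method name, prefixed with a slash, is also the HTTP/2 request path a gRPC client sends for that call.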
The service interface is the same for the client and the server side. On the server side, the service implements the interface; on the client side, the Pekko gRPC infrastructure implements a stub that will connect to the remote service when called.
There are 4 different types of calls:
- unary call - single request that returns a Future (Scala) or CompletionStage (Java) with a single response, see sayHello in the above example
- client streaming call - Source (stream) of requests from the client that returns a Future (Scala) or CompletionStage (Java) with a single response, see itKeepsTalking in the above example
- server streaming call - single request that returns a Source (stream) of responses, see itKeepsReplying in the above example
- client and server streaming call - Source (stream) of requests from the client that returns a Source (stream) of responses, see streamHellos in the above example
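The four call types are simply the four combinations of "is the request a stream?" and "is the response a stream?". The sketch below spells out that mapping. The constant names mirror the MethodDescriptor.MethodType values that appear in the generated Scala code above, while CallKinds and classify are hypothetical names used only for illustration.

```java
// Hypothetical helper for illustration only; not part of Pekko gRPC or grpc-java.
final class CallKinds {

    enum Kind { UNARY, CLIENT_STREAMING, SERVER_STREAMING, BIDI_STREAMING }

    /** Maps the two "stream" markers of an rpc definition to its call type. */
    static Kind classify(boolean requestIsStream, boolean responseIsStream) {
        if (requestIsStream && responseIsStream) {
            return Kind.BIDI_STREAMING;
        }
        if (requestIsStream) {
            return Kind.CLIENT_STREAMING;
        }
        if (responseIsStream) {
            return Kind.SERVER_STREAMING;
        }
        return Kind.UNARY;
    }

    public static void main(String[] args) {
        System.out.println(classify(false, false)); // shape of SayHello
        System.out.println(classify(true, false));  // shape of ItKeepsTalking
        System.out.println(classify(false, true));  // shape of ItKeepsReplying
        System.out.println(classify(true, true));   // shape of StreamHellos
    }
}
```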
Implementing the service
Let’s implement these 4 calls in a new class:
- Scala
-
package example.myapp.helloworld

import scala.concurrent.Future

import org.apache.pekko
import pekko.NotUsed
import pekko.stream.Materializer
import pekko.stream.scaladsl.Sink
import pekko.stream.scaladsl.Source

import com.google.protobuf.timestamp.Timestamp
import example.myapp.helloworld.grpc._

class GreeterServiceImpl(implicit mat: Materializer) extends GreeterService {
  import mat.executionContext

  override def sayHello(in: HelloRequest): Future[HelloReply] = {
    println(s"sayHello to ${in.name}")
    Future.successful(HelloReply(s"Hello, ${in.name}", Some(Timestamp.apply(123456, 123))))
  }

  override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = {
    println(s"sayHello to in stream...")
    in.runWith(Sink.seq).map(elements => HelloReply(s"Hello, ${elements.map(_.name).mkString(", ")}"))
  }

  override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
    println(s"sayHello to ${in.name} with stream of chars...")
    Source(s"Hello, ${in.name}".toList).map(character => HelloReply(character.toString))
  }

  override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = {
    println(s"sayHello to stream...")
    in.map(request => HelloReply(s"Hello, ${request.name}"))
  }
}
- Java
-
package example.myapp.helloworld;

import com.google.protobuf.Timestamp;
import example.myapp.helloworld.grpc.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

public class GreeterServiceImpl implements GreeterService {
  private final Materializer mat;

  public GreeterServiceImpl(Materializer mat) {
    this.mat = mat;
  }

  @Override
  public CompletionStage<HelloReply> sayHello(HelloRequest in) {
    System.out.println("sayHello to " + in.getName());
    HelloReply reply =
        HelloReply.newBuilder()
            .setMessage("Hello, " + in.getName())
            .setTimestamp(Timestamp.newBuilder().setSeconds(1234567890).setNanos(12345).build())
            .build();
    return CompletableFuture.completedFuture(reply);
  }

  @Override
  public CompletionStage<HelloReply> itKeepsTalking(Source<HelloRequest, NotUsed> in) {
    System.out.println("sayHello to in stream...");
    return in.runWith(Sink.<HelloRequest>seq(), mat)
        .thenApply(
            elements -> {
              String elementsStr =
                  elements.stream()
                      .map(HelloRequest::getName)
                      .collect(Collectors.toList())
                      .toString();
              return HelloReply.newBuilder().setMessage("Hello, " + elementsStr).build();
            });
  }

  @Override
  public Source<HelloReply, NotUsed> itKeepsReplying(HelloRequest in) {
    System.out.println("sayHello to " + in.getName() + " with stream of chars");
    List<Character> characters =
        ("Hello, " + in.getName()).chars().mapToObj(c -> (char) c).collect(Collectors.toList());
    return Source.from(characters)
        .map(character -> HelloReply.newBuilder().setMessage(String.valueOf(character)).build());
  }

  @Override
  public Source<HelloReply, NotUsed> streamHellos(Source<HelloRequest, NotUsed> in) {
    System.out.println("sayHello to stream...");
    return in.map(
        request -> HelloReply.newBuilder().setMessage("Hello, " + request.getName()).build());
  }
}
Serving the service with Pekko HTTP
Note how the implementation we just wrote is free from any gRPC-related boilerplate. It only uses the generated model and interfaces from your domain and basic Pekko Streams classes. We now need to connect this implementation class to the web server to offer it to clients.
Pekko gRPC servers are implemented with Pekko HTTP. In addition to the above GreeterService, a GreeterServiceHandler (Scala) or GreeterServiceHandlerFactory (Java) was generated that wraps the implementation with the gRPC functionality to be plugged into an existing Pekko HTTP server app.
You create the request handler by calling GreeterServiceHandler(yourImpl) in Scala or GreeterServiceHandlerFactory.create(yourImpl, ...) in Java.
The server will reuse the given instance of the implementation, which means that it is shared between (potentially concurrent) requests. Make sure that the implementation is thread-safe. In the sample above there is no mutable state, so it is safe. For more information about safely implementing servers with state, see the advice about stateful services below.
A complete main program that starts a Pekko HTTP server with the GreeterService looks like this:
- Scala
-
package example.myapp.helloworld

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse }
import pekko.http.scaladsl.Http
import com.typesafe.config.ConfigFactory
import example.myapp.helloworld.grpc._

import scala.concurrent.{ ExecutionContext, Future }

object GreeterServer {
  def main(args: Array[String]): Unit = {
    // Important: enable HTTP/2 in ActorSystem's config
    // We do it here programmatically, but you can also set it in the application.conf
    val conf = ConfigFactory
      .parseString("pekko.http.server.preview.enable-http2 = on")
      .withFallback(ConfigFactory.defaultApplication())
    val system = ActorSystem("HelloWorld", conf)
    new GreeterServer(system).run()
    // ActorSystem threads will keep the app alive until `system.terminate()` is called
  }
}

class GreeterServer(system: ActorSystem) {
  def run(): Future[Http.ServerBinding] = {
    // Pekko boot up code
    implicit val sys: ActorSystem = system
    implicit val ec: ExecutionContext = sys.dispatcher

    // Create service handlers
    val service: HttpRequest => Future[HttpResponse] =
      GreeterServiceHandler(new GreeterServiceImpl())

    // Bind the service handler to 127.0.0.1:8080
    val binding = Http().newServerAt("127.0.0.1", 8080).bind(service)

    // report successful binding
    binding.foreach { binding => println(s"gRPC server bound to: ${binding.localAddress}") }

    binding
  }
}
- Java
-
package example.myapp.helloworld;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import example.myapp.helloworld.grpc.GreeterService;
import example.myapp.helloworld.grpc.GreeterServiceHandlerFactory;
import java.util.concurrent.CompletionStage;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.http.javadsl.ServerBinding;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.SystemMaterializer;

class GreeterServer {
  public static void main(String[] args) throws Exception {
    // important to enable HTTP/2 in ActorSystem's config
    Config conf =
        ConfigFactory.parseString("pekko.http.server.preview.enable-http2 = on")
            .withFallback(ConfigFactory.defaultApplication());

    // ActorSystem Boot
    ActorSystem sys = ActorSystem.create("HelloWorld", conf);

    run(sys)
        .thenAccept(
            binding -> System.out.println("gRPC server bound to: " + binding.localAddress()));

    // ActorSystem threads will keep the app alive until `system.terminate()` is called
  }

  public static CompletionStage<ServerBinding> run(ActorSystem sys) throws Exception {
    Materializer mat = SystemMaterializer.get(sys).materializer();

    // Instantiate implementation
    GreeterService impl = new GreeterServiceImpl(mat);

    return Http.get(sys)
        .newServerAt("127.0.0.1", 8090)
        .bind(GreeterServiceHandlerFactory.create(impl, sys));
  }
}
It’s important to enable HTTP/2 in Pekko HTTP in the configuration of the ActorSystem by setting
pekko.http.server.preview.enable-http2 = on
In the example this was done from the main method, but you could also do this from within your application.conf.
The above example does not use TLS. For details, see Serve gRPC over TLS in the deployment section.
Serving multiple services
When a server handles several services, the handlers must be combined with org.apache.pekko.grpc.scaladsl.ServiceHandler.concatOrNotFound (Scala) or org.apache.pekko.grpc.javadsl.ServiceHandler.concatOrNotFound (Java):
- Scala
-
import org.apache.pekko.grpc.scaladsl.ServiceHandler

// explicit types not needed but included in example for clarity
val greeterService: PartialFunction[HttpRequest, Future[HttpResponse]] =
  example.myapp.helloworld.grpc.GreeterServiceHandler.partial(new GreeterServiceImpl())
val echoService: PartialFunction[HttpRequest, Future[HttpResponse]] =
  EchoServiceHandler.partial(new EchoServiceImpl)
val reflectionService = ServerReflection.partial(List(GreeterService, EchoService))
val serviceHandlers: HttpRequest => Future[HttpResponse] =
  ServiceHandler.concatOrNotFound(greeterService, echoService, reflectionService)

Http()
  .newServerAt("127.0.0.1", 8080)
  .bind(serviceHandlers)
- Java
-
import org.apache.pekko.grpc.javadsl.ServiceHandler;
import org.apache.pekko.http.javadsl.model.HttpRequest;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.japi.function.Function;

Function<HttpRequest, CompletionStage<HttpResponse>> greeterService =
    GreeterServiceHandlerFactory.create(new GreeterServiceImpl(mat), sys);
Function<HttpRequest, CompletionStage<HttpResponse>> echoService =
    EchoServiceHandlerFactory.create(new EchoServiceImpl(), sys);
@SuppressWarnings("unchecked")
Function<HttpRequest, CompletionStage<HttpResponse>> serviceHandlers =
    ServiceHandler.concatOrNotFound(greeterService, echoService);

Http.get(sys)
    .newServerAt("127.0.0.1", 8090)
    .bind(serviceHandlers);
Note that in the Scala example GreeterServiceHandler.partial and EchoServiceHandler.partial are used instead of the apply methods, to create partial functions that are combined by concatOrNotFound.
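Conceptually, combining handlers means trying each service's handler in turn and answering with a not-found response when none of them recognises the request. The sketch below models that idea with plain JDK types, routing on the /service/method request path that gRPC uses. It is not how Pekko gRPC implements concatOrNotFound: HandlerChain, PartialHandler, forService and firstMatchOrNotFound are hypothetical names, strings stand in for HTTP requests and responses, and echo.EchoService is an assumed service name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Conceptual model only; all names here are hypothetical and not Pekko gRPC APIs.
final class HandlerChain {

    /** A handler that answers only for its own service and returns empty otherwise. */
    interface PartialHandler extends Function<String, Optional<CompletionStage<String>>> {}

    /** Builds a handler that accepts request paths of the form "/<serviceName>/<method>". */
    static PartialHandler forService(String serviceName, String reply) {
        String prefix = "/" + serviceName + "/";
        return path -> {
            if (!path.startsWith(prefix)) {
                return Optional.empty();
            }
            CompletionStage<String> response = CompletableFuture.completedFuture(reply);
            return Optional.of(response);
        };
    }

    /** Tries the handlers in order; falls back to a not-found response if none matches. */
    static Function<String, CompletionStage<String>> firstMatchOrNotFound(List<PartialHandler> handlers) {
        return path -> {
            for (PartialHandler handler : handlers) {
                Optional<CompletionStage<String>> result = handler.apply(path);
                if (result.isPresent()) {
                    return result.get();
                }
            }
            return CompletableFuture.completedFuture("404 Not Found");
        };
    }

    public static void main(String[] args) {
        Function<String, CompletionStage<String>> combined =
            firstMatchOrNotFound(Arrays.asList(
                forService("helloworld.GreeterService", "handled by greeter"),
                forService("echo.EchoService", "handled by echo")));
        combined.apply("/helloworld.GreeterService/SayHello").thenAccept(System.out::println);
        combined.apply("/unknown.Service/Call").thenAccept(System.out::println);
    }
}
```

The order of the handlers only matters if two of them could accept the same path; with one handler per service, each request matches at most one.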
Running the server
To run the server with HTTP/2 using HTTPS on a JVM prior to version 1.8.0_251, you will likely have to configure the Jetty ALPN agent as described in the Pekko HTTP documentation. Later JVM versions have this support built-in.
See the detailed chapters on sbt, Gradle and Maven for details on adding the agent.
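To decide at startup whether the agent is likely to be needed, you can compare the running JVM's version against the threshold mentioned above. The following is a minimal sketch, assuming the usual java.version formats such as 1.8.0_242 or 11.0.2. AlpnCheck and likelyNeedsAlpnAgent are hypothetical names, and the check only considers Java 8 builds; anything else is reported as not needing the agent.

```java
// Hypothetical helper for illustration only; it encodes the 1.8.0_251 threshold stated above.
final class AlpnCheck {

    /** Returns true when the version string denotes a Java 8 build older than update 251. */
    static boolean likelyNeedsAlpnAgent(String javaVersion) {
        if (!javaVersion.startsWith("1.8.")) {
            return false; // Java 9 and later report versions such as "11.0.2" or "17"
        }
        int underscore = javaVersion.indexOf('_');
        if (underscore < 0) {
            return true; // e.g. "1.8.0" without an update number
        }
        // Keep only the leading digits after '_', dropping suffixes such as "-b12".
        String rest = javaVersion.substring(underscore + 1);
        int end = 0;
        while (end < rest.length() && Character.isDigit(rest.charAt(end))) {
            end++;
        }
        if (end == 0) {
            return true; // no readable update number, so assume an early build
        }
        return Integer.parseInt(rest.substring(0, end)) < 251;
    }

    public static void main(String[] args) {
        String version = System.getProperty("java.version");
        System.out.println(version + " likely needs the ALPN agent: " + likelyNeedsAlpnAgent(version));
    }
}
```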
Stateful services
More often than not, the whole point of implementing a service is to keep state. Since the service implementation is shared between concurrent incoming requests, any state must be thread-safe.
There are two recommended ways to deal with this:
- Put the mutable state inside an actor and interact with it through ask from unary methods or Flow.ask from streams.
- Keep the state in a thread-safe place. For example, a CRUD application that is backed by a database is thread-safe when access to the backing database is (which until recently was THE way that applications dealt with request concurrency).
This is an example based on the Hello World above, but allowing users to change the greeting through a unary call:
- Scala
-
class GreeterServiceImpl(system: ActorSystem) extends GreeterService {

  val greeterActor = system.actorOf(GreeterActor.props("Hello"), "greeter")

  def sayHello(in: HelloRequest): Future[HelloReply] = {
    // timeout and execution context for ask
    implicit val timeout: Timeout = 3.seconds
    import system.dispatcher

    (greeterActor ? GreeterActor.GetGreeting)
      .mapTo[GreeterActor.Greeting]
      .map(message => HelloReply(s"${message.greeting}, ${in.name}"))
  }

  def changeGreeting(in: ChangeRequest): Future[ChangeResponse] = {
    greeterActor ! GreeterActor.ChangeGreeting(in.newGreeting)
    Future.successful(ChangeResponse())
  }
}
- Java
-
public final class GreeterServiceImpl implements GreeterService {

  private final ActorSystem system;
  private final ActorRef greeterActor;

  public GreeterServiceImpl(ActorSystem system) {
    this.system = system;
    this.greeterActor = system.actorOf(GreeterActor.props("Hello"), "greeter");
  }

  public CompletionStage<HelloReply> sayHello(HelloRequest in) {
    return ask(greeterActor, GreeterActor.GET_GREETING, Duration.ofSeconds(5))
        .thenApply(
            message ->
                HelloReply.newBuilder()
                    .setMessage(((GreeterActor.Greeting) message).greeting)
                    .build());
  }

  public CompletionStage<ChangeResponse> changeGreeting(ChangeRequest in) {
    greeterActor.tell(new GreeterActor.ChangeGreeting(in.getNewGreeting()), ActorRef.noSender());
    return CompletableFuture.completedFuture(ChangeResponse.newBuilder().build());
  }
}
The GreeterActor is implemented like this:
- Scala
-
object GreeterActor {
  case class ChangeGreeting(newGreeting: String)
  case object GetGreeting
  case class Greeting(greeting: String)

  def props(initialGreeting: String) = Props(new GreeterActor(initialGreeting))
}

class GreeterActor(initialGreeting: String) extends Actor {
  import GreeterActor._

  var greeting = Greeting(initialGreeting)

  def receive = {
    case GetGreeting                 => sender() ! greeting
    case ChangeGreeting(newGreeting) => greeting = Greeting(newGreeting)
  }
}
- Java
-
public class GreeterActor extends AbstractActor {

  public static class ChangeGreeting {
    public final String newGreeting;

    public ChangeGreeting(String newGreeting) {
      this.newGreeting = newGreeting;
    }
  }

  public static class GetGreeting {}

  public static GetGreeting GET_GREETING = new GetGreeting();

  public static class Greeting {
    public final String greeting;

    public Greeting(String greeting) {
      this.greeting = greeting;
    }
  }

  public static Props props(final String initialGreeting) {
    return Props.create(GreeterActor.class, () -> new GreeterActor(initialGreeting));
  }

  private Greeting greeting;

  public GreeterActor(String initialGreeting) {
    greeting = new Greeting(initialGreeting);
  }

  public AbstractActor.Receive createReceive() {
    return receiveBuilder()
        .match(GetGreeting.class, this::onGetGreeting)
        .match(ChangeGreeting.class, this::onChangeGreeting)
        .build();
  }

  private void onGetGreeting(GetGreeting get) {
    getSender().tell(greeting, getSelf());
  }

  private void onChangeGreeting(ChangeGreeting change) {
    greeting = new Greeting(change.newGreeting);
  }
}
Now the actor mailbox is used to synchronize accesses to the mutable state.
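For simple state, the second option listed above (keeping the state in a thread-safe place) can be enough on its own. The following is a minimal sketch, assuming the greeting is the only mutable state: it keeps it in a java.util.concurrent.atomic.AtomicReference instead of an actor. AtomicGreeter is a hypothetical stand-in for the service implementation and uses plain strings rather than the generated HelloRequest, HelloReply and ChangeRequest classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a service implementation; plain strings replace the generated messages.
final class AtomicGreeter {

    // The only mutable state; reads and writes through AtomicReference are safe across threads.
    private final AtomicReference<String> greeting = new AtomicReference<>("Hello");

    CompletionStage<String> sayHello(String name) {
        return CompletableFuture.completedFuture(greeting.get() + ", " + name);
    }

    CompletionStage<Void> changeGreeting(String newGreeting) {
        greeting.set(newGreeting);
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        AtomicGreeter greeter = new AtomicGreeter();
        greeter.sayHello("Alice").thenAccept(System.out::println);
        greeter.changeGreeting("Hi");
        greeter.sayHello("Bob").thenAccept(System.out::println);
    }
}
```

This works because each call reads or replaces a single value. Once an update depends on several fields or on the previous value, an actor or another form of coordination is usually the safer choice.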