Walkthrough
Setting up
To get started, obtain the .proto file(s) that describe the interface you want to use and add them to your project's src/main/protobuf directory. See the detailed chapters on sbt, Gradle and Maven for information on picking up .proto definitions from dependencies automatically.
Then add the following configuration to your build:

// in project/plugins.sbt:
addSbtPlugin("org.apache.pekko" % "pekko-grpc-sbt-plugin" % "1.1.1")
// in build.sbt:
enablePlugins(PekkoGrpcPlugin)

// in build.gradle:
buildscript {
  dependencies {
    // version here is a placeholder,
    // it is replaced with a project dependency during integration tests
    // by adding --include-build <path> to gradlew
    classpath 'org.apache.pekko:pekko-grpc-gradle-plugin:1.1.1'
  }
}
plugins {
  id 'java'
  id 'application'
}
apply plugin: 'org.apache.pekko.grpc.gradle'

<!-- in pom.xml: -->
<project>
  <modelVersion>4.0.0</modelVersion>
  <name>Project name</name>
  <groupId>com.example</groupId>
  <artifactId>my-grpc-app</artifactId>
  <version>0.1-SNAPSHOT</version>
  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <pekko.grpc.version>1.1.1</pekko.grpc.version>
    <grpc.version>1.67.1</grpc.version>
    <project.encoding>UTF-8</project.encoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-grpc-runtime_2.12</artifactId>
      <version>${pekko.grpc.version}</version>
    </dependency>
    <!-- for loading of cert, issue #89 -->
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-testing</artifactId>
      <version>${grpc.version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-grpc-maven-plugin</artifactId>
        <version>${pekko.grpc.version}</version>
        <!-- Hook the generate goal into the lifecycle,
             automatically tied to generate-sources -->
        <executions>
          <execution>
            <goals>
              <goal>generate</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
For a complete overview of the configuration options, see the chapter for your build tool: sbt, Gradle or Maven.
Dependencies
The Pekko gRPC plugin makes your code depend on the pekko-grpc-runtime library.
The table below shows its direct dependencies, and the second tab shows all libraries it depends on transitively. Be aware that the io.grpc.grpc-api library depends on Guava.
Generating Service Stubs
To use a service, such as the Hello World service described in the server documentation, you only need the protobuf definition (the .proto files) of the service. No additional dependencies on the server project are needed.
For example, this is the definition of a Hello World service:
syntax = "proto3";

option java_multiple_files = true;
option java_package = "example.myapp.helloworld.grpc";
option java_outer_classname = "HelloWorldProto";

package helloworld;

////////////////////////////////////// The greeting service definition.
service GreeterService {
  //////////////////////
  // Sends a greeting //
  ////////*****/////////
  //      HELLO       //
  ////////*****/////////
  rpc SayHello (HelloRequest) returns (HelloReply) {}

  // Comment spanning
  // on several lines
  rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {}

  /*
   * C style comments
   */
  rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}

  /* C style comments
   * on several lines
   * with non-empty heading/trailing line */
  rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}
From this definition, Pekko gRPC generates interfaces that look like this:
// Generated by Pekko gRPC. DO NOT EDIT.
package example.myapp.helloworld.grpc
import org.apache.pekko
import pekko.annotation.ApiMayChange
import pekko.grpc.PekkoGrpcGenerated
/**
* #services
* //////////////////////////////////// The greeting service definition.
*/
@PekkoGrpcGenerated
trait GreeterService {
/**
* ////////////////////
* Sends a greeting //
* //////*****/////////
* HELLO //
* //////*****/////////
*/
def sayHello(in: example.myapp.helloworld.grpc.HelloRequest): scala.concurrent.Future[example.myapp.helloworld.grpc.HelloReply]
/**
* Comment spanning
* on several lines
*/
def itKeepsTalking(in: org.apache.pekko.stream.scaladsl.Source[example.myapp.helloworld.grpc.HelloRequest, org.apache.pekko.NotUsed]): scala.concurrent.Future[example.myapp.helloworld.grpc.HelloReply]
/**
* C style comments
*/
def itKeepsReplying(in: example.myapp.helloworld.grpc.HelloRequest): org.apache.pekko.stream.scaladsl.Source[example.myapp.helloworld.grpc.HelloReply, org.apache.pekko.NotUsed]
/**
* C style comments
* on several lines
* with non-empty heading/trailing line */
def streamHellos(in: org.apache.pekko.stream.scaladsl.Source[example.myapp.helloworld.grpc.HelloRequest, org.apache.pekko.NotUsed]): org.apache.pekko.stream.scaladsl.Source[example.myapp.helloworld.grpc.HelloReply, org.apache.pekko.NotUsed]
}
@PekkoGrpcGenerated
object GreeterService extends pekko.grpc.ServiceDescription {
val name = "helloworld.GreeterService"
val descriptor: com.google.protobuf.Descriptors.FileDescriptor =
example.myapp.helloworld.grpc.HelloworldProto.javaDescriptor;
object Serializers {
import pekko.grpc.scaladsl.ScalapbProtobufSerializer
val HelloRequestSerializer = new ScalapbProtobufSerializer(example.myapp.helloworld.grpc.HelloRequest.messageCompanion)
val HelloReplySerializer = new ScalapbProtobufSerializer(example.myapp.helloworld.grpc.HelloReply.messageCompanion)
}
@ApiMayChange
@PekkoGrpcGenerated
object MethodDescriptors {
import pekko.grpc.internal.Marshaller
import io.grpc.MethodDescriptor
import Serializers._
val sayHelloDescriptor: MethodDescriptor[example.myapp.helloworld.grpc.HelloRequest, example.myapp.helloworld.grpc.HelloReply] =
MethodDescriptor.newBuilder()
.setType(
MethodDescriptor.MethodType.UNARY
)
.setFullMethodName(MethodDescriptor.generateFullMethodName("helloworld.GreeterService", "SayHello"))
.setRequestMarshaller(new Marshaller(HelloRequestSerializer))
.setResponseMarshaller(new Marshaller(HelloReplySerializer))
.setSampledToLocalTracing(true)
.build()
val itKeepsTalkingDescriptor: MethodDescriptor[example.myapp.helloworld.grpc.HelloRequest, example.myapp.helloworld.grpc.HelloReply] =
MethodDescriptor.newBuilder()
.setType(
MethodDescriptor.MethodType.CLIENT_STREAMING
)
.setFullMethodName(MethodDescriptor.generateFullMethodName("helloworld.GreeterService", "ItKeepsTalking"))
.setRequestMarshaller(new Marshaller(HelloRequestSerializer))
.setResponseMarshaller(new Marshaller(HelloReplySerializer))
.setSampledToLocalTracing(true)
.build()
val itKeepsReplyingDescriptor: MethodDescriptor[example.myapp.helloworld.grpc.HelloRequest, example.myapp.helloworld.grpc.HelloReply] =
MethodDescriptor.newBuilder()
.setType(
MethodDescriptor.MethodType.SERVER_STREAMING
)
.setFullMethodName(MethodDescriptor.generateFullMethodName("helloworld.GreeterService", "ItKeepsReplying"))
.setRequestMarshaller(new Marshaller(HelloRequestSerializer))
.setResponseMarshaller(new Marshaller(HelloReplySerializer))
.setSampledToLocalTracing(true)
.build()
val streamHellosDescriptor: MethodDescriptor[example.myapp.helloworld.grpc.HelloRequest, example.myapp.helloworld.grpc.HelloReply] =
MethodDescriptor.newBuilder()
.setType(
MethodDescriptor.MethodType.BIDI_STREAMING
)
.setFullMethodName(MethodDescriptor.generateFullMethodName("helloworld.GreeterService", "StreamHellos"))
.setRequestMarshaller(new Marshaller(HelloRequestSerializer))
.setResponseMarshaller(new Marshaller(HelloReplySerializer))
.setSampledToLocalTracing(true)
.build()
}
}
// Generated by Pekko gRPC. DO NOT EDIT.
package example.myapp.helloworld.grpc;
import org.apache.pekko.grpc.ProtobufSerializer;
import org.apache.pekko.grpc.javadsl.GoogleProtobufSerializer;
import org.apache.pekko.grpc.PekkoGrpcGenerated;
/**
* //////////////////////////////////// The greeting service definition.
*/
public interface GreeterService {
/**
* ////////////////////
* Sends a greeting //
* //////*****/////////
* HELLO //
* //////*****/////////
*/
java.util.concurrent.CompletionStage<example.myapp.helloworld.grpc.HelloReply> sayHello(example.myapp.helloworld.grpc.HelloRequest in);
/**
* Comment spanning
* on several lines
*/
java.util.concurrent.CompletionStage<example.myapp.helloworld.grpc.HelloReply> itKeepsTalking(org.apache.pekko.stream.javadsl.Source<example.myapp.helloworld.grpc.HelloRequest, org.apache.pekko.NotUsed> in);
/**
* C style comments
*/
org.apache.pekko.stream.javadsl.Source<example.myapp.helloworld.grpc.HelloReply, org.apache.pekko.NotUsed> itKeepsReplying(example.myapp.helloworld.grpc.HelloRequest in);
/**
* C style comments
* on several lines
* with non-empty heading/trailing line */
org.apache.pekko.stream.javadsl.Source<example.myapp.helloworld.grpc.HelloReply, org.apache.pekko.NotUsed> streamHellos(org.apache.pekko.stream.javadsl.Source<example.myapp.helloworld.grpc.HelloRequest, org.apache.pekko.NotUsed> in);
static String name = "helloworld.GreeterService";
static org.apache.pekko.grpc.ServiceDescription description = new org.apache.pekko.grpc.internal.ServiceDescriptionImpl(name, HelloWorldProto.getDescriptor());
@PekkoGrpcGenerated
public static class Serializers {
public static ProtobufSerializer<example.myapp.helloworld.grpc.HelloRequest> HelloRequestSerializer = new GoogleProtobufSerializer<>(example.myapp.helloworld.grpc.HelloRequest.parser());
public static ProtobufSerializer<example.myapp.helloworld.grpc.HelloReply> HelloReplySerializer = new GoogleProtobufSerializer<>(example.myapp.helloworld.grpc.HelloReply.parser());
}
}
Pekko gRPC also generates the model classes HelloRequest and HelloReply.
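The generated method descriptors above build each full method name from the service name "helloworld.GreeterService" and the rpc name. gRPC method names take the form service/method, where the service name is the proto package plus the service. The sketch below reproduces that shape with a hypothetical helper (GrpcMethodNames is not part of Pekko gRPC or grpc-java), only to make the mapping from the .proto file visible.

```java
import java.util.List;

// Hypothetical helper for illustration; not part of Pekko gRPC or grpc-java.
final class GrpcMethodNames {
    private GrpcMethodNames() {}

    // Joins "<proto package>.<service>" and "<method>" with a slash,
    // the same shape as the names used in the generated method descriptors.
    static String fullMethodName(String protoPackage, String service, String method) {
        String fullServiceName = protoPackage.isEmpty() ? service : protoPackage + "." + service;
        return fullServiceName + "/" + method;
    }

    public static void main(String[] args) {
        // The four rpc names declared in the Hello World .proto definition.
        for (String method : List.of("SayHello", "ItKeepsTalking", "ItKeepsReplying", "StreamHellos")) {
            System.out.println(fullMethodName("helloworld", "GreeterService", method));
        }
    }
}
```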
The service interface is the same for the client and the server side. On the server side, your service implements the interface. On the client side, the Pekko gRPC infrastructure implements a stub that connects to the remote service when called.
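The shared-interface idea can be sketched with plain JDK types. This is a simplified illustration, not the generated code: Greeter stands in for GreeterService, messages are plain strings, and the transport function is a hypothetical placeholder for the network hop that the real stub performs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Simplified stand-in for the generated service interface (unary call only).
interface Greeter {
    CompletionStage<String> sayHello(String name);
}

// Server side: application code implements the interface.
final class GreeterImpl implements Greeter {
    @Override
    public CompletionStage<String> sayHello(String name) {
        return CompletableFuture.completedFuture("Hello, " + name);
    }
}

// Client side: a stub implements the same interface and forwards each call.
// The transport function is a hypothetical placeholder for the remote call.
final class GreeterStub implements Greeter {
    private final Function<String, CompletionStage<String>> transport;

    GreeterStub(Function<String, CompletionStage<String>> transport) {
        this.transport = transport;
    }

    @Override
    public CompletionStage<String> sayHello(String name) {
        return transport.apply(name);
    }
}

final class SharedInterfaceDemo {
    // Wires the stub straight to the implementation, in-process.
    static Greeter connectInProcess() {
        Greeter server = new GreeterImpl();
        return new GreeterStub(server::sayHello);
    }

    public static void main(String[] args) {
        Greeter client = connectInProcess();
        System.out.println(client.sayHello("Alice").toCompletableFuture().join());
    }
}
```

Because caller code depends only on the interface, it does not need to know whether it holds the implementation or a stub.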
There are 4 different types of calls:
- unary call - single request that returns a CompletionStage (a Future in the Scala API) with a single response, see sayHello in the above example
- client streaming call - Source (stream) of requests from the client that returns a CompletionStage (a Future in the Scala API) with a single response, see itKeepsTalking in the above example
- server streaming call - single request that returns a Source (stream) of responses, see itKeepsReplying in the above example
- client and server streaming call - Source (stream) of requests from the client that returns a Source (stream) of responses, see streamHellos in the above example
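The four shapes can be compared side by side in a small JDK-only sketch. It is an assumption-laden stand-in: List<String> replaces the Pekko Streams Source (so nothing is actually streamed), String replaces the generated messages, and CallShapes and EchoShapes are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

// Stand-in only: List<String> replaces Source<..., NotUsed>, String replaces the messages.
interface CallShapes {
    CompletionStage<String> unary(String request);                  // one request, one response
    CompletionStage<String> clientStreaming(List<String> requests); // many requests, one response
    List<String> serverStreaming(String request);                   // one request, many responses
    List<String> bidiStreaming(List<String> requests);              // many requests, many responses
}

final class EchoShapes implements CallShapes {
    @Override
    public CompletionStage<String> unary(String request) {
        return CompletableFuture.completedFuture("Hello, " + request);
    }

    @Override
    public CompletionStage<String> clientStreaming(List<String> requests) {
        return CompletableFuture.completedFuture("Hello, " + String.join(", ", requests));
    }

    @Override
    public List<String> serverStreaming(String request) {
        return List.of("Hello, " + request, "Hello again, " + request);
    }

    @Override
    public List<String> bidiStreaming(List<String> requests) {
        return requests.stream().map(r -> "Hello, " + r).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        CallShapes service = new EchoShapes();
        System.out.println(service.unary("Alice").toCompletableFuture().join());
        System.out.println(service.clientStreaming(List.of("Alice", "Bob")).toCompletableFuture().join());
        System.out.println(service.serverStreaming("Alice"));
        System.out.println(service.bidiStreaming(List.of("Alice", "Bob")));
    }
}
```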
Writing a Client Program
Let’s use these 4 calls from a client. Start by generating code from the .proto definition with the command for your build tool:
- sbt: compile
- Gradle: ./gradlew build
- Maven: mvn pekko-grpc:generate
A main program that calls the server with the GreeterService looks like this:

package example.myapp.helloworld

import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.ActorSystem
import pekko.grpc.GrpcClientSettings
import pekko.stream.scaladsl.Source
import example.myapp.helloworld.grpc._

import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._
import scala.util.{ Failure, Success }

object GreeterClient {
  def main(args: Array[String]): Unit = {
    // Boot pekko
    implicit val sys: ActorSystem = ActorSystem("HelloWorldClient")
    implicit val ec: ExecutionContext = sys.dispatcher

    // Configure the client by code:
    val clientSettings = GrpcClientSettings.connectToServiceAt("127.0.0.1", 8080).withTls(false)
    // Or via application.conf:
    // val clientSettings = GrpcClientSettings.fromConfig(GreeterService.name)

    // Create a client-side stub for the service
    val client: GreeterService = GreeterServiceClient(clientSettings)

    // Run examples for each of the exposed service methods.
    runSingleRequestReplyExample()
    runStreamingRequestExample()
    runStreamingReplyExample()
    runStreamingRequestReplyExample()

    sys.scheduler.scheduleWithFixedDelay(1.second, 1.second) { () => runSingleRequestReplyExample() }

    def runSingleRequestReplyExample(): Unit = {
      sys.log.info("Performing request")
      val reply = client.sayHello(HelloRequest("Alice"))
      reply.onComplete {
        case Success(msg) =>
          println(s"got single reply: $msg")
        case Failure(e) =>
          println(s"Error sayHello: $e")
      }
    }

    def runStreamingRequestExample(): Unit = {
      val requests = List("Alice", "Bob", "Peter").map(HelloRequest(_))
      val reply = client.itKeepsTalking(Source(requests))
      reply.onComplete {
        case Success(msg) =>
          println(s"got single reply for streaming requests: $msg")
        case Failure(e) =>
          println(s"Error streamingRequest: $e")
      }
    }

    def runStreamingReplyExample(): Unit = {
      val responseStream = client.itKeepsReplying(HelloRequest("Alice"))
      val done: Future[Done] =
        responseStream.runForeach(reply => println(s"got streaming reply: ${reply.message}"))
      done.onComplete {
        case Success(_) =>
          println("streamingReply done")
        case Failure(e) =>
          println(s"Error streamingReply: $e")
      }
    }

    def runStreamingRequestReplyExample(): Unit = {
      val requestStream: Source[HelloRequest, NotUsed] =
        Source
          .tick(100.millis, 1.second, "tick")
          .zipWithIndex
          .map { case (_, i) => i }
          .map(i => HelloRequest(s"Alice-$i"))
          .take(10)
          .mapMaterializedValue(_ => NotUsed)
      val responseStream: Source[HelloReply, NotUsed] = client.streamHellos(requestStream)
      val done: Future[Done] =
        responseStream.runForeach(reply => println(s"got streaming reply: ${reply.message}"))
      done.onComplete {
        case Success(_) =>
          println("streamingRequestReply done")
        case Failure(e) =>
          println(s"Error streamingRequestReply: $e")
      }
    }
  }
}
package example.myapp.helloworld;

import example.myapp.helloworld.grpc.*;
import io.grpc.StatusRuntimeException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.grpc.GrpcClientSettings;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.SystemMaterializer;
import org.apache.pekko.stream.javadsl.Source;

class GreeterClient {
  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("HelloWorldClient");
    Materializer materializer = SystemMaterializer.get(system).materializer();

    // Configure the client by code:
    GrpcClientSettings settings =
        GrpcClientSettings.connectToServiceAt("127.0.0.1", 8090, system).withTls(false);
    // Or via application.conf:
    // GrpcClientSettings settings = GrpcClientSettings.fromConfig(GreeterService.name, system);

    GreeterServiceClient client = null;
    try {
      client = GreeterServiceClient.create(settings, system);

      singleRequestReply(client);
      streamingRequest(client);
      streamingReply(client, materializer);
      streamingRequestReply(client, materializer);
    } catch (StatusRuntimeException e) {
      System.out.println("Status: " + e.getStatus());
    } catch (Exception e) {
      System.out.println(e);
    } finally {
      if (client != null) client.close();
      system.terminate();
    }
  }

  private static void singleRequestReply(GreeterService client) throws Exception {
    HelloRequest request = HelloRequest.newBuilder().setName("Alice").build();
    CompletionStage<HelloReply> reply = client.sayHello(request);
    System.out.println("got single reply: " + reply.toCompletableFuture().get(5, TimeUnit.SECONDS));
  }

  private static void streamingRequest(GreeterService client) throws Exception {
    List<HelloRequest> requests =
        Stream.of("Alice", "Bob", "Peter")
            .map(name -> HelloRequest.newBuilder().setName(name).build())
            .collect(Collectors.toList());
    CompletionStage<HelloReply> reply = client.itKeepsTalking(Source.from(requests));
    System.out.println(
        "got single reply for streaming requests: "
            + reply.toCompletableFuture().get(5, TimeUnit.SECONDS));
  }

  private static void streamingReply(GreeterService client, Materializer mat) throws Exception {
    HelloRequest request = HelloRequest.newBuilder().setName("Alice").build();
    Source<HelloReply, NotUsed> responseStream = client.itKeepsReplying(request);
    CompletionStage<Done> done =
        responseStream.runForeach(
            reply -> System.out.println("got streaming reply: " + reply.getMessage()), mat);
    done.toCompletableFuture().get(60, TimeUnit.SECONDS);
  }

  private static void streamingRequestReply(GreeterService client, Materializer mat)
      throws Exception {
    Duration interval = Duration.ofSeconds(1);
    Source<HelloRequest, NotUsed> requestStream =
        Source.tick(interval, interval, "tick")
            .zipWithIndex()
            .map(Pair::second)
            .map(i -> HelloRequest.newBuilder().setName("Alice-" + i).build())
            .take(10)
            .mapMaterializedValue(m -> NotUsed.getInstance());
    Source<HelloReply, NotUsed> responseStream = client.streamHellos(requestStream);
    CompletionStage<Done> done =
        responseStream.runForeach(
            reply -> System.out.println("got streaming reply: " + reply.getMessage()), mat);
    done.toCompletableFuture().get(60, TimeUnit.SECONDS);
  }
}
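The Java client above waits for each reply by calling toCompletableFuture().get(...) with a timeout, while the Scala client registers callbacks instead. The following JDK-only sketch contrasts the two styles on a plain CompletionStage<String>. ReplyHandling and its methods are hypothetical helpers, and the string reply is a stand-in for HelloReply.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helpers for illustration; replies are plain strings here.
final class ReplyHandling {
    private ReplyHandling() {}

    // Blocking style: wait up to the timeout, then fall back to a default value.
    static String awaitOrFallback(CompletionStage<String> reply, long timeoutMillis, String fallback) {
        try {
            return reply.toCompletableFuture().get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return fallback;
        }
    }

    // Non-blocking style: turn success or failure into a printable line without waiting.
    static CompletionStage<String> describe(CompletionStage<String> reply) {
        return reply.handle((msg, err) ->
            err == null ? "got single reply: " + msg : "Error sayHello: " + err);
    }

    public static void main(String[] args) {
        CompletionStage<String> ok = CompletableFuture.completedFuture("Hello, Alice");
        CompletionStage<String> never = new CompletableFuture<>(); // never completes

        System.out.println(awaitOrFallback(ok, 100, "no reply"));
        System.out.println(awaitOrFallback(never, 100, "no reply"));
        describe(ok).thenAccept(System.out::println);
    }
}
```

Blocking keeps a short-lived command-line client simple. The callback style avoids tying up a thread, which matters more in a long-running service.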
You can run the example with the command for your build tool:
- sbt: runMain example.myapp.helloworld.GreeterClient
- Gradle: ./gradlew run
- Maven: mvn pekko-grpc:generate compile exec:java -Dexec.mainClass=example.myapp.helloworld.GreeterClient
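Both client listings mention GrpcClientSettings.fromConfig(GreeterService.name) as an alternative to configuring the client in code. A minimal application.conf sketch for that path might look like the fragment below. It assumes the client settings live under pekko.grpc.client, keyed by the service name, and it reuses the host, port and TLS values from the Scala listing (the Java listing uses port 8090). Check the client configuration chapter for the authoritative list of keys.

```hocon
# Assumed layout: one entry per service name under pekko.grpc.client
pekko.grpc.client {
  "helloworld.GreeterService" {
    host = "127.0.0.1"
    port = 8080
    # plain-text connection, matching withTls(false) in the listings
    use-tls = false
  }
}
```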