Google Cloud Pub/Sub

Note

Google Cloud Pub/Sub provides many-to-many, asynchronous messaging that decouples senders and receivers.

Further information at the official Google Cloud documentation website.

This connector communicates to Pub/Sub via HTTP requests (i.e. https://pubsub.googleapis.com). For a connector that uses gRPC for the communication, take a look at the alternative Apache Pekko Connectors Google Cloud Pub/Sub gRPC connector.

Project Info: Apache Pekko Connectors Google Cloud PubSub
Artifact
org.apache.pekko
pekko-connectors-google-cloud-pub-sub
1.0.2
JDK versions
OpenJDK 8
OpenJDK 11
OpenJDK 17
Scala versions2.13.14, 2.12.20, 3.3.3
JPMS module namepekko.stream.connectors.google.cloud.pubsub
License
API documentation
Forums
Release notesGitHub releases
IssuesGithub issues
Sourceshttps://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.0.3"
val PekkoHttpVersion = "1.0.1"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-google-cloud-pub-sub" % "1.0.2",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
  "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
  "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion
)
Maven
<properties>
  <pekko.version>1.0.3</pekko.version>
  <pekko.http.version>1.0.1</pekko.http.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-google-cloud-pub-sub_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-http_${scala.binary.version}</artifactId>
    <version>${pekko.http.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-http-spray-json_${scala.binary.version}</artifactId>
    <version>${pekko.http.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.0.3",
  PekkoHttpVersion: "1.0.1",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-google-cloud-pub-sub_${versions.ScalaBinary}:1.0.2"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
  implementation "org.apache.pekko:pekko-http_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
  implementation "org.apache.pekko:pekko-http-spray-json_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Usage

The Pub/Sub connector shares its basic configuration with all the Google connectors in Apache Pekko Connectors. Additional Pub/Sub-specific configuration settings can be found in its own reference.conf.

And prepare the actor system.

Scala
sourceimplicit val system: ActorSystem = ActorSystem()
val config = PubSubConfig()
val topic = "topic1"
val subscription = "subscription1"
Java
sourceActorSystem system = ActorSystem.create();
PubSubConfig config = PubSubConfig.create();
String topic = "topic1";
String subscription = "subscription1";

To publish a single request, build the message with a base64 data payload and put it in a PublishRequest. Publishing creates a flow taking the messages and returning the accepted message ids.

Scala
sourceval publishMessage =
  PublishMessage(new String(Base64.getEncoder.encode("Hello Google!".getBytes)))
val publishRequest = PublishRequest(Seq(publishMessage))

val source: Source[PublishRequest, NotUsed] = Source.single(publishRequest)

val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
  GooglePubSub.publish(topic, config)

val publishedMessageIds: Future[Seq[Seq[String]]] = source.via(publishFlow).runWith(Sink.seq)
Java
sourcePublishMessage publishMessage =
    PublishMessage.create(new String(Base64.getEncoder().encode("Hello Google!".getBytes())));
PublishRequest publishRequest = PublishRequest.create(Lists.newArrayList(publishMessage));

Source<PublishRequest, NotUsed> source = Source.single(publishRequest);

Flow<PublishRequest, List<String>, NotUsed> publishFlow =
    GooglePubSub.publish(topic, config, 1);

CompletionStage<List<List<String>>> publishedMessageIds =
    source.via(publishFlow).runWith(Sink.seq(), system);

To get greater performance you can batch messages together, here we send batches with a maximum size of 1000 or at a maximum of 1 minute apart depending on the source.

Scala
sourceval messageSource: Source[PublishMessage, NotUsed] = Source(List(publishMessage, publishMessage))
messageSource
  .groupedWithin(1000, 1.minute)
  .map(grouped => PublishRequest(grouped))
  .via(publishFlow)
  .to(Sink.seq)
Java
sourceSource<PublishMessage, NotUsed> messageSource = Source.single(publishMessage);
messageSource
    .groupedWithin(1000, Duration.ofMinutes(1))
    .map(messages -> PublishRequest.create(messages))
    .via(publishFlow)
    .runWith(Sink.ignore(), system);

To consume the messages from a subscription you must subscribe then acknowledge the received messages. PublishRequest

Scala
sourceval subscriptionSource: Source[ReceivedMessage, Cancellable] =
  GooglePubSub.subscribe(subscription, config)

val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
  GooglePubSub.acknowledge(subscription, config)

subscriptionSource
  .map { message =>
    // do something fun

    message.ackId
  }
  .groupedWithin(1000, 1.minute)
  .map(AcknowledgeRequest.apply)
  .to(ackSink)
Java
sourceSource<ReceivedMessage, Cancellable> subscriptionSource =
    GooglePubSub.subscribe(subscription, config);

Sink<AcknowledgeRequest, CompletionStage<Done>> ackSink =
    GooglePubSub.acknowledge(subscription, config);

subscriptionSource
    .map(
        message -> {
          // do something fun
          return message.ackId();
        })
    .groupedWithin(1000, Duration.ofMinutes(1))
    .map(acks -> AcknowledgeRequest.create(acks))
    .to(ackSink);

If you want to automatically acknowledge the messages and send the ReceivedMessages to your own sink you can create a graph.

Scala
sourceval subscribeMessageSoruce: Source[ReceivedMessage, NotUsed] = // ???
val processMessage: Sink[ReceivedMessage, NotUsed] = // ???

val batchAckSink =
  Flow[ReceivedMessage].map(_.ackId).groupedWithin(1000, 1.minute).map(AcknowledgeRequest.apply).to(ackSink)

val q = subscribeMessageSoruce.alsoTo(batchAckSink).to(processMessage)
Java
sourceSink<ReceivedMessage, CompletionStage<Done>> processSink = yourProcessingSink;

Sink<ReceivedMessage, NotUsed> batchAckSink =
    Flow.of(ReceivedMessage.class)
        .map(t -> t.ackId())
        .groupedWithin(1000, Duration.ofMinutes(1))
        .map(ids -> AcknowledgeRequest.create(ids))
        .to(ackSink);

subscriptionSource.alsoTo(batchAckSink).to(processSink);

Running the examples

To run the example code you will need to configure a project and pub/sub in google cloud and provide your own credentials.