Google Cloud Pub/Sub
Google Cloud Pub/Sub provides many-to-many, asynchronous messaging that decouples senders and receivers.
Further information is available in the official Google Cloud documentation.
This connector communicates with Pub/Sub via HTTP requests (i.e. https://pubsub.googleapis.com). For a connector that uses gRPC for communication, see the alternative Apache Pekko Connectors Google Cloud Pub/Sub gRPC connector.
Project Info: Apache Pekko Connectors Google Cloud PubSub | |
---|---|
Artifact | org.apache.pekko pekko-connectors-google-cloud-pub-sub 1.1.0 |
JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21 |
Scala versions | 2.13.15, 2.12.20, 3.3.4 |
JPMS module name | pekko.stream.connectors.google.cloud.pubsub |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts
val PekkoVersion = "1.1.3"
val PekkoHttpVersion = "1.1.0"
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-connectors-google-cloud-pub-sub" % "1.1.0",
"org.apache.pekko" %% "pekko-stream" % PekkoVersion,
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
"org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion
)
<properties>
<pekko.version>1.1.3</pekko.version>
<pekko.http.version>1.1.0</pekko.http.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-connectors-google-cloud-pub-sub_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-http_${scala.binary.version}</artifactId>
<version>${pekko.http.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-http-spray-json_${scala.binary.version}</artifactId>
<version>${pekko.http.version}</version>
</dependency>
</dependencies>
def versions = [
PekkoVersion: "1.1.3",
PekkoHttpVersion: "1.1.0",
ScalaBinary: "2.13"
]
dependencies {
implementation "org.apache.pekko:pekko-connectors-google-cloud-pub-sub_${versions.ScalaBinary}:1.1.0"
implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
implementation "org.apache.pekko:pekko-http_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
implementation "org.apache.pekko:pekko-http-spray-json_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Usage
The Pub/Sub connector shares its basic configuration with all the Google connectors in Apache Pekko Connectors. Additional Pub/Sub-specific configuration settings can be found in its own reference.conf.
Next, prepare the actor system, the Pub/Sub configuration, and the topic and subscription names.
implicit val system: ActorSystem = ActorSystem()
val config = PubSubConfig()
val topic = "topic1"
val subscription = "subscription1"
ActorSystem system = ActorSystem.create();
PubSubConfig config = PubSubConfig.create();
String topic = "topic1";
String subscription = "subscription1";
To publish a single request, build the message with a base64-encoded data payload and put it in a PublishRequest. Publishing creates a flow that takes the requests and returns the accepted message IDs.
val publishMessage =
PublishMessage(new String(Base64.getEncoder.encode("Hello Google!".getBytes)))
val publishRequest = PublishRequest(Seq(publishMessage))
val source: Source[PublishRequest, NotUsed] = Source.single(publishRequest)
val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
GooglePubSub.publish(topic, config)
val publishedMessageIds: Future[Seq[Seq[String]]] = source.via(publishFlow).runWith(Sink.seq)
PublishMessage publishMessage =
PublishMessage.create(new String(Base64.getEncoder().encode("Hello Google!".getBytes())));
PublishRequest publishRequest = PublishRequest.create(Lists.newArrayList(publishMessage));
Source<PublishRequest, NotUsed> source = Source.single(publishRequest);
Flow<PublishRequest, List<String>, NotUsed> publishFlow =
GooglePubSub.publish(topic, config, 1);
CompletionStage<List<List<String>>> publishedMessageIds =
source.via(publishFlow).runWith(Sink.seq(), system);
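The base64 step used when building the message can be tried in isolation. The following is a minimal sketch using only the JDK; the class and method names (`PayloadEncoding`, `encodePayload`, `decodePayload`) are hypothetical helpers for illustration and are not part of the connector API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of the connector: shows the base64
// round trip applied to a Pub/Sub message payload.
class PayloadEncoding {

    // Encodes a text payload as base64, the form expected in the message data field.
    static String encodePayload(String text) {
        return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    // Decodes a base64 data field back into text, e.g. on the consuming side.
    static String decodePayload(String data) {
        return new String(Base64.getDecoder().decode(data), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String encoded = encodePayload("Hello Google!");
        System.out.println(encoded);                // SGVsbG8gR29vZ2xlIQ==
        System.out.println(decodePayload(encoded)); // Hello Google!
    }
}
```

Naming the charset explicitly avoids depending on the platform default, which the shorter `getBytes()` call would use.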
For greater throughput, you can batch messages together. Here, a batch is sent once it reaches 1000 messages or after 1 minute, whichever comes first.
val messageSource: Source[PublishMessage, NotUsed] = Source(List(publishMessage, publishMessage))
messageSource
.groupedWithin(1000, 1.minute)
.map(grouped => PublishRequest(grouped))
.via(publishFlow)
.runWith(Sink.seq)
Source<PublishMessage, NotUsed> messageSource = Source.single(publishMessage);
messageSource
.groupedWithin(1000, Duration.ofMinutes(1))
.map(messages -> PublishRequest.create(messages))
.via(publishFlow)
.runWith(Sink.ignore(), system);
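To make the size limit in the batching step concrete, here is a rough sketch in plain Java of how a sequence of messages splits into batches of at most 1000. It models only the size bound; the one-minute time bound of `groupedWithin` depends on when elements arrive and is not represented. `SizeBoundedBatcher` is a hypothetical name used for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the size bound only; the stream operator
// additionally emits a partial batch when the time window elapses.
class SizeBoundedBatcher {

    // Splits items into consecutive batches of at most maxSize elements.
    static <T> List<List<T>> batch(List<T> items, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxSize) {
            int end = Math.min(start + maxSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> messages = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            messages.add(i);
        }
        // Prints 1000, 1000, 500 on separate lines.
        for (List<Integer> b : batch(messages, 1000)) {
            System.out.println(b.size());
        }
    }
}
```

In the streaming case, each such batch would become one PublishRequest, so 2500 messages arriving quickly would result in three requests instead of 2500.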
To consume messages from a subscription, you must subscribe and then acknowledge the received messages.
val subscriptionSource: Source[ReceivedMessage, Cancellable] =
GooglePubSub.subscribe(subscription, config)
val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
GooglePubSub.acknowledge(subscription, config)
subscriptionSource
.map { message =>
// do something fun
message.ackId
}
.groupedWithin(1000, 1.minute)
.map(AcknowledgeRequest.apply)
.to(ackSink)
Source<ReceivedMessage, Cancellable> subscriptionSource =
GooglePubSub.subscribe(subscription, config);
Sink<AcknowledgeRequest, CompletionStage<Done>> ackSink =
GooglePubSub.acknowledge(subscription, config);
subscriptionSource
.map(
message -> {
// do something fun
return message.ackId();
})
.groupedWithin(1000, Duration.ofMinutes(1))
.map(acks -> AcknowledgeRequest.create(acks))
.to(ackSink);
To acknowledge messages automatically while also sending each ReceivedMessage to your own sink, you can create a graph.
val subscribeMessageSource: Source[ReceivedMessage, NotUsed] = // ???
val processMessage: Sink[ReceivedMessage, NotUsed] = // ???
val batchAckSink =
Flow[ReceivedMessage].map(_.ackId).groupedWithin(1000, 1.minute).map(AcknowledgeRequest.apply).to(ackSink)
val q = subscribeMessageSource.alsoTo(batchAckSink).to(processMessage)
Sink<ReceivedMessage, CompletionStage<Done>> processSink = yourProcessingSink;
Sink<ReceivedMessage, NotUsed> batchAckSink =
Flow.of(ReceivedMessage.class)
.map(t -> t.ackId())
.groupedWithin(1000, Duration.ofMinutes(1))
.map(ids -> AcknowledgeRequest.create(ids))
.to(ackSink);
subscriptionSource.alsoTo(batchAckSink).to(processSink);
Running the examples
To run the example code, you will need to configure a project and Pub/Sub in Google Cloud and provide your own credentials.