Apache Pekko Cluster Sharding
Apache Pekko Cluster allows the user to plug in an external shard allocation strategy, giving more control over how many shards are created and which cluster nodes they are assigned to. If you consume Kafka messages into your Apache Pekko Cluster application then it’s possible to run an Apache Pekko Connectors Kafka Consumer on each cluster node and co-locate Kafka partitions with Apache Pekko Cluster shards. When partitions and shards are co-located there is less chance that a message must be transmitted over the network by the Apache Pekko Cluster Shard Coordinator to a destination sharded user entity.
Project Info

| Project Info: Apache Pekko Connectors Kafka Cluster Sharding | |
|---|---|
| Artifact | org.apache.pekko : pekko-connectors-kafka-cluster-sharding : 1.1.0 |
| JDK versions | OpenJDK 8, OpenJDK 11 |
| Scala versions | 2.12.20, 2.13.15, 3.3.4 |
| JPMS module name | pekko.stream.connectors.kafka.cluster.sharding |
| License | Apache-2.0 |
| Home page | https://pekko.apache.org/docs/pekko-connectors-kafka/current/ |
| Release notes | In the documentation |
| Issues | GitHub issues |
| Sources | https://github.com/apache/pekko-connectors-kafka |
Dependency

Maven:

```xml
<properties>
  <pekko.version>1.1.1</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-kafka-cluster-sharding_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-cluster-sharding-typed_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
```

sbt:

```scala
val PekkoVersion = "1.1.1"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-kafka-cluster-sharding" % "1.1.0",
  "org.apache.pekko" %% "pekko-cluster-sharding-typed" % PekkoVersion
)
```

Gradle:

```groovy
def versions = [
  PekkoVersion: "1.1.1",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-kafka-cluster-sharding_${versions.ScalaBinary}:1.1.0"
  implementation "org.apache.pekko:pekko-cluster-sharding-typed_${versions.ScalaBinary}:${versions.PekkoVersion}"
}
```
This module contains an Apache Pekko extension called `KafkaClusterSharding` and depends on `pekko-cluster-sharding-typed`.
Setup

There are two steps required to set up the cluster sharding module.

1. Initialize Apache Pekko Cluster Sharding with a `ShardingMessageExtractor` to route messages consumed from Kafka to the correct Apache Pekko Cluster shard and user entity.
2. Use the provided Rebalance Listener in your `ConsumerSettings` to update the external shard allocation at runtime when Kafka Consumer Group rebalances occur.
A complete example of using this module exists in the apache/pekko-samples project, called pekko-sample-kafka-to-sharding-scala. It’s a self-contained example that can run on a developer’s laptop.
Sharding Message Extractors

To set up the `ShardingMessageExtractor`, pick the factory method in the `KafkaClusterSharding` Apache Pekko extension that best fits your use case. This module provides two kinds of extractors: one for entities that are wrapped in a `ShardingEnvelope` and one for entities that are not. They’re called `messageExtractor` and `messageExtractorNoEnvelope` respectively.
To route Kafka messages to the correct user entity we must use the same algorithm that determined the Kafka partition of the consumed message. This module implements the Murmur2-based hashing algorithm used by the Kafka `DefaultPartitioner`, which the Kafka Producer uses by default. The inputs to this algorithm are the entity key and the number of partitions of the topic the message was consumed from. It’s therefore critical to use the same Kafka message key (sharded entity id) and the same number of Kafka topic partitions (shards). The message extractors can optionally look up the number of shards given a topic name, or the user can provide the number of shards explicitly.
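To make the key-to-shard mapping concrete, the following stand-alone sketch shows how a message key maps to a partition (and therefore a shard). It is a port of the Murmur2-based computation performed by Kafka's `DefaultPartitioner`; the `Murmur2Sketch` class and its method names are ours, for illustration only, since the module performs this computation internally and you never need to call it yourself.

```java
// Illustration only: stand-alone port of the Murmur2-based partitioning used
// by Kafka's DefaultPartitioner (class and method names are our own).
import java.nio.charset.StandardCharsets;

public class Murmur2Sketch {

  // Murmur2 hash as implemented in Kafka's org.apache.kafka.common.utils.Utils
  static int murmur2(byte[] data) {
    int length = data.length;
    int seed = 0x9747b28c;
    final int m = 0x5bd1e995;
    final int r = 24;
    int h = seed ^ length;
    int length4 = length / 4;
    for (int i = 0; i < length4; i++) {
      int i4 = i * 4;
      int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
          + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
      k *= m;
      k ^= k >>> r;
      k *= m;
      h *= m;
      h ^= k;
    }
    // handle the last few bytes of the input array (intentional fall-through)
    switch (length % 4) {
      case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
      case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
      case 1: h ^= data[length & ~3] & 0xff;
              h *= m;
      default:
    }
    h ^= h >>> 13;
    h *= m;
    h ^= h >>> 15;
    return h;
  }

  // partition (== shard) for a message key, as the DefaultPartitioner computes it:
  // take the hash, force it positive, and reduce modulo the partition count
  static int partitionFor(String key, int numPartitions) {
    int hash = murmur2(key.getBytes(StandardCharsets.UTF_8));
    return (hash & 0x7fffffff) % numPartitions;
  }

  public static void main(String[] args) {
    System.out.println(partitionFor("user-1", 12));
  }
}
```

Because both the producer and the message extractor reduce the same hash modulo the same partition count, the entity lands on the shard that owns its Kafka partition.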
To get the `ShardingMessageExtractor`, call the `messageExtractor` overload that’s suitable for your use case. In the following example we asynchronously request an extractor that does not use a sharding envelope and will use the same number of shards as the given topic has partitions.
Given a user entity.
Scala:

```scala
final case class User(id: String, name: String)
```

Java:

```java
static final class User {
  public final String id;
  public final String name;

  User(String id, String name) {
    this.id = id;
    this.name = name;
  }
}
```
Create a `MessageExtractor`.
Scala:

```scala
// automatically retrieving the number of partitions requires a round trip to a Kafka broker
val messageExtractor: Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[User]] =
  KafkaClusterSharding(system.toClassic).messageExtractorNoEnvelope(
    timeout = 10.seconds,
    topic = "user-topic",
    entityIdExtractor = (msg: User) => msg.id,
    settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(kafkaBootstrapServers))
```

Java:

```java
// automatically retrieving the number of partitions requires a round trip to a Kafka broker
CompletionStage<KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor<User>> messageExtractor =
    KafkaClusterSharding.get(system)
        .messageExtractorNoEnvelope(
            "user-topic",
            Duration.ofSeconds(10),
            (User msg) -> msg.id,
            ConsumerSettings.create(
                Adapter.toClassic(system), new StringDeserializer(), new StringDeserializer()));
```
Set up Apache Pekko Typed Cluster Sharding.
Scala:

```scala
// create an Apache Pekko Cluster Sharding `EntityTypeKey` for `User` for this Kafka Consumer Group
val groupId = "user-topic-group-id"
val typeKey = EntityTypeKey[User](groupId)

messageExtractor.onComplete {
  case Success(extractor) =>
    ClusterSharding(system).init(
      Entity(typeKey)(createBehavior = _ => userBehaviour())
        .withAllocationStrategy(new ExternalShardAllocationStrategy(system, typeKey.name))
        .withMessageExtractor(extractor)
        .withSettings(ClusterShardingSettings(system)))
  case Failure(ex) => system.log.error("An error occurred while obtaining the message extractor", ex)
}
```

Java:

```java
String groupId = "user-topic-group-id";
EntityTypeKey<User> typeKey = EntityTypeKey.create(User.class, groupId);

messageExtractor.thenAccept(
    extractor ->
        ClusterSharding.get(system)
            .init(
                Entity.of(typeKey, ctx -> userBehaviour())
                    .withAllocationStrategy(
                        new ExternalShardAllocationStrategy(
                            system, typeKey.name(), Timeout.create(Duration.ofSeconds(5))))
                    .withMessageExtractor(extractor)));
```
Rebalance Listener

The Rebalance Listener is a pre-defined Actor that handles `ConsumerRebalanceEvents` by updating the Apache Pekko Cluster External Sharding strategy when subscribed partitions are re-assigned to consumers running on different cluster nodes. This makes sure that shards remain local to Kafka Consumers after a consumer group rebalance. The Rebalance Listener is returned as a Typed `ActorRef[ConsumerRebalanceEvent]` and must be converted to a classic `ActorRef` before being passed to `ConsumerSettings`.
It’s recommended to use the same value for both the Kafka Consumer Group id and the `EntityTypeKey` name. This allows you to create multiple Kafka Consumer Groups that consume the same type of messages from the same topic, but route them to different `Behaviors` to be processed in different ways.

For example, a `user-events` topic is consumed by two consumer groups. One consumer group is used to maintain an up-to-date view of the user’s profile and the other to build an aggregate history of the types of user events. The same message type is consumed by separate Apache Pekko Connectors Kafka consumers, but the messages are routed to different Apache Pekko Cluster Sharding Coordinators that are set up to use separate `Behaviors`.
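The effect of running two consumer groups over one topic can be illustrated in plain Java, with no Kafka or Pekko involved (the `UserEvent` type and both views are hypothetical): the same event sequence is consumed twice, once per "consumer group", and each group folds it into a different read model.

```java
// Plain-Java illustration (hypothetical types, no Kafka or Pekko): identical
// input processed into two different read models, one per "consumer group".
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TwoConsumerGroupsSketch {
  static final class UserEvent {
    final String userId;
    final String kind;

    UserEvent(String userId, String kind) {
      this.userId = userId;
      this.kind = kind;
    }
  }

  static List<UserEvent> sampleEvents() {
    List<UserEvent> events = new ArrayList<>();
    events.add(new UserEvent("u1", "signed-up"));
    events.add(new UserEvent("u1", "renamed"));
    events.add(new UserEvent("u2", "signed-up"));
    return events;
  }

  // "profile" group: keep only the latest event per user (an up-to-date view)
  static Map<String, String> profileView(List<UserEvent> events) {
    Map<String, String> view = new LinkedHashMap<>();
    for (UserEvent e : events) view.put(e.userId, e.kind);
    return view;
  }

  // "history" group: count occurrences of each event kind (an aggregate history)
  static Map<String, Integer> historyView(List<UserEvent> events) {
    Map<String, Integer> view = new LinkedHashMap<>();
    for (UserEvent e : events) view.merge(e.kind, 1, Integer::sum);
    return view;
  }

  public static void main(String[] args) {
    System.out.println(profileView(sampleEvents())); // {u1=renamed, u2=signed-up}
    System.out.println(historyView(sampleEvents())); // {signed-up=2, renamed=1}
  }
}
```

In the real deployment each "view" would be a sharded entity `Behavior`, and the grouping by user id would be done by the message extractor rather than in application code.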
Create the rebalance listener using the extension and pass it into an Apache Pekko Connectors Kafka `Subscription`.
Scala:

```scala
// obtain an Apache Pekko classic ActorRef that will handle consumer group rebalance events
val rebalanceListener: pekko.actor.typed.ActorRef[ConsumerRebalanceEvent] =
  KafkaClusterSharding(system.toClassic).rebalanceListener(typeKey)

// convert the rebalance listener to a classic ActorRef until Apache Pekko Connectors Kafka supports Apache Pekko Typed
import pekko.actor.typed.scaladsl.adapter._
val rebalanceListenerClassic: pekko.actor.ActorRef = rebalanceListener.toClassic

val consumerSettings =
  ConsumerSettings(system.toClassic, new StringDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers(kafkaBootstrapServers)
    .withGroupId(typeKey.name) // use the same group id as we used in the `EntityTypeKey` for `User`

// pass the rebalance listener to the topic subscription
val subscription = Subscriptions
  .topics("user-topic")
  .withRebalanceListener(rebalanceListenerClassic)

// run & materialize the stream
val consumer = Consumer
  .plainSource(consumerSettings, subscription)
  .via(userBusiness())
  .runWith(Sink.ignore)
```
Java:

```java
org.apache.pekko.actor.typed.ActorRef<ConsumerRebalanceEvent> rebalanceListener =
    KafkaClusterSharding.get(system).rebalanceListener(typeKey);

ConsumerSettings<String, byte[]> consumerSettings =
    ConsumerSettings.create(
            Adapter.toClassic(system), new StringDeserializer(), new ByteArrayDeserializer())
        .withBootstrapServers(kafkaBootstrapServers)
        .withGroupId(typeKey.name()); // use the same group id as we used in the `EntityTypeKey` for `User`

// pass the rebalance listener to the topic subscription
AutoSubscription subscription =
    Subscriptions.topics("user-topic")
        .withRebalanceListener(Adapter.toClassic(rebalanceListener));

// run & materialize the stream
Consumer.plainSource(consumerSettings, subscription)
    .via(userBusiness())
    .runWith(Sink.ignore(), system);
```