React on Partition Assignment
Apache Pekko Connectors Kafka allows to react to the Kafka broker’s balancing of partitions within a consumer group in two ways:
- callbacks to the
PartitionAssignmentHandler
PartitionAssignmentHandler
- messages to a rebalance listener actor
Partition Assignment Handler
Kafka balances partitions between all consumers within a consumer group. When new consumers join or leave the group partitions are revoked from and assigned to those consumers.
This PartitionAssignmentHandler
PartitionAssignmentHandler
API was introduced in Apache Pekko Connectors Kafka 2.0.0 and may still be subject to change.
Please give input on its usefulness in Issue #985.
Apache Pekko Connectors Kafka’s PartitionAssignmentHandler
PartitionAssignmentHandler
expects callbacks to be implemented, all are called with a set of TopicPartition
s and a reference to the RestrictedConsumer
RestrictedConsumer
which allows some access to the Kafka Consumer
instance used internally by Apache Pekko Connectors Kafka.
onRevoke
is called when the Kafka broker revokes partitions from this consumeronAssign
is called when the Kafka broker assigns partitions to this consumeronLost
is called when partition metadata has changed and partitions no longer exist. This can occur if a topic is deleted or if the leader’s metadata is stale. For details see KIP-429 Incremental Rebalance Protocol.onStop
is called when the Apache Pekko Connectors Kafka consumer source is about to stop
Rebalancing starts with revoking partitions from all consumers in a consumer group and assigning all partitions to consumers in a second phase. During rebalance no consumer within that consumer group receives any messages.
The PartitionAssignmentHandler
PartitionAssignmentHandler
is Apache Pekko Connectors Kafka’s replacement of the Kafka client library’s ConsumerRebalanceListener
.
All methods on the PartitionAssignmentHandler
PartitionAssignmentHandler
are called synchronously during Kafka’s poll and rebalance logic. They block any other activity for that consumer.
If any of these methods take longer than the timeout configured in pekko.kafka.consumer.partition-handler-warning
(default 5 seconds) a warning will be logged.
This example shows an implementation of the PartitionAssignmentHandler
and how it is passed to the consumer via the Subscription
.
- Scala
-
source
val assignmentHandler = new PartitionAssignmentHandler { override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = // ??? override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = // ??? override def onLost(lostTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = // ??? override def onStop(currentTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = // ??? } val subscription = Subscriptions .topics(topic) .withPartitionAssignmentHandler(assignmentHandler)
- Java
-
source
PartitionAssignmentHandler assignmentHandler = new PartitionAssignmentHandler() { public void onRevoke(Set<TopicPartition> revokedTps, RestrictedConsumer consumer) { } public void onAssign(Set<TopicPartition> assignedTps, RestrictedConsumer consumer) { } public void onLost(Set<TopicPartition> lostTps, RestrictedConsumer consumer) {} public void onStop(Set<TopicPartition> currentTps, RestrictedConsumer consumer) { } }; Subscription subscription = Subscriptions.topics(topic).withPartitionAssignmentHandler(assignmentHandler);
Listening for rebalance events
You may set up an rebalance event listener actor that will be notified when your consumer will be assigned or revoked from consuming from specific topic partitions. Two kinds of messages will be sent to this listener actor:
You can use a typed ActorRef
ActorRef
to implement your rebalance event listener by converting it into a classic actor ref. See the example below and read the Coexistence page of the Apache Pekko Documentation for more details on Apache Pekko Classic and Typed interoperability.
- Scala
-
source
import org.apache.pekko import pekko.kafka.{TopicPartitionsAssigned, TopicPartitionsRevoked} def rebalanceListener(): Behavior[ConsumerRebalanceEvent] = Behaviors.receive { case (context, TopicPartitionsAssigned(subscription, topicPartitions)) => context.log.info("Assigned: {}", topicPartitions) Behaviors.same case (context, TopicPartitionsRevoked(subscription, topicPartitions)) => context.log.info("Revoked: {}", topicPartitions) Behaviors.same } val typedRef: pekko.actor.typed.ActorRef[ConsumerRebalanceEvent] = context.spawn(rebalanceListener(), "rebalance-listener") // adds support for actors to a classic actor system and context import pekko.actor.typed.scaladsl.adapter._ val classicRef: pekko.actor.ActorRef = typedRef.toClassic val subscription = Subscriptions .topics(topic) // additionally, pass the actor reference: .withRebalanceListener(classicRef) // use the subscription as usual: Consumer .plainSource(consumerSettings, subscription)
- Java
-
source
// adds support for actors to a classic actor system and context import org.apache.pekko.actor.typed.javadsl.Adapter; Function<ActorContext<ConsumerRebalanceEvent>, Behavior<ConsumerRebalanceEvent>> rebalanceListener = (ActorContext<ConsumerRebalanceEvent> context) -> Behaviors.receive(ConsumerRebalanceEvent.class) .onMessage( TopicPartitionsAssigned.class, assigned -> { context.getLog().info("Assigned: {}", assigned); return Behaviors.same(); }) .onMessage( TopicPartitionsRevoked.class, revoked -> { context.getLog().info("Revoked: {}", revoked); return Behaviors.same(); }) .build(); Behavior<ConsumerRebalanceEvent> listener = Behaviors.setup(ctx -> rebalanceListener.apply(ctx)); org.apache.pekko.actor.typed.ActorRef<ConsumerRebalanceEvent> typedRef = guardianCtx.spawn(listener, "rebalance-listener"); org.apache.pekko.actor.ActorRef classicRef = Adapter.toClassic(typedRef); Subscription subscription = Subscriptions.topics(topic) // additionally, pass the actor reference: .withRebalanceListener(classicRef); Consumer.DrainingControl<List<ConsumerRecord<String, String>>> control = // use the subscription as usual: Consumer.plainSource(consumerSettings, subscription) .toMat(Sink.seq(), Consumer::createDrainingControl) .run(system);
Or with Classic Actors
- Scala
-
source
import org.apache.pekko import pekko.kafka.{TopicPartitionsAssigned, TopicPartitionsRevoked} class RebalanceListener extends Actor with ActorLogging { def receive: Receive = { case TopicPartitionsAssigned(subscription, topicPartitions) => log.info("Assigned: {}", topicPartitions) case TopicPartitionsRevoked(subscription, topicPartitions) => log.info("Revoked: {}", topicPartitions) } } val rebalanceListener = system.actorOf(Props(new RebalanceListener)) val subscription = Subscriptions .topics(topic) // additionally, pass the actor reference: .withRebalanceListener(rebalanceListener) // use the subscription as usual: Consumer .plainSource(consumerSettings, subscription)
- Java
-
source
static class RebalanceListener extends AbstractLoggingActor { @Override public Receive createReceive() { return receiveBuilder() .match( TopicPartitionsAssigned.class, assigned -> { log().info("Assigned: {}", assigned); }) .match( TopicPartitionsRevoked.class, revoked -> { log().info("Revoked: {}", revoked); }) .build(); } } ActorRef rebalanceListener = system.actorOf(Props.create(RebalanceListener.class)); Subscription subscription = Subscriptions.topics(topic) // additionally, pass the actor reference: .withRebalanceListener(rebalanceListener); // use the subscription as usual: Consumer.DrainingControl<List<ConsumerRecord<String, String>>> control = Consumer.plainSource(consumerSettings, subscription) .toMat(Sink.seq(), Consumer::createDrainingControl) .run(system);