React on Partition Assignment

Apache Pekko Connectors Kafka lets you react to the Kafka broker’s balancing of partitions within a consumer group in two ways:

  1. callbacks to the PartitionAssignmentHandler
  2. messages to a rebalance listener actor

Partition Assignment Handler

Kafka balances partitions between all consumers within a consumer group. When consumers join or leave the group, partitions are revoked from and assigned to the consumers in that group.

API may change

This PartitionAssignmentHandler API was introduced in Apache Pekko Connectors Kafka 2.0.0 and may still be subject to change.

Please give input on its usefulness in Issue #985.

Apache Pekko Connectors Kafka’s PartitionAssignmentHandler expects four callbacks to be implemented. Each is called with a set of TopicPartitions and a reference to the RestrictedConsumer, which allows limited access to the Kafka Consumer instance used internally by Apache Pekko Connectors Kafka.

  1. onRevoke is called when the Kafka broker revokes partitions from this consumer
  2. onAssign is called when the Kafka broker assigns partitions to this consumer
  3. onLost is called when partition metadata has changed and partitions no longer exist. This can occur if a topic is deleted or if the leader’s metadata is stale. For details see KIP-429 Incremental Rebalance Protocol.
  4. onStop is called when the Apache Pekko Connectors Kafka consumer source is about to stop

Rebalancing starts by revoking partitions from all consumers in a consumer group; in a second phase, all partitions are assigned to consumers again. During a rebalance, no consumer within that consumer group receives any messages.
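The callbacks and the two rebalance phases described above can be illustrated with a small bookkeeping helper. This is only a sketch: AssignmentTracker and its methods are hypothetical names, not part of Apache Pekko Connectors Kafka, and partitions are represented as plain strings such as "orders-0" instead of TopicPartition so that the example runs without any dependencies. In a real handler, onAssign would delegate to assigned(...) and onRevoke / onLost would delegate to removed(...).

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (not a library class): tracks the partitions this consumer owns.
class AssignmentTracker {
    // Plain strings stand in for TopicPartition to keep the sketch dependency-free.
    private final Set<String> owned = new TreeSet<>();

    // Intended to be called from onAssign.
    public synchronized void assigned(Set<String> partitions) {
        owned.addAll(partitions);
    }

    // Intended to be called from onRevoke and onLost.
    public synchronized void removed(Set<String> partitions) {
        owned.removeAll(partitions);
    }

    // Returns a read-only snapshot of the currently owned partitions.
    public synchronized Set<String> current() {
        return Collections.unmodifiableSet(new TreeSet<>(owned));
    }

    public static void main(String[] args) {
        AssignmentTracker tracker = new AssignmentTracker();
        tracker.assigned(Set.of("orders-0", "orders-1"));

        // Rebalance, phase 1: all partitions are revoked from this consumer.
        tracker.removed(Set.of("orders-0", "orders-1"));
        // Rebalance, phase 2: partitions are assigned again (possibly fewer than before).
        tracker.assigned(Set.of("orders-1"));

        System.out.println(tracker.current()); // prints [orders-1]
    }
}
```

Because the handler methods are invoked from the consumer's own thread, the synchronized methods are only needed if other threads read the snapshot.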

The PartitionAssignmentHandler is Apache Pekko Connectors Kafka’s replacement for the Kafka client library’s ConsumerRebalanceListener.

Warning

All methods on the PartitionAssignmentHandler are called synchronously during Kafka’s poll and rebalance logic. They block any other activity for that consumer.

If any of these methods takes longer than the timeout configured in pekko.kafka.consumer.partition-handler-warning (default 5 seconds), a warning is logged.
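As a sketch, the threshold could be adjusted in application.conf as follows. The key and its 5 second default come from the text above; the 10 second value is only an illustrative assumption.

```hocon
pekko.kafka.consumer {
  # Log a warning when a PartitionAssignmentHandler callback takes longer than this.
  # Library default: 5s. The value below is an illustrative assumption.
  partition-handler-warning = 10s
}
```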

This example shows an implementation of the PartitionAssignmentHandler and how it is passed to the consumer via the Subscription.

Scala
val assignmentHandler = new PartitionAssignmentHandler {
  // Placeholder bodies: replace () with your own logic. Keep it short, as each callback blocks the consumer.
  override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()
  override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()
  override def onLost(lostTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()
  override def onStop(currentTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()
}

val subscription = Subscriptions
  .topics(topic)
  .withPartitionAssignmentHandler(assignmentHandler)
Java
PartitionAssignmentHandler assignmentHandler =
    new PartitionAssignmentHandler() {
      public void onRevoke(Set<TopicPartition> revokedTps, RestrictedConsumer consumer) {
      }

      public void onAssign(Set<TopicPartition> assignedTps, RestrictedConsumer consumer) {
      }

      public void onLost(Set<TopicPartition> lostTps, RestrictedConsumer consumer) {}

      public void onStop(Set<TopicPartition> currentTps, RestrictedConsumer consumer) {
      }
    };

Subscription subscription =
    Subscriptions.topics(topic).withPartitionAssignmentHandler(assignmentHandler);

Listening for rebalance events

You may set up a rebalance event listener actor that is notified when topic partitions are assigned to or revoked from your consumer. Two kinds of messages are sent to this listener actor: TopicPartitionsAssigned and TopicPartitionsRevoked.

You can use a typed ActorRef to implement your rebalance event listener by converting it into a classic actor ref. See the example below and read the Coexistence page of the Apache Pekko Documentation for more details on Apache Pekko Classic and Typed interoperability.

Scala
import org.apache.pekko
import pekko.kafka.{TopicPartitionsAssigned, TopicPartitionsRevoked}

def rebalanceListener(): Behavior[ConsumerRebalanceEvent] = Behaviors.receive {
  case (context, TopicPartitionsAssigned(subscription, topicPartitions)) =>
    context.log.info("Assigned: {}", topicPartitions)
    Behaviors.same

  case (context, TopicPartitionsRevoked(subscription, topicPartitions)) =>
    context.log.info("Revoked: {}", topicPartitions)
    Behaviors.same
}

val typedRef: pekko.actor.typed.ActorRef[ConsumerRebalanceEvent] =
  context.spawn(rebalanceListener(), "rebalance-listener")

// adds support for actors to a classic actor system and context
import pekko.actor.typed.scaladsl.adapter._
  
val classicRef: pekko.actor.ActorRef = typedRef.toClassic

val subscription = Subscriptions
  .topics(topic)
  // additionally, pass the actor reference:
  .withRebalanceListener(classicRef)

// use the subscription as usual:
Consumer
  .plainSource(consumerSettings, subscription)
Java
// adds support for actors to a classic actor system and context
import org.apache.pekko.actor.typed.javadsl.Adapter;

Function<ActorContext<ConsumerRebalanceEvent>, Behavior<ConsumerRebalanceEvent>>
    rebalanceListener =
        (ActorContext<ConsumerRebalanceEvent> context) ->
            Behaviors.receive(ConsumerRebalanceEvent.class)
                .onMessage(
                    TopicPartitionsAssigned.class,
                    assigned -> {
                      context.getLog().info("Assigned: {}", assigned);
                      return Behaviors.same();
                    })
                .onMessage(
                    TopicPartitionsRevoked.class,
                    revoked -> {
                      context.getLog().info("Revoked: {}", revoked);
                      return Behaviors.same();
                    })
                .build();

Behavior<ConsumerRebalanceEvent> listener =
    Behaviors.setup(ctx -> rebalanceListener.apply(ctx));

org.apache.pekko.actor.typed.ActorRef<ConsumerRebalanceEvent> typedRef =
    guardianCtx.spawn(listener, "rebalance-listener");

org.apache.pekko.actor.ActorRef classicRef = Adapter.toClassic(typedRef);

Subscription subscription =
    Subscriptions.topics(topic)
        // additionally, pass the actor reference:
        .withRebalanceListener(classicRef);

Consumer.DrainingControl<List<ConsumerRecord<String, String>>> control =
    // use the subscription as usual:
    Consumer.plainSource(consumerSettings, subscription)
        .toMat(Sink.seq(), Consumer::createDrainingControl)
        .run(system);

Or with Classic Actors

Scala
import org.apache.pekko
import pekko.kafka.{TopicPartitionsAssigned, TopicPartitionsRevoked}

class RebalanceListener extends Actor with ActorLogging {
  def receive: Receive = {
    case TopicPartitionsAssigned(subscription, topicPartitions) =>
      log.info("Assigned: {}", topicPartitions)

    case TopicPartitionsRevoked(subscription, topicPartitions) =>
      log.info("Revoked: {}", topicPartitions)
  }
}

val rebalanceListener = system.actorOf(Props(new RebalanceListener))
val subscription = Subscriptions
  .topics(topic)
  // additionally, pass the actor reference:
  .withRebalanceListener(rebalanceListener)

// use the subscription as usual:
Consumer
  .plainSource(consumerSettings, subscription)
Java
static class RebalanceListener extends AbstractLoggingActor {

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            TopicPartitionsAssigned.class,
            assigned -> {
              log().info("Assigned: {}", assigned);
            })
        .match(
            TopicPartitionsRevoked.class,
            revoked -> {
              log().info("Revoked: {}", revoked);
            })
        .build();
  }
}

ActorRef rebalanceListener = system.actorOf(Props.create(RebalanceListener.class));

Subscription subscription =
    Subscriptions.topics(topic)
        // additionally, pass the actor reference:
        .withRebalanceListener(rebalanceListener);

// use the subscription as usual:
Consumer.DrainingControl<List<ConsumerRecord<String, String>>> control =
    Consumer.plainSource(consumerSettings, subscription)
        .toMat(Sink.seq(), Consumer::createDrainingControl)
        .run(system);