Consumer Metadata

Metadata Client

MetadataClient is a thin wrapper for KafkaConsumerActorKafkaConsumerActor hiding the ask calls and mapping to the correct response types.

To access the Kafka consumer metadata you need to create the KafkaConsumerActorKafkaConsumerActor as described in the Consumer documentation pass it to MetadataClient’s factory method create.

Another approach to create metadata client is passing the ConsumerSettings and ActorSystem objects to the factory method. Then the metadata client manages the internal actor and stops it when the close method is called.

The metadata the MetadataClient provides is documented in the Kafka Consumer API API.

Supported metadata by MetadataClient

The supported metadata are

Metadata Response type
Topics list Future[Map[String, List[PartitionInfo]]]CompletionStage[java.util.Map[java.lang.String, java.util.List[PartitionInfo]]]
Partitions Future[List[PartitionInfo]]CompletionStage[java.util.List[PartitionInfo]]
Beginning offsets Future[Map[TopicPartition, Long]]CompletionStage[java.util.Map[TopicPartition, java.lang.Long]]
End offsets Future[Map[TopicPartition, Long]]CompletionStage[java.util.Map[TopicPartition, java.lang.Long]]
Committed offsets Future[Map[TopicPartition, OffsetAndMetadata]]CompletionStage[java.util.Map[TopicPartition, OffsetAndMetadata]]
Warning

Processing of these requests blocks the actor loop. The KafkaConsumerActorKafkaConsumerActor is configured to run on its own dispatcher, so just as the other remote calls to Kafka, the blocking happens within a designated thread pool.

However, calling these during consuming might affect performance and even cause timeouts in extreme cases.

Please consider to use a dedicated KafkaConsumerActorKafkaConsumerActor to create metadata client requests against.

Example:

Scala
sourceimport org.apache.pekko
import pekko.actor.ActorRef
import pekko.kafka.{ KafkaConsumerActor, Metadata }
import pekko.pattern.ask
import pekko.util.Timeout
import org.apache.kafka.common.TopicPartition

import scala.concurrent.Future
import scala.concurrent.duration._

val metadataClient = MetadataClient.create(consumerSettings, 1.second)

val beginningOffsets = metadataClient
  .getBeginningOffsets(Set(partition0))
  .futureValue
metadataClient.close()
Java
sourceimport org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.kafka.ConsumerSettings;
import org.apache.pekko.kafka.javadsl.MetadataClient;
import org.apache.pekko.kafka.testkit.TestcontainersKafkaJunit4Test;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.apache.pekko.util.Timeout;
final TopicPartition partition = new TopicPartition(topic1, 0);
final ConsumerSettings<String, String> consumerSettings =
    consumerDefaults().withGroupId(group1);
final Set<TopicPartition> partitions = Collections.singleton(partition);
final MetadataClient metadataClient =
    MetadataClient.create(consumerSettings, timeout, sys, executor);

final CompletionStage<Map<TopicPartition, Long>> response =
    metadataClient.getBeginningOffsets(partitions);
final Map<TopicPartition, Long> beginningOffsets = response.toCompletableFuture().join();
metadataClient.close();

Accessing metadata using KafkaConsumerActor

To access the Kafka consumer metadata you need to create the KafkaConsumerActorKafkaConsumerActor as described in the Consumer documentation and send messages from MetadataMetadata to it.

Supported metadata by KafkaConsumerActor

The supported metadata are

Request Reply
ListTopics Topics
GetPartitionsFor PartitionsFor
GetBeginningOffsets BeginningOffsets
GetEndOffsets EndOffsets
GetOffsetsForTimes OffsetsForTimes
GetCommittedOffsets CommittedOffsets

These requests are blocking within the Kafka client library up to a timeout configured by metadata-request-timeout or ConsumerSettings.withMetadataRequestTimeout respectively.

Warning

Accessing the Kafka consumer metadata using the KafkaConsumerActor is not a recommended approach. It is reasonable only when you need to perform a request GetOffsetsForTimes which is not supported by the MetadataClient yet.

Example:

Scala
sourceimport org.apache.pekko
import pekko.actor.ActorRef
import pekko.kafka.{ KafkaConsumerActor, Metadata }
import pekko.pattern.ask
import pekko.util.Timeout
import org.apache.kafka.common.TopicPartition

import scala.concurrent.Future
import scala.concurrent.duration._

val timeout: FiniteDuration = 5.seconds
val settings = consumerSettings.withMetadataRequestTimeout(timeout)
implicit val askTimeout: Timeout = Timeout(timeout)

val consumer: ActorRef = system.actorOf(KafkaConsumerActor.props(settings))

val topicsFuture: Future[Metadata.Topics] = (consumer ? Metadata.ListTopics).mapTo[Metadata.Topics]

topicsFuture.map(_.response.foreach { map =>
  println("Found topics:")
  map.foreach {
    case (topic, partitionInfo) =>
      partitionInfo.foreach { info =>
        println(s"  $topic: $info")
      }
  }
})
Java
sourceimport org.apache.pekko.actor.ActorRef;
import org.apache.pekko.kafka.ConsumerSettings;
import org.apache.pekko.kafka.KafkaConsumerActor;
import org.apache.pekko.kafka.Metadata;
import org.apache.pekko.kafka.testkit.TestcontainersKafkaJunit4Test;
import org.apache.pekko.kafka.tests.javadsl.LogCapturingJunit4;
import org.apache.pekko.pattern.Patterns;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.apache.kafka.common.PartitionInfo;

Duration timeout = Duration.ofSeconds(2);
ConsumerSettings<String, String> settings =
    consumerSettings.withMetadataRequestTimeout(timeout);

ActorRef consumer = system().actorOf((KafkaConsumerActor.props(settings)));

CompletionStage<Metadata.Topics> topicsStage =
    Patterns.ask(consumer, Metadata.createListTopics(), timeout)
        .thenApply(reply -> ((Metadata.Topics) reply));

// convert response
CompletionStage<Optional<List<String>>> response =
    topicsStage
        .thenApply(Metadata.Topics::getResponse)
        .thenApply(
            responseOptional ->
                responseOptional.map(
                    map ->
                        map.entrySet().stream()
                            .flatMap(
                                entry -> {
                                  String topic = entry.getKey();
                                  List<PartitionInfo> partitionInfos = entry.getValue();
                                  return partitionInfos.stream()
                                      .map(info -> topic + ": " + info.toString());
                                })
                            .collect(Collectors.toList())));