Consumer Metadata
Metadata Client
MetadataClient is a thin wrapper for KafkaConsumerActor, hiding the ask calls and mapping replies to the correct response types.
To access the Kafka consumer metadata, you need to create the KafkaConsumerActor as described in the Consumer documentation and pass it to MetadataClient's factory method create.
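A minimal sketch of this actor-based approach in Scala, assuming the Scala API MetadataClient.create(consumerActor, timeout) with an implicit ExecutionContext in scope. The helper name withMetadataClient is hypothetical. Because the client did not create the actor itself, the sketch assumes close() leaves that actor running and stops it explicitly with KafkaConsumerActor.Stop.

```scala
import org.apache.pekko
import pekko.actor.{ ActorRef, ActorSystem }
import pekko.kafka.{ ConsumerSettings, KafkaConsumerActor }
import pekko.kafka.scaladsl.MetadataClient
import pekko.util.Timeout

import scala.concurrent.{ ExecutionContext, Future }

// Hypothetical helper: creates a dedicated KafkaConsumerActor, wraps it in a
// MetadataClient, runs `f` and releases both once the returned Future completes.
def withMetadataClient[K, V, T](settings: ConsumerSettings[K, V], timeout: Timeout)(
    f: MetadataClient => Future[T])(implicit system: ActorSystem, ec: ExecutionContext): Future[T] = {
  val consumerActor: ActorRef = system.actorOf(KafkaConsumerActor.props(settings))
  val client = MetadataClient.create(consumerActor, timeout)
  // flatMap also captures exceptions thrown synchronously by `f`
  Future.unit.flatMap(_ => f(client)).andThen { case _ =>
    client.close()
    // assumption: close() does not stop an actor the client did not create
    consumerActor ! KafkaConsumerActor.Stop
  }
}
```

For example, withMetadataClient(consumerSettings, Timeout(5.seconds))(_.listTopics()) would list the topics and then release the actor.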
Alternatively, you can create a metadata client by passing the ConsumerSettings and ActorSystem objects to the factory method. In that case, the metadata client manages the internal actor and stops it when the close method is called.
The metadata that MetadataClient provides is documented in the Kafka Consumer API.
Supported metadata by MetadataClient
The supported metadata are:
Metadata | Response type (Scala) | Response type (Java) |
---|---|---|
Topics list | Future[Map[String, List[PartitionInfo]]] | CompletionStage[java.util.Map[java.lang.String, java.util.List[PartitionInfo]]] |
Partitions | Future[List[PartitionInfo]] | CompletionStage[java.util.List[PartitionInfo]] |
Beginning offsets | Future[Map[TopicPartition, Long]] | CompletionStage[java.util.Map[TopicPartition, java.lang.Long]] |
End offsets | Future[Map[TopicPartition, Long]] | CompletionStage[java.util.Map[TopicPartition, java.lang.Long]] |
Committed offsets | Future[Map[TopicPartition, OffsetAndMetadata]] | CompletionStage[java.util.Map[TopicPartition, OffsetAndMetadata]] |
Processing these requests blocks the actor loop. The KafkaConsumerActor is configured to run on its own dispatcher, so, just like the other remote calls to Kafka, the blocking happens within a designated thread pool.
However, issuing these requests while consuming might affect performance and, in extreme cases, even cause timeouts.
Consider using a dedicated KafkaConsumerActor for metadata client requests.
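A sketch of querying several of the metadata types listed in the table for a single topic, assuming an existing Scala MetadataClient. TopicSummary and summarize are hypothetical names. Since the KafkaConsumerActor handles one request at a time, the sketch chains the requests sequentially in a for-comprehension rather than firing them all at once.

```scala
import org.apache.pekko
import pekko.kafka.scaladsl.MetadataClient
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

import scala.concurrent.{ ExecutionContext, Future }

// Hypothetical container for the metadata of one topic
final case class TopicSummary(
    partitions: Set[TopicPartition],
    beginningOffsets: Map[TopicPartition, Long],
    endOffsets: Map[TopicPartition, Long],
    committedOffsets: Map[TopicPartition, OffsetAndMetadata])

// Each request starts only after the previous one has completed.
def summarize(client: MetadataClient, topic: String)(implicit ec: ExecutionContext): Future[TopicSummary] =
  for {
    infos <- client.getPartitionsFor(topic)
    partitions = infos.map(info => new TopicPartition(info.topic(), info.partition())).toSet
    beginning <- client.getBeginningOffsets(partitions)
    end <- client.getEndOffsets(partitions)
    committed <- client.getCommittedOffsets(partitions)
  } yield TopicSummary(partitions, beginning, end, committed)
```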
Example:
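Scala:

```scala
import org.apache.pekko
import pekko.kafka.scaladsl.MetadataClient
import org.apache.kafka.common.TopicPartition

import scala.concurrent.Await
import scala.concurrent.duration._

// Assumes an implicit ActorSystem and ExecutionContext in scope, plus
// `consumerSettings: ConsumerSettings[String, String]` and `partition0: TopicPartition`.
// The FiniteDuration is converted to a pekko.util.Timeout implicitly.
val metadataClient = MetadataClient.create(consumerSettings, 1.second)

// blocking here only to keep the example short
val beginningOffsets: Map[TopicPartition, Long] =
  Await.result(metadataClient.getBeginningOffsets(Set(partition0)), 5.seconds)

// stops the KafkaConsumerActor the client created internally
metadataClient.close()
```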
Java:

```java
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.kafka.ConsumerSettings;
import org.apache.pekko.kafka.javadsl.MetadataClient;
import org.apache.pekko.util.Timeout;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;

// topic1, group1, timeout (Timeout), sys (ActorSystem) and executor (Executor) are
// assumed to be defined in the surrounding code; consumerDefaults() is a test helper
// returning ConsumerSettings<String, String>.
final TopicPartition partition = new TopicPartition(topic1, 0);
final ConsumerSettings<String, String> consumerSettings = consumerDefaults().withGroupId(group1);
final Set<TopicPartition> partitions = Collections.singleton(partition);
final MetadataClient metadataClient = MetadataClient.create(consumerSettings, timeout, sys, executor);

final CompletionStage<Map<TopicPartition, Long>> response =
    metadataClient.getBeginningOffsets(partitions);
final Map<TopicPartition, Long> beginningOffsets = response.toCompletableFuture().join();

// stops the KafkaConsumerActor the client created internally
metadataClient.close();
```
Accessing metadata using KafkaConsumerActor
To access the Kafka consumer metadata directly, create the KafkaConsumerActor as described in the Consumer documentation and send it the request messages defined in Metadata.
Supported metadata by KafkaConsumerActor
The supported requests and their replies are:
Request | Reply |
---|---|
ListTopics | Topics |
GetPartitionsFor | PartitionsFor |
GetBeginningOffsets | BeginningOffsets |
GetEndOffsets | EndOffsets |
GetOffsetsForTimes | OffsetsForTimes |
GetCommittedOffsets | CommittedOffsets |
These requests block within the Kafka client library for up to the timeout configured by metadata-request-timeout or ConsumerSettings.withMetadataRequestTimeout.
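When asking the actor directly, the ask timeout and the metadata request timeout are two separate limits. A sketch, assuming the Scala request Metadata.GetEndOffsets and its reply Metadata.EndOffsets, whose response is a Try: the hypothetical helper endOffsets gives the ask one extra second of headroom, so that a timeout inside the Kafka client is more likely to come back as a failed reply than as an ask timeout. The one-second margin is an arbitrary choice.

```scala
import org.apache.pekko
import pekko.actor.ActorRef
import pekko.kafka.Metadata
import pekko.pattern.ask
import pekko.util.Timeout
import org.apache.kafka.common.TopicPartition

import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._

// Hypothetical helper. Assumes `consumer` was created from settings with
// withMetadataRequestTimeout(metadataRequestTimeout).
def endOffsets(consumer: ActorRef, partitions: Set[TopicPartition], metadataRequestTimeout: FiniteDuration)(
    implicit ec: ExecutionContext): Future[Map[TopicPartition, Long]] = {
  // headroom so the Kafka client's own timeout can surface in the reply first
  implicit val askTimeout: Timeout = Timeout(metadataRequestTimeout + 1.second)
  (consumer ? Metadata.GetEndOffsets(partitions))
    .mapTo[Metadata.EndOffsets]
    // the reply wraps the result in a Try; turn a Failure into a failed Future
    .flatMap(reply => Future.fromTry(reply.response))
}
```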
Accessing the Kafka consumer metadata through the KafkaConsumerActor is not the recommended approach. It is reasonable only when you need the GetOffsetsForTimes request, which MetadataClient does not support yet.
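A sketch of the GetOffsetsForTimes request in Scala, assuming Metadata.GetOffsetsForTimes takes a Map[TopicPartition, Long] of timestamps in epoch milliseconds and the reply Metadata.OffsetsForTimes carries a Try of Map[TopicPartition, OffsetAndTimestamp]. Following Kafka's offsetsForTimes, each entry is the earliest offset whose timestamp is greater than or equal to the requested one, and a partition without such a record maps to null; the hypothetical helper offsetsAtTime turns those nulls into None.

```scala
import org.apache.pekko
import pekko.actor.ActorRef
import pekko.kafka.Metadata
import pekko.pattern.ask
import pekko.util.Timeout
import org.apache.kafka.common.TopicPartition

import scala.concurrent.{ ExecutionContext, Future }

// Hypothetical helper: looks up, per partition, the first offset at or after
// `timestampMs`; None when no such record exists.
def offsetsAtTime(consumer: ActorRef, partitions: Set[TopicPartition], timestampMs: Long)(
    implicit ec: ExecutionContext, askTimeout: Timeout): Future[Map[TopicPartition, Option[Long]]] =
  (consumer ? Metadata.GetOffsetsForTimes(partitions.map(_ -> timestampMs).toMap))
    .mapTo[Metadata.OffsetsForTimes]
    .flatMap(reply => Future.fromTry(reply.response))
    // Option(...) guards against null values for partitions without a match
    .map(_.map { case (tp, found) => tp -> Option(found).map(_.offset()) })
```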
Example:
Scala:

```scala
import org.apache.pekko
import pekko.actor.ActorRef
import pekko.kafka.{ KafkaConsumerActor, Metadata }
import pekko.pattern.ask
import pekko.util.Timeout

import scala.concurrent.Future
import scala.concurrent.duration._

// Assumes `system` (ActorSystem), `consumerSettings` and an implicit
// ExecutionContext are in scope.
val timeout: FiniteDuration = 5.seconds
val settings = consumerSettings.withMetadataRequestTimeout(timeout)
implicit val askTimeout: Timeout = Timeout(timeout)

val consumer: ActorRef = system.actorOf(KafkaConsumerActor.props(settings))

val topicsFuture: Future[Metadata.Topics] =
  (consumer ? Metadata.ListTopics).mapTo[Metadata.Topics]

topicsFuture.map(_.response.foreach { map =>
  println("Found topics:")
  map.foreach {
    case (topic, partitionInfo) =>
      partitionInfo.foreach { info =>
        println(s"  $topic: $info")
      }
  }
})
```
Java:

```java
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.kafka.ConsumerSettings;
import org.apache.pekko.kafka.KafkaConsumerActor;
import org.apache.pekko.kafka.Metadata;
import org.apache.pekko.pattern.Patterns;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.apache.kafka.common.PartitionInfo;

// system() and consumerSettings are assumed to come from the surrounding code
Duration timeout = Duration.ofSeconds(2);
ConsumerSettings<String, String> settings =
    consumerSettings.withMetadataRequestTimeout(timeout);
ActorRef consumer = system().actorOf(KafkaConsumerActor.props(settings));

CompletionStage<Metadata.Topics> topicsStage =
    Patterns.ask(consumer, Metadata.createListTopics(), timeout)
        .thenApply(reply -> ((Metadata.Topics) reply));

// convert response
CompletionStage<Optional<List<String>>> response =
    topicsStage
        .thenApply(Metadata.Topics::getResponse)
        .thenApply(
            responseOptional ->
                responseOptional.map(
                    map ->
                        map.entrySet().stream()
                            .flatMap(
                                entry -> {
                                  String topic = entry.getKey();
                                  List<PartitionInfo> partitionInfos = entry.getValue();
                                  return partitionInfos.stream()
                                      .map(info -> topic + ": " + info.toString());
                                })
                            .collect(Collectors.toList())));
```