Subscription
Consumer Sources are created with different types of subscriptions, which control from which topics, partitions and offsets data is to be consumed.
Subscriptions are grouped into two categories: with automatic partition assignment and with manual control of partition assignment.
Factory methods for all subscriptions can be found in the Subscriptions factory.
Automatic Partition Assignment
Topic
Subscribes to one or more topics. Partitions will be assigned automatically by the Kafka Client.
- Scala
    val subscription = Subscriptions.topics(topic)
    val consumer = Consumer.plainSource(consumerDefaults.withGroupId(group), subscription)
- Java
    final AutoSubscription subscription = Subscriptions.topics(topic);
    final Source<ConsumerRecord<String, String>, Consumer.Control> consumer =
        Consumer.plainSource(consumerDefaults().withGroupId(group), subscription);
Topic Pattern
Subscribes to one or more topics which match the given pattern. Take a look at the subscribe(java.util.regex.Pattern pattern, ...) method documentation for more information on topic pattern matching.
- Scala
    val pattern = s"topic-$suffix-[0-9]+"
    val subscription = Subscriptions.topicPattern(pattern)
    val consumer = Consumer.plainSource(consumerDefaults.withGroupId(group), subscription)
- Java
    // [12] matches either digit; inside a character class, '|' would be a literal character
    final String pattern = "topic-900[12]-[0-9]+";
    final AutoSubscription subscription = Subscriptions.topicPattern(pattern);
    final Source<ConsumerRecord<String, String>, Consumer.Control> consumer =
        Consumer.plainSource(consumerDefaults().withGroupId(group), subscription);
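Before passing a pattern to Subscriptions.topicPattern, it can help to try the regular expression against a few candidate topic names with plain java.util.regex. The following is a minimal sketch for illustration only: the TopicPatternCheck class and the topic names are hypothetical, and it assumes the pattern has to match the whole topic name.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class TopicPatternCheck {

    // Returns the candidate topic names that the regular expression matches in full.
    static List<String> matchingTopics(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical topic names, for illustration only.
        List<String> topics = List.of(
                "topic-9001-1", "topic-9002-42", "topic-9003-1", "topic-9001-x", "other-9001-1");

        // prints [topic-9001-1, topic-9002-42]
        System.out.println(matchingTopics("topic-900[12]-[0-9]+", topics));
    }
}
```

Note that inside a character class `|` is a literal character, not alternation, so `[12]` is the form that accepts either digit.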
Manual Partition Assignment
Partition Assignment
Subscribes to the given partitions of the given topics.
- Scala
    val partition = 0
    val subscription = Subscriptions.assignment(new TopicPartition(topic, partition))
    val consumer = Consumer.plainSource(consumerDefaults, subscription)
- Java
    final Integer partition = 0;
    final ManualSubscription subscription =
        Subscriptions.assignment(new TopicPartition(topic, partition));
    final Source<ConsumerRecord<String, String>, Consumer.Control> consumer =
        Consumer.plainSource(consumerDefaults(), subscription);
Partition Assignment with Offset
Subscribes to the given topics and partitions and additionally sets the offset from which messages will be read.
- Scala
    val partition = 0
    val offset: Long = totalMessages.toLong / 2
    val subscription = Subscriptions.assignmentWithOffset(new TopicPartition(topic, partition) -> offset)
    val consumer = Consumer.plainSource(consumerDefaults, subscription)
- Java
    final Integer partition = 0;
    final long offset = totalMessages / 2;
    final ManualSubscription subscription =
        Subscriptions.assignmentWithOffset(new TopicPartition(topic, partition), offset);
    final Source<ConsumerRecord<String, String>, Consumer.Control> consumer =
        Consumer.plainSource(consumerDefaults(), subscription);
This subscription can be used when offsets are stored in Kafka or on external storage. For more information, take a look at the Offset Storage external to Kafka documentation page.
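When offsets are kept outside Kafka, the offset passed to assignmentWithOffset has to be derived from whatever was stored. The sketch below shows one possible convention, with an in-memory map standing in for the external storage: it records the offset of the last processed record per partition and resumes one past it, falling back to 0 when nothing has been stored. The OffsetStore class, its commit and resumeFrom methods, and the string key format are hypothetical and not part of the library.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class OffsetStore {

    // Stand-in for external storage: "topic-partition" key -> offset of the last processed record.
    private final Map<String, Long> lastProcessed = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Records a processed offset, keeping the highest one seen for the partition.
    void commit(String topic, int partition, long offset) {
        lastProcessed.merge(key(topic, partition), offset, Long::max);
    }

    // Offset to resume from: one past the last processed record,
    // or 0 (an assumption of this sketch) when nothing has been stored yet.
    long resumeFrom(String topic, int partition) {
        Long last = lastProcessed.get(key(topic, partition));
        return last == null ? 0L : last + 1;
    }

    public static void main(String[] args) {
        OffsetStore store = new OffsetStore();
        System.out.println(store.resumeFrom("orders", 0)); // prints 0
        store.commit("orders", 0, 41L);
        System.out.println(store.resumeFrom("orders", 0)); // prints 42
    }
}
```

The value returned by resumeFrom is what would be supplied as the offset argument of Subscriptions.assignmentWithOffset for that partition.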
Partition Assignment with Timestamp
Subscribes to the given topics and partitions and additionally sets a timestamp, which is used to find the offset from which messages will be read.
- Scala
    val partition = 0
    val now = System.currentTimeMillis
    val messagesSince: Long = now - 5000
    val subscription = Subscriptions.assignmentOffsetsForTimes(new TopicPartition(topic, partition) -> messagesSince)
    val consumer = Consumer.plainSource(consumerDefaults, subscription)
- Java
    final Integer partition = 0;
    final Long now = System.currentTimeMillis();
    final Long messagesSince = now - 5000;
    final ManualSubscription subscription =
        Subscriptions.assignmentOffsetsForTimes(
            new TopicPartition(topic, partition), messagesSince);
    final Source<ConsumerRecord<String, String>, Consumer.Control> consumer =
        Consumer.plainSource(consumerDefaults(), subscription);
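The Kafka consumer's offsetsForTimes lookup resolves a timestamp to the earliest offset whose record timestamp is greater than or equal to it. The sketch below mimics that rule over an in-memory list, purely to illustrate which offset a given timestamp would select. The TimestampLookup class and its data are hypothetical, and it assumes the list index equals the record's offset.

```java
import java.util.List;
import java.util.OptionalLong;

class TimestampLookup {

    // Returns the earliest offset whose timestamp is >= the target timestamp,
    // or an empty result when every record is older than the target.
    static OptionalLong offsetForTime(List<Long> recordTimestamps, long targetTimestamp) {
        for (int offset = 0; offset < recordTimestamps.size(); offset++) {
            if (recordTimestamps.get(offset) >= targetTimestamp) {
                return OptionalLong.of(offset);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Hypothetical record timestamps in milliseconds; index == offset.
        List<Long> timestamps = List.of(1_000L, 2_000L, 3_000L, 4_000L);

        System.out.println(offsetForTime(timestamps, 2_500L)); // prints OptionalLong[2]
        System.out.println(offsetForTime(timestamps, 9_000L)); // prints OptionalLong.empty
    }
}
```

In the examples above, a timestamp of `now - 5000` therefore selects the first record in the partition that was written within the last five seconds, if one exists.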