Consumer

A consumer subscribes to Kafka topics and passes the messages into an Apache Pekko Stream.

The underlying implementation uses the KafkaConsumer; see the Kafka API documentation for a description of consumer groups, offsets, and other details.

Choosing a consumer

Apache Pekko Connectors Kafka offers a large variety of consumers that connect to Kafka and stream data. The tables below may help you to find the consumer best suited for your use-case.

Consumers

These factory methods are part of the Consumer API.

| Offsets handling | Partition aware | Subscription | Shared consumer | Factory method | Stream element type |
|---|---|---|---|---|---|
| No (auto commit can be enabled) | No | Topic or Partition | No | plainSource | ConsumerRecord |
| No (auto commit can be enabled) | No | Partition | Yes | plainExternalSource | ConsumerRecord |
| Explicit committing | No | Topic or Partition | No | committableSource | CommittableMessage |
| Explicit committing | No | Partition | Yes | committableExternalSource | CommittableMessage |
| Explicit committing with metadata | No | Topic or Partition | No | commitWithMetadataSource | CommittableMessage |
| Explicit committing (with metadata) | No | Topic or Partition | No | sourceWithOffsetContext | ConsumerRecord |
| Offset committed per element | No | Topic or Partition | No | atMostOnceSource | ConsumerRecord |
| No (auto commit can be enabled) | Yes | Topic or Partition | No | plainPartitionedSource | (TopicPartition, Source[ConsumerRecord, ..]) |
| External to Kafka | Yes | Topic or Partition | No | plainPartitionedManualOffsetSource | (TopicPartition, Source[ConsumerRecord, ..]) |
| Explicit committing | Yes | Topic or Partition | No | committablePartitionedSource | (TopicPartition, Source[CommittableMessage, ..]) |
| External to Kafka & Explicit Committing | Yes | Topic or Partition | No | committablePartitionedManualOffsetSource | (TopicPartition, Source[CommittableMessage, ..]) |
| Explicit committing with metadata | Yes | Topic or Partition | No | commitWithMetadataPartitionedSource | (TopicPartition, Source[CommittableMessage, ..]) |

Transactional consumers

These factory methods are part of the Transactional API. For details see Transactions.

| Offsets handling | Partition aware | Shared consumer | Factory method | Stream element type |
|---|---|---|---|---|
| Transactional | No | No | Transactional.source | TransactionalMessage |
| Transactional | No | No | Transactional.sourceWithOffsetContext | ConsumerRecord |

Settings

When creating a consumer source you need to pass in ConsumerSettings that define things like:

  • de-serializers for the keys and values
  • bootstrap servers of the Kafka cluster (see Service discovery to defer the server configuration)
  • group id for the consumer (note that offsets are always committed for a given consumer group)
  • Kafka consumer tuning parameters

Apache Pekko Connectors Kafka’s defaults for all settings are defined in reference.conf which is included in the library JAR.

Important consumer settings
| Setting | Description |
|---|---|
| stop-timeout | The stage will delay stopping the internal actor to allow processing of messages already in the stream (required for successful committing). This can be set to 0 for streams using Consumer.DrainingControl. |
| kafka-clients | Section for properties passed unchanged to the Kafka client (see Kafka’s Consumer Configs). |
| connection-checker | Configuration to let the stream fail if the connection to the Kafka broker fails. |
reference.conf (HOCON)
# Properties for pekko.kafka.ConsumerSettings can be
# defined in this section or a configuration section with
# the same layout.
pekko.kafka.consumer {
  # Config path of Apache Pekko Discovery method
  # "pekko.discovery" to use the Apache Pekko Discovery method configured for the ActorSystem
  discovery-method = pekko.discovery

  # Set a service name for use with Apache Pekko Discovery
  # https://pekko.apache.org/docs/pekko-connectors-kafka/current/discovery.html
  service-name = ""

  # Timeout for getting a reply from the discovery-method lookup
  resolve-timeout = 3 seconds

  # Tuning property of scheduled polls.
  # Controls the interval from one scheduled poll to the next.
  poll-interval = 50ms

  # Tuning property of the `KafkaConsumer.poll` parameter.
  # Note that non-zero value means that the thread that
  # is executing the stage will be blocked. See also the `wakeup-timeout` setting below.
  poll-timeout = 50ms

  # The stage will delay stopping the internal actor to allow processing of
  # messages already in the stream (required for successful committing).
  # This can be set to 0 for streams using `DrainingControl`.
  stop-timeout = 30s

  # Duration to wait for `KafkaConsumer.close` to finish.
  close-timeout = 20s

  # If offset commit requests are not completed within this timeout
  # the returned Future is completed with `CommitTimeoutException`.
  # The `Transactional.source` waits this amount of time for the producer to mark messages as not
  # being in flight anymore as well as waiting for messages to drain, when rebalance is triggered.
  commit-timeout = 15s

  # If commits take longer than this time a warning is logged
  commit-time-warning = 1s

  # Not relevant for Kafka after version 2.1.0.
  # If set to a finite duration, the consumer will re-send the last committed offsets periodically
  # for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
  commit-refresh-interval = infinite

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the KafkaConsumerActor. Some blocking may occur.
  use-dispatcher = "pekko.kafka.default-dispatcher"

  # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
  # can be defined in this configuration section.
  kafka-clients {
    # Disable auto-commit by default
    enable.auto.commit = false
  }

  # Time to wait for pending requests when a partition is closed
  wait-close-partition = 500ms

  # Limits the query to Kafka for a topic's position
  position-timeout = 5s

  # When using `AssignmentOffsetsForTimes` subscriptions: timeout for the
  # call to Kafka's API
  offset-for-times-timeout = 5s

  # Timeout for org.apache.pekko.kafka.Metadata requests
  # This value is used instead of Kafka's default from `default.api.timeout.ms`
  # which is 1 minute.
  metadata-request-timeout = 5s

  # Interval for checking that transaction was completed before closing the consumer.
  # Used in the transactional flow for exactly-once-semantics processing.
  eos-draining-check-interval = 30ms

  # Issue warnings when a call to a partition assignment handler method takes
  # longer than this.
  partition-handler-warning = 5s

  # Settings for checking the connection to the Kafka broker. Connection checking uses `listTopics` requests with the timeout
  # configured by `consumer.metadata-request-timeout`
  connection-checker {

    # Flag to turn on connection checker
    enable = false

    # Number of attempts to be performed after a first connection failure occurs
    # Required, non-negative integer
    max-retries = 3

    # Interval for the connection check. Used as the base for exponential retry.
    check-interval = 15s

    # Check interval multiplier for backoff interval
    # Required, positive number
    backoff-factor = 2.0
  }

  # Protect against server-side bugs that cause Kafka to temporarily "lose" the latest offset for a consumer, which
  # then causes the Kafka consumer to follow its normal 'auto.offset.reset' behavior. For 'earliest', these settings
  # allow the client to detect and attempt to recover from this issue. For 'none' and 'latest', these settings will
  # only add overhead. See
  # for more information
  offset-reset-protection {
    # turns on reset protection
    enable = false
    # if consumer gets a record with an offset that is more than this number of offsets back from the previously
    # requested offset, it is considered a reset
    offset-threshold = 9223372036854775807
    # if the record is more than this duration earlier than the last received record, it is considered a reset
    time-threshold = 100000 days
  }
}
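
The connection-checker settings above describe an exponential retry. The following is a minimal sketch of the resulting retry schedule in plain Java. It does not use the library, and the class and method names are hypothetical. It assumes that every retry multiplies the previous interval by backoff-factor, starting from check-interval; this is an interpretation of the comments above, not a statement about the actual implementation.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Apache Pekko Connectors Kafka.
final class ConnectionCheckBackoffSketch {

  // Returns the wait before each retry, assuming every retry multiplies
  // the previous interval by backoffFactor (assumption, see lead-in).
  static List<Duration> retryIntervals(
      Duration checkInterval, double backoffFactor, int maxRetries) {
    List<Duration> intervals = new ArrayList<>();
    double millis = checkInterval.toMillis();
    for (int i = 0; i < maxRetries; i++) {
      millis *= backoffFactor;
      intervals.add(Duration.ofMillis(Math.round(millis)));
    }
    return intervals;
  }

  public static void main(String[] args) {
    // Defaults from reference.conf: check-interval = 15s, backoff-factor = 2.0, max-retries = 3
    System.out.println(retryIntervals(Duration.ofSeconds(15), 2.0, 3));
  }
}
```

Under these assumptions the defaults (check-interval = 15s, backoff-factor = 2.0, max-retries = 3) give waits of 30, 60 and 120 seconds before the stream is failed.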

The Kafka documentation Consumer Configs lists the settings, their defaults and importance. More detailed explanations are given in the KafkaConsumer API and constants are defined in ConsumerConfig API.
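
The offset-reset-protection section of reference.conf names two thresholds. A minimal sketch of how such a check could be evaluated is given below in plain Java. The class, method and parameter names are hypothetical, and combining the two conditions with a logical OR is an assumption based on the wording of the configuration comments, not on the library source.

```java
import java.time.Duration;

// Hypothetical helper, not part of Apache Pekko Connectors Kafka.
final class OffsetResetProtectionSketch {

  // A record is treated as a reset when it is more than offsetThreshold
  // offsets behind the requested offset, or more than timeThreshold
  // older than the last received record (OR is an assumption).
  static boolean looksLikeReset(
      long requestedOffset,
      long recordOffset,
      long lastTimestampMs,
      long recordTimestampMs,
      long offsetThreshold,
      Duration timeThreshold) {
    boolean offsetJumpedBack = requestedOffset - recordOffset > offsetThreshold;
    boolean timeJumpedBack =
        lastTimestampMs - recordTimestampMs > timeThreshold.toMillis();
    return offsetJumpedBack || timeJumpedBack;
  }
}
```

With the default thresholds (the largest Long value and 100000 days) neither condition can realistically be met, so enabling the protection only has an effect once at least one threshold is lowered.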

Programmatic construction

Stream-specific settings like the de-serializers and consumer group ID should be set programmatically. Settings that apply to many consumers may be set in application.conf or use config inheritance.

Scala
val config = system.settings.config.getConfig("pekko.kafka.consumer")
val consumerSettings =
  ConsumerSettings(config, new StringDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
Java
final Config config = system.settings().config().getConfig("pekko.kafka.consumer");
final ConsumerSettings<String, byte[]> consumerSettings =
    ConsumerSettings.create(config, new StringDeserializer(), new ByteArrayDeserializer())
        .withBootstrapServers("localhost:9092")
        .withGroupId("group1")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Config inheritance

ConsumerSettings are created from configuration in application.conf (with defaults in reference.conf). The format of these settings files is described in the HOCON Config Documentation. A recommended setup is to rely on config inheritance as below:

application.conf (HOCON)
our-kafka-consumer: ${pekko.kafka.consumer} {
  kafka-clients {
    bootstrap.servers = "kafka-host:9092"
  }
}

Read the settings that inherit the defaults from “pekko.kafka.consumer” settings:

Scala
val config = system.settings.config.getConfig("our-kafka-consumer")
val consumerSettings = ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
Java
Config config = system.settings().config().getConfig("our-kafka-consumer");
ConsumerSettings<String, String> consumerSettings =
    ConsumerSettings.create(config, new StringDeserializer(), new StringDeserializer());

Offset Storage external to Kafka

The Kafka read offset can either be stored in Kafka (see below), or at a data store of your choice.

Consumer.plainSource and Consumer.plainPartitionedManualOffsetSource can be used to emit ConsumerRecord elements as received from the underlying KafkaConsumer. They do not support committing offsets to Kafka. When using these Sources, either store an offset externally, or use auto-commit (note that auto-commit is disabled by default).

Scala
consumerSettings
  .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
  .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")
Java
consumerSettings
    .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

The consumer application doesn’t need to use Kafka’s built-in offset storage; it can store offsets in a store of its own choosing. The primary use case for this is allowing the application to store both the offset and the results of the consumption in the same system, so that results and offsets are stored atomically. This is not always possible, but when it is, it makes the consumption fully atomic and gives “exactly once” semantics that are stronger than the “at-least-once” semantics you get with Kafka’s offset commit functionality.

Scala
val db = new OffsetStore
val control = db.loadOffset().map { fromOffset =>
  Consumer
    .plainSource(
      consumerSettings,
      Subscriptions.assignmentWithOffset(
        new TopicPartition(topic, /* partition = */ 0) -> fromOffset
      )
    )
    .mapAsync(1)(db.businessLogicAndStoreOffset)
    .toMat(Sink.seq)(DrainingControl.apply)
    .run()
}

class OffsetStore {
  // Placeholder: run the business logic and store the record's offset together with its result
  def businessLogicAndStoreOffset(record: ConsumerRecord[String, String]): Future[Done] = ???
  // Placeholder: load the offset to start reading from
  def loadOffset(): Future[Long] = ???
}
Java
  final OffsetStorage db = new OffsetStorage();

  CompletionStage<Consumer.Control> controlCompletionStage =
      db.loadOffset()
          .thenApply(
              fromOffset ->
                  Consumer.plainSource(
                          consumerSettings,
                          Subscriptions.assignmentWithOffset(
                              new TopicPartition(topic, partition0), fromOffset))
                      .mapAsync(1, db::businessLogicAndStoreOffset)
                      .to(Sink.ignore())
                      .run(system));

class OffsetStorage {
  public CompletionStage<Done> businessLogicAndStoreOffset(
      ConsumerRecord<String, String> record) { /* ... */ }
  public CompletionStage<Long> loadOffset() { /* ... */ }
}
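
The OffsetStorage placeholder above leaves the actual storage open. The idea of storing the result and the offset in one atomic step can be sketched with an in-memory stand-in in plain Java. The class below is hypothetical and only models what a database transaction would provide; it is not part of the library. A record whose offset has already been stored is ignored, which is what turns redelivery after a failure into an effectively exactly-once result.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a transactional data store.
final class InMemoryOffsetStore {
  private final List<String> results = new ArrayList<>();
  private long nextOffset = 0L;

  // Stores the result and the offset in a single synchronized step.
  // Returns false when the record was already processed earlier.
  synchronized boolean storeResultAndOffset(long recordOffset, String result) {
    if (recordOffset < nextOffset) {
      return false; // redelivered record: result and offset are already stored
    }
    results.add(result);
    nextOffset = recordOffset + 1;
    return true;
  }

  // The offset to pass to the subscription when the stream (re)starts.
  synchronized long loadOffset() {
    return nextOffset;
  }

  synchronized List<String> results() {
    return new ArrayList<>(results);
  }
}
```

After a restart, the value returned by loadOffset() is the position to resume from, so no stored result is produced twice.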

For Consumer.plainSource the Subscriptions.assignmentWithOffset specifies the starting point (offset) for a given consumer group id, topic and partition. The group id is defined in the ConsumerSettings.

Alternatively, with Consumer.plainPartitionedManualOffsetSource, only the consumer group id and the topic are required on creation. The starting point is fetched by calling the getOffsetsOnAssign function passed in by the user. This function should return a Map of TopicPartition to Long, with the Long representing the starting point. If a consumer is assigned a partition that is not included in the Map that results from getOffsetsOnAssign, the default starting position will be used, according to the consumer configuration value auto.offset.reset. Also note that Consumer.plainPartitionedManualOffsetSource emits tuples of assigned topic-partition and a corresponding source, as in Source per partition.
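
The lookup that a getOffsetsOnAssign function performs can be sketched without the library. In the plain Java sketch below, partitions are represented by strings such as "orders-0" as a stand-in for Kafka's TopicPartition, and the class and method names are hypothetical. Partitions without a stored offset are simply left out of the returned map, which is the case where auto.offset.reset decides the starting position.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; String keys stand in for TopicPartition.
final class OffsetsOnAssignSketch {

  // Returns starting offsets for the assigned partitions that have a
  // stored offset; all other assigned partitions are omitted.
  static Map<String, Long> offsetsOnAssign(
      Set<String> assignedPartitions, Map<String, Long> storedOffsets) {
    Map<String, Long> startingPoints = new HashMap<>();
    for (String partition : assignedPartitions) {
      Long offset = storedOffsets.get(partition);
      if (offset != null) {
        startingPoints.put(partition, offset);
      }
      // No entry: the consumer falls back to auto.offset.reset for this partition.
    }
    return startingPoints;
  }
}
```

In a real stream the function would typically query the external store asynchronously; the synchronous map lookup here only illustrates which partitions end up with an explicit starting point.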

Offset Storage in Kafka - committing

The Consumer.committableSource makes it possible to commit offset positions to Kafka. Compared to auto-commit this gives exact control of when a message is considered consumed.

This is useful when “at-least-once” delivery is desired, as each message will likely be delivered one time, but in failure cases could be received more than once.

Scala
val control =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic))
    .mapAsync(10) { msg =>
      business(msg.record.key, msg.record.value).map(_ => msg.committableOffset)
    }
    .via(Committer.flow(committerDefaults.withMaxBatch(1)))
    .toMat(Sink.seq)(DrainingControl.apply)
    .run()

// Placeholder for the application's processing logic
def business(key: String, value: Array[Byte]): Future[Done] = ???
Java
// Placeholder for the application's processing logic
CompletionStage<String> business(String key, String value) { /* ... */ }

Consumer.DrainingControl<Done> control =
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
        .mapAsync(
            1,
            msg ->
                business(msg.record().key(), msg.record().value())
                    .thenApply(done -> msg.committableOffset()))
        .toMat(
            Committer.sink(committerSettings.withMaxBatch(1)), Consumer::createDrainingControl)
        .run(system);

Committing the offset for each message (withMaxBatch(1)) as illustrated above is rather slow. It is recommended to batch the commits for better throughput. If the upstream fails, the Committer will try to commit the offsets collected before the error.

Committer sink

You can use a pre-defined Committer.sink to perform commits in batches:

Scala
val committerSettings = CommitterSettings(system)

val control: DrainingControl[Done] =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic))
    .mapAsync(1) { msg =>
      business(msg.record.key, msg.record.value)
        .map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
    .run()
Java
CommitterSettings committerSettings = CommitterSettings.create(config);

Consumer.DrainingControl<Done> control =
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
        .mapAsync(
            1,
            msg ->
                business(msg.record().key(), msg.record().value())
                    .<ConsumerMessage.Committable>thenApply(done -> msg.committableOffset()))
        .toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
        .run(system);

When creating a Committer.sink you need to pass in CommitterSettings. These may be created by passing the actor system to read the defaults from the config section pekko.kafka.committer, or by passing a Config instance with the same structure.

| Setting | Description | Default Value |
|---|---|---|
| maxBatch | maximum number of messages to commit at once | 1000 |
| maxInterval | maximum interval between commits | 10 seconds |
| parallelism | maximum number of commit batches in flight | 100 |
reference.conf
# Properties for org.apache.pekko.kafka.CommitterSettings can be
# defined in this section or a configuration section with
# the same layout.
pekko.kafka.committer {

  # Maximum number of messages in a single commit batch
  max-batch = 1000

  # Maximum interval between commits
  max-interval = 10s

  # Parallelism for async committing
  parallelism = 100

  # API may change.
  # Delivery of commits to the internal actor
  # WaitForAck: Expect replies for commits, and backpressure the stream if replies do not arrive.
  # SendAndForget: Send off commits to the internal actor without expecting replies (experimental feature since Alpakka Kafka 1.1)
  delivery = WaitForAck

  # API may change.
  # Controls when a `Committable` message is queued to be committed.
  # OffsetFirstObserved: When the offset of a message has been successfully produced.
  # NextOffsetObserved: When the next offset is observed.
  when = OffsetFirstObserved
}

All commit batches are aggregated internally and passed on to Kafka very often (in every poll cycle). The Committer settings configure how the stream sends the offsets to the internal actor which communicates with the Kafka broker. Increasing these values means that in case of a failure you may have to re-process more messages.
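
The trade-off between batch size and re-processing can be made concrete with a small calculation. The plain Java sketch below is a simplified model with hypothetical names: it assumes a hard crash, considers only max-batch (ignoring max-interval and commits still in flight), and counts how many already processed messages would not be covered by a completed commit batch.

```java
// Hypothetical model, not part of Apache Pekko Connectors Kafka.
final class CommitBatchSketch {

  // Number of processed messages covered by completed batches of size maxBatch.
  static long committedCount(long processed, long maxBatch) {
    return (processed / maxBatch) * maxBatch;
  }

  // Messages that were processed but not yet committed when the crash happens.
  static long reprocessedAfterCrash(long processed, long maxBatch) {
    return processed - committedCount(processed, maxBatch);
  }

  public static void main(String[] args) {
    System.out.println(reprocessedAfterCrash(2500, 1000));
    System.out.println(reprocessedAfterCrash(2500, 1));
  }
}
```

In this model, 2500 processed messages with max-batch = 1000 leave 500 messages to re-process after a crash, while max-batch = 1 leaves none at the cost of one commit per message.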

If you use Kafka older than version 2.1.0 and consume from a topic with low activity, where possibly no messages arrive for more than 24 hours, consider enabling periodic commit refresh (the pekko.kafka.consumer.commit-refresh-interval configuration parameter); otherwise offsets might expire in the Kafka storage. This has been fixed in Kafka 2.1.0 (see KAFKA-4682).

Committer variants

These factory methods are part of the Committer API.

| Factory method | Stream element type | Emits |
|---|---|---|
| sink | Committable | N/A |
| sinkWithOffsetContext | Any (CommittableOffset in context) | N/A |
| flow | Committable | Done |
| batchFlow | Committable | CommittableOffsetBatch |
| flowWithOffsetContext | Any (CommittableOffset in context) | NotUsed (CommittableOffsetBatch in context) |

Commit with meta-data

The Consumer.commitWithMetadataSource allows you to add metadata to the committed offset based on the last consumed record.

Note that the first offset provided to the consumer during a partition assignment will not contain metadata. This offset can get committed due to a periodic commit refresh (the pekko.kafka.consumer.commit-refresh-interval configuration parameter), and that commit will not contain metadata.

Scala
def metadataFromRecord(record: ConsumerRecord[String, String]): String =
  record.timestamp().toString

val control =
  Consumer
    .commitWithMetadataSource(consumerSettings, Subscriptions.topics(topic), metadataFromRecord)
    .mapAsync(1) { msg =>
      business(msg.record.key, msg.record.value)
        .map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(committerDefaults))(DrainingControl.apply)
    .run()
Java
Consumer.DrainingControl<Done> control =
    Consumer.commitWithMetadataSource(
            consumerSettings,
            Subscriptions.topics(topic),
            (record) -> Long.toString(record.timestamp()))
        .mapAsync(
            1,
            msg ->
                business(msg.record().key(), msg.record().value())
                    .thenApply(done -> msg.committableOffset()))
        .toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
        .run(system);

Offset Storage in Kafka & external

In some cases you may wish to use external offset storage as your primary means to manage offsets, but also commit offsets to Kafka. This gives you all the benefits of controlling offsets described in Offset Storage external to Kafka and allows you to use tooling in the Kafka ecosystem to track consumer group lag. You can use the Consumer.committablePartitionedManualOffsetSource source, which emits a ConsumerMessage.CommittableMessage, to seek to appropriate offsets on startup, do your processing, commit to external storage, and then commit offsets back to Kafka. This will only provide at-least-once guarantees for your consumer group lag monitoring, because a failure can occur between storing your offsets externally and committing to Kafka, but it will give you a more accurate representation of consumer group lag than turning on auto commits with the enable.auto.commit consumer property.

Consume “at-most-once”

If you commit the offset before processing the message you get “at-most-once” delivery semantics. This is provided by Consumer.atMostOnceSource. However, atMostOnceSource commits the offset for each message, which is rather slow; batching of commits is recommended. If your “at-most-once” requirements are more relaxed, consider a Consumer.plainSource and enable Kafka’s auto committing with enable.auto.commit = true.

Scala
val control: DrainingControl[immutable.Seq[Done]] =
  Consumer
    .atMostOnceSource(consumerSettings, Subscriptions.topics(topic))
    .mapAsync(1)(record => business(record.key, record.value()))
    .toMat(Sink.seq)(DrainingControl.apply)
    .run()

// Placeholder for the application's processing logic
def business(key: String, value: String): Future[Done] = ???
Java
Consumer.Control control =
    Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics(topic))
        .mapAsync(10, record -> business(record.key(), record.value()))
        .to(Sink.foreach(it -> System.out.println("Done with " + it)))
        .run(system);
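
The effect of committing before or after processing can be illustrated with a small simulation that does not involve Kafka at all. The plain Java sketch below uses hypothetical names and models a single crash while handling one message, followed by a restart from the last committed offset. It is a simplified model of the two orderings, not a description of the library internals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, not part of Apache Pekko Connectors Kafka.
final class DeliverySemanticsSketch {

  // Commit first, then process. The crash hits after the commit of
  // message crashAt but before it is processed, so that message is lost.
  static List<Integer> atMostOnce(int messages, int crashAt) {
    List<Integer> processed = new ArrayList<>();
    int committed = 0;
    boolean crashed = false;
    int next = committed;
    while (next < messages) {
      committed = next + 1; // commit before processing
      if (next == crashAt && !crashed) {
        crashed = true;
        next = committed; // restart resumes after the committed message
        continue;
      }
      processed.add(next);
      next++;
    }
    return processed;
  }

  // Process first, then commit. The crash hits after processing message
  // crashAt but before its commit, so that message is processed twice.
  static List<Integer> atLeastOnce(int messages, int crashAt) {
    List<Integer> processed = new ArrayList<>();
    int committed = 0;
    boolean crashed = false;
    int next = committed;
    while (next < messages) {
      processed.add(next); // process before committing
      if (next == crashAt && !crashed) {
        crashed = true;
        next = committed; // restart resumes at the uncommitted message
        continue;
      }
      committed = next + 1;
      next++;
    }
    return processed;
  }
}
```

For four messages and a crash at message 2, the commit-first ordering processes 0, 1, 3 (message 2 is never processed), while the process-first ordering processes 0, 1, 2, 2, 3 (message 2 is seen twice).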

Consume “at-least-once”

How to achieve at-least-once delivery semantics is covered in At-Least-Once Delivery.

Connecting Producer and Consumer

For cases when you need to read messages from one topic, transform or enrich them, and then write to another topic you can use Consumer.committableSource and connect it to a Producer.committableSink. The committableSink will commit the offset back to the consumer regularly.

The committableSink accepts implementations of ProducerMessage.Envelope that contain the offset to commit the consumption of the originating message (of type Committable). See Producing messages about different implementations of ProducerMessage.Envelope.

Scala
val control =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic1, topic2))
    .map { msg =>
      ProducerMessage.single(
        new ProducerRecord(targetTopic, msg.record.key, msg.record.value),
        msg.committableOffset
      )
    }
    .toMat(Producer.committableSink(producerSettings, committerSettings))(DrainingControl.apply)
    .run()
Java
Consumer.DrainingControl<Done> control =
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topic1, topic2))
        .map(
            msg ->
                ProducerMessage.<String, String, ConsumerMessage.Committable>single(
                    new ProducerRecord<>(targetTopic, msg.record().key(), msg.record().value()),
                    msg.committableOffset()))
        .toMat(
            Producer.committableSink(producerSettings, committerSettings),
            Consumer::createDrainingControl)
        .run(system);
Note

There is a risk that something fails after publishing, but before committing, so committableSink has “at-least-once” delivery semantics.

To get exactly-once guarantees, please read about Transactions.

Source per partition

Consumer.plainPartitionedSource, Consumer.committablePartitionedSource, and Consumer.commitWithMetadataPartitionedSource support tracking the automatic partition assignment from Kafka. When a topic-partition is assigned to a consumer, this source will emit a tuple with the assigned topic-partition and a corresponding source. When a topic-partition is revoked, the corresponding source completes.

Scala
val control = Consumer
  .committablePartitionedSource(consumerSettings, Subscriptions.topics(topic))
  .flatMapMerge(maxPartitions, _._2)
  .via(businessFlow)
  .map(_.committableOffset)
  .toMat(Committer.sink(committerDefaults))(DrainingControl.apply)
  .run()
Java
Consumer.DrainingControl<Done> control =
    Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(topic))
        .flatMapMerge(maxPartitions, Pair::second)
        .via(business())
        .map(msg -> msg.committableOffset())
        .toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
        .run(system);

Separate streams per partition:

Scala
val control = Consumer
  .committablePartitionedSource(consumerSettings, Subscriptions.topics(topic))
  .mapAsyncUnordered(maxPartitions) {
    case (topicPartition, source) =>
      source
        .via(businessFlow)
        .map(_.committableOffset)
        .runWith(Committer.sink(committerSettings))
  }
  .toMat(Sink.ignore)(DrainingControl.apply)
  .run()
Java
Consumer.DrainingControl<Done> control =
    Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(topic))
        .mapAsyncUnordered(
            maxPartitions,
            pair -> {
              Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed> source =
                  pair.second();
              return source
                  .via(business())
                  .map(message -> message.committableOffset())
                  .runWith(Committer.sink(committerSettings), system);
            })
        .toMat(Sink.ignore(), Consumer::createDrainingControl)
        .run(system);

Sharing the KafkaConsumer instance

If you have many streams it can be more efficient to share the underlying KafkaConsumer instance. It is shared by creating a KafkaConsumerActor. You need to create the actor and stop it by sending KafkaConsumerActor.Stop when it is not needed any longer. You pass the classic ActorRef as a parameter to the Consumer factory methods.

When using a typed ActorSystem you can create the KafkaConsumerActor by using the Apache Pekko typed adapter to create a classic ActorRef. Then you can carry on using the existing Apache Pekko Connectors Kafka API.

Scala
// adds support for actors to a classic actor system and context
import org.apache.pekko.actor.typed.scaladsl.adapter._

// Consumer is represented by actor
val consumer: ActorRef =
  context.actorOf(KafkaConsumerActor.props(consumerSettings), "kafka-consumer-actor")
Java
// adds support for actors to a classic actor system and context
import org.apache.pekko.actor.typed.javadsl.Adapter;

// Consumer is represented by actor
ActorRef consumer = Adapter.actorOf(ctx, KafkaConsumerActor.props(consumerSettings));

Using the KafkaConsumerActor:

Scala
// Consumer is represented by actor
val consumer: ActorRef = system.actorOf(KafkaConsumerActor.props(consumerSettings))

// Manually assign topic partition to it
val (controlPartition1, result1) = Consumer
  .plainExternalSource[String, Array[Byte]](
    consumer,
    Subscriptions.assignment(new TopicPartition(topic, partition1)))
  .via(businessFlow)
  .toMat(Sink.seq)(Keep.both)
  .run()

// Manually assign another topic partition
val (controlPartition2, result2) = Consumer
  .plainExternalSource[String, Array[Byte]](
    consumer,
    Subscriptions.assignment(new TopicPartition(topic, partition2)))
  .via(businessFlow)
  .toMat(Sink.seq)(Keep.both)
  .run()

// ....

controlPartition1.shutdown()
controlPartition2.shutdown()
consumer ! KafkaConsumerActor.Stop
Java
// Consumer is represented by actor
ActorRef consumer = system.actorOf(KafkaConsumerActor.props(consumerSettings));

// Manually assign topic partition to it
Consumer.Control controlPartition1 =
    Consumer.plainExternalSource(
            consumer, Subscriptions.assignment(new TopicPartition(topic, partition0)))
        .via(business())
        .to(Sink.ignore())
        .run(system);

// Manually assign another topic partition
Consumer.Control controlPartition2 =
    Consumer.plainExternalSource(
            consumer, Subscriptions.assignment(new TopicPartition(topic, partition1)))
        .via(business())
        .to(Sink.ignore())
        .run(system);


consumer.tell(KafkaConsumerActor.stop(), self);

Accessing KafkaConsumer metrics

You can access the underlying consumer metrics via the materialized Control instance:

Scala
val control: Consumer.Control = Consumer
  .plainSource(consumerSettings, Subscriptions.assignment(new TopicPartition(topic, partition)))
  .via(businessFlow)
  .to(Sink.ignore)
  .run()


val metrics: Future[Map[MetricName, Metric]] = control.metrics
metrics.foreach(map => println(s"metrics: ${map.mkString("\n")}"))
Java
// run the stream to obtain the materialized Control value
Consumer.DrainingControl<Done> control =
    Consumer.plainSource(
            consumerSettings, Subscriptions.assignment(new TopicPartition(topic, 0)))
        .via(business())
        .toMat(Sink.ignore(), Consumer::createDrainingControl)
        .run(system);

CompletionStage<Map<MetricName, Metric>> metrics = control.getMetrics();
metrics.thenAccept(map -> System.out.println("Metrics: " + map));

Accessing KafkaConsumer metadata

Kafka consumer metadata can be accessed as described in Consumer Metadata.

Controlled shutdown

The Source created with Consumer.plainSource and similar methods materializes to a Consumer.Control instance. This can be used to stop the stream in a controlled manner.

When using external offset storage, a call to Consumer.Control.shutdown() suffices to complete the Source, which starts the completion of the stream.

Scala
val (consumerControl, streamComplete) =
  Consumer
    .plainSource(consumerSettings,
      Subscriptions.assignmentWithOffset(
        new TopicPartition(topic, 0) -> offset))
    .via(businessFlow)
    .toMat(Sink.ignore)(Keep.both)
    .run()

consumerControl.shutdown()
Java
final OffsetStorage db = new OffsetStorage();

CompletionStage<Consumer.DrainingControl<Done>> control =
    db.loadOffset()
        .thenApply(
            fromOffset ->
                Consumer.plainSource(
                        consumerSettings,
                        Subscriptions.assignmentWithOffset(
                            new TopicPartition(topic, 0), fromOffset))
                    .mapAsync(
                        10,
                        record ->
                            business(record.key(), record.value())
                                .thenApply(res -> db.storeProcessedOffset(record.offset())))
                    .toMat(Sink.ignore(), Consumer::createDrainingControl)
                    .run(system));

// Shutdown the consumer when desired
control.thenAccept(c -> c.drainAndShutdown(executor));

When you are using offset storage in Kafka, the shutdown process involves several steps:

  1. Consumer.Control.stop() to stop producing messages from the Source. This does not stop the underlying Kafka Consumer.
  2. Wait for the stream to complete, so that a commit request has been made for all offsets of all processed messages (via Committer.sink/flow, commitScaladsl() or commitJavadsl()).
  3. Consumer.Control.shutdown() to wait for all outstanding commit requests to finish and stop the Kafka Consumer.

Draining control

To manage this shutdown process, use the Consumer.DrainingControl by combining the Consumer.Control with the sink’s materialized completion future in toMat or in mapMaterializedValue with DrainingControl.apply (Scala) or Consumer::createDrainingControl (Java). That control offers the method drainAndShutdown which implements the process described above. The wrapped stream completion signal is available through the streamCompletion accessor (streamCompletion() in Java).

Note: The ConsumerSettings stop-timeout delays stopping the Kafka Consumer and the stream, but when using drainAndShutdown that delay is not required and can be set to zero (as below).

Scala
val drainingControl =
  Consumer
    .committableSource(consumerSettings.withStopTimeout(Duration.Zero), Subscriptions.topics(topic))
    .mapAsync(1) { msg =>
      business(msg.record).map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
    .run()

val streamComplete = drainingControl.drainAndShutdown()
Java
final Executor ec = Executors.newCachedThreadPool();

Consumer.DrainingControl<Done> control =
    Consumer.committableSource(
            consumerSettings.withStopTimeout(Duration.ZERO), Subscriptions.topics(topic))
        .mapAsync(
            1,
            msg ->
                business(msg.record().key(), msg.record().value())
                    .thenApply(done -> msg.committableOffset()))
        .toMat(
            Committer.sink(committerSettings.withMaxBatch(1)), Consumer::createDrainingControl)
        .run(system);

control.drainAndShutdown(ec);