Error handling

Failing consumer

Errors from the Kafka consumer will be forwarded to the Apache Pekko Connectors sources that use it, and the sources will fail their streams.

Lost connection to the Kafka broker

To fail an Apache Pekko Connectors Kafka consumer when the Kafka broker is not available, configure a Connection Checker via ConsumerSettings. If no Connection Checker is configured, Apache Pekko Connectors will continue to poll the broker indefinitely.
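A minimal sketch of enabling the checker through ConsumerSettings (the bootstrap server, group id, and retry values are illustrative; an ActorSystem named system is assumed in scope, and the ConnectionCheckerSettings factory is assumed to take max retries, check interval, and backoff factor):

Scala
import scala.concurrent.duration._
import org.apache.pekko.kafka.{ ConnectionCheckerSettings, ConsumerSettings }
import org.apache.kafka.common.serialization.StringDeserializer

// fail the consumer when the broker stays unreachable, instead of polling forever
val connectionChecker = ConnectionCheckerSettings(3, 15.seconds, 2.0) // max retries, check interval, backoff factor

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")
    .withConnectionChecker(connectionChecker)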

Failing producer

Retry handling for producers is built into Kafka. In case of a failure when sending a message, an exception will be thrown, which should fail the stream.
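Since those retries happen inside the underlying Kafka producer client, they are tuned through standard client properties on ProducerSettings. A sketch with illustrative values (an ActorSystem named system is assumed in scope):

Scala
import org.apache.pekko.kafka.ProducerSettings
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")
    // overall time budget for a send, including client-side retries
    .withProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000")
    // pause between retry attempts
    .withProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")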

Restarting the stream with a backoff stage

Apache Pekko Streams provides graph stages to gracefully restart a stream on failure, with a configurable backoff. This can be used to restart a failing stream and its consumer with an exponential backoff, by wrapping it in a RestartSource.

Scala
val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)

val restartSettings = RestartSettings(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2)
val streamCompletion = RestartSource
  .onFailuresWithBackoff(restartSettings) { () =>
    Consumer
      .plainSource(consumerSettings, Subscriptions.topics(topic))
      // this is a hack to get access to the Consumer.Control
      // instances of the latest Kafka Consumer source
      .mapMaterializedValue(c => control.set(c))
      .via(businessFlow)
  }
  .runWith(Sink.seq)

control.get().drainAndShutdown(streamCompletion)
Java
AtomicReference<Consumer.Control> control = new AtomicReference<>(Consumer.createNoopControl());

RestartSettings restartSettings =
    RestartSettings.create(Duration.ofSeconds(3), Duration.ofSeconds(30), 0.2);
CompletionStage<Done> streamCompletion =
    RestartSource.onFailuresWithBackoff(
            restartSettings,
            () ->
                Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
                    .mapMaterializedValue(
                        c -> {
                          // this is a hack to get access to the Consumer.Control
                          // instances of the latest Kafka Consumer source
                          control.set(c);
                          return c;
                        })
                    .via(business()))
        .runWith(Sink.ignore(), system);

control.get().drainAndShutdown(streamCompletion, system.getDispatcher());

When a stream fails, library internals will clean up all underlying resources.

(de)serialization

If a failure when reading from Kafka is caused by other reasons, like deserialization problems, the stage will fail immediately. If you expect such cases, consider consuming raw byte arrays and deserializing in a subsequent map stage, where you can use supervision to skip failed elements. See also the Serialization and “At least once” pages for more suggestions.
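A sketch of that pattern, consuming raw byte arrays and resuming past records that fail to deserialize (parse and businessSink are hypothetical, and an ActorSystem named system is assumed in scope):

Scala
import org.apache.pekko.kafka.{ ConsumerSettings, Subscriptions }
import org.apache.pekko.kafka.scaladsl.Consumer
import org.apache.pekko.stream.{ ActorAttributes, Supervision }
import org.apache.kafka.common.serialization.ByteArrayDeserializer

val byteSettings =
  ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")

// resume (i.e. drop the element) on deserialization errors, stop on anything else
val resumingDecider: Supervision.Decider = {
  case _: java.io.IOException => Supervision.Resume
  case _                      => Supervision.Stop
}

Consumer
  .plainSource(byteSettings, Subscriptions.topics(topic))
  .map(record => parse(record.value())) // parse: Array[Byte] => T, hypothetical
  .withAttributes(ActorAttributes.supervisionStrategy(resumingDecider))
  .runWith(businessSink)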

Unexpected consumer offset reset

Sometimes, due to various Kafka server bugs (see below), the consumer will fail to fetch an offset that exists. In this case, the consumer has three approaches to handling the missing offset, mirroring the Kafka client’s auto.offset.reset setting (see the sketch after this list):

  • No action: consumer fails and stops attempting to make progress.
  • Latest: consumer skips to the end of the partition and starts reading from there.
    • NOTE: consumer can skip processing some data in the topic by going to the latest data
  • Earliest: consumer restarts from the beginning of the partition.
    • NOTE: consumer will never skip data, but may reprocess many days of data, up to the topic’s configured retention
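These behaviours are selected through the standard Kafka client configuration, which can be set via ConsumerSettings. A sketch with illustrative values (an ActorSystem named system is assumed in scope):

Scala
import org.apache.pekko.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

// "none" fails the consumer, "latest" skips to the end, "earliest" rewinds
val settings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")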

Apache Pekko Connectors Kafka cannot do anything for the first two approaches. However, the offset-reset-protection configuration in the ConsumerSettings can help detect the inadvertent loss of offsets and subsequent reset. You can configure pekko.kafka.consumer.offset-reset-protection.offset-threshold to a number of offsets back from the latest requested offset that would indicate one of these reset bugs has occurred. Similarly, setting pekko.kafka.consumer.offset-reset-protection.time-threshold will reset the consumer back to the latest committed offset when a record is older than now - time-threshold; that is, time-threshold older than the last received offset.

When the client notices that the offset from the next fetched batch is outside the threshold for a given partition, the consumer will be re-seeked back to the latest committed offset, the last known ‘safe’ point in the data. The consumer will then start consuming from that offset forward. This can avoid a significant amount of wasted data processing and keep your consumers’ progress moving forward (and avoid being paged for high consumer lag).

For example, let’s assume there is a consumer that has committed offset 1000 on partition 1. The consumer is doing work in batches, so it doesn’t commit and fetch every record, and it now attempts to fetch offset 1100. However, due to a server-side bug, Kafka returns offset 1 for partition 1. Without consumer reset protection, the consumer would then need to reprocess all the offsets from 1 to 1000 (this can often look like a consumer “rewinding”). With consumer reset protection enabled with a threshold of, for example, 200, the consumer would notice the reset and fetch again from the latest committed offset. That means the consumer would only need to process 100 messages to catch up, a 10x improvement over the 1000 messages it would have had to process without offset-reset-protection enabled.

By default, consumer reset protection is off. To enable it, set pekko.kafka.consumer.offset-reset-protection.enable = true and configure at least one of the thresholds, as sketched below.
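A sketch of the relevant application.conf section; only the key names come from this page, the values are illustrative:

pekko.kafka.consumer.offset-reset-protection {
  # off by default
  enable = true
  # a fetch this many offsets behind the latest requested offset
  # is treated as a broker-side reset
  offset-threshold = 50000
  # and/or: a record this much older than the last received record
  # is treated as a broker-side reset
  time-threshold = 2 hours
}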

Internally, the consumer attempts to avoid too much overhead in checking each batch, so it verifies only that the first and the last offset in each received batch for each partition are within the threshold. This should have a minimal impact on consumer performance, but as always, be sure to benchmark for your use case before enabling.
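Conceptually, the per-partition check amounts to something like the following sketch (not the library’s actual implementation; all names are illustrative):

Scala
// a batch looks like a reset when its first or last offset falls more than
// `offsetThreshold` behind the offset the consumer actually requested
def outsideThreshold(requestedOffset: Long, batchOffset: Long, offsetThreshold: Long): Boolean =
  requestedOffset - batchOffset > offsetThreshold

def batchLooksReset(requestedOffset: Long, first: Long, last: Long, offsetThreshold: Long): Boolean =
  outsideThreshold(requestedOffset, first, offsetThreshold) ||
    outsideThreshold(requestedOffset, last, offsetThreshold)

// on detection, the consumer seeks back to the last committed offset:
// consumer.seek(partition, lastCommittedOffset)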

Setting offset threshold appropriately

Generally speaking, offsets should only be used to define reset thresholds when consuming records whose timestamps are -1 (often only seen with old, 0.11 consumers); otherwise, prefer the time-threshold consumer reset protection configuration.

If you set the offset threshold to less than the frequency at which you commit, any reset protection that takes place will likely cause you to reprocess more data than necessary. For example, assume you commit every 1000 records but have offset-reset-protection.offset-threshold set to 500 records; a reset could then cause you to reprocess up to 999 records in the worst case.

You should set the offset threshold near the number of offsets covered by the topic’s configured retention. Alternatively, you can (a) increase the frequency with which you commit or (b) increase the offset protection window.

Kafka broker offset reset issues