At-Least-Once Delivery

At-least-once delivery, the guarantee that every message is processed at least once, is a basic requirement of most applications.

When using committable sources (Offset Storage in Kafka), care is needed to ensure at-least-once delivery semantics are not lost inadvertently by committing an offset too early.

Below are some scenarios where this risk is present; they can easily be overlooked. Problems may also go undetected during tests, since they only surface when the flow is abruptly interrupted in a particular, possibly unlikely, state.

Multiple Effects per Commit

Multiple Messages

When connecting a committable source to a producer flow, some applications may require each consumed message to produce more than one message. In that case, in order to preserve at-least-once semantics, the message offset should only be committed after all associated messages have been produced.

To achieve this, use the ProducerMessage.MultiMessage implementation of ProducerMessage.Envelope:

Scala
import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.kafka.ConsumerMessage.{ CommittableOffset, CommittableOffsetBatch }
import pekko.kafka.ProducerMessage.Envelope
import pekko.kafka.scaladsl.Consumer.DrainingControl
import pekko.kafka.{ ProducerMessage, Subscriptions }
import pekko.kafka.scaladsl.{ Committer, Consumer, Producer }
import pekko.kafka.testkit.scaladsl.TestcontainersKafkaLike
import pekko.stream.scaladsl.{ Keep, Sink }
import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import org.apache.kafka.clients.producer.ProducerRecord

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic1))
  .map(msg =>
    ProducerMessage.multi(
      immutable.Seq(
        new ProducerRecord(topic2, msg.record.key, msg.record.value),
        new ProducerRecord(topic3, msg.record.key, msg.record.value)),
      msg.committableOffset))
  .via(Producer.flexiFlow(producerSettings))
  .map(_.passThrough)
  .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
  .run()
Java
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic1))
    .map(
        msg -> {
          Envelope<String, String, CommittableOffset> multiMsg =
              ProducerMessage.multi(
                  Arrays.asList(
                      new ProducerRecord<>(topic2, msg.record().value()),
                      new ProducerRecord<>(topic3, msg.record().value())),
                  msg.committableOffset());
          return multiMsg;
        })
    .via(Producer.flexiFlow(producerSettings))
    .map(m -> m.passThrough())
    .toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
    .run(system);

Batches

If committable messages are processed in batches (using batch or grouped), it is also important to commit the resulting ConsumerMessage.CommittableOffsetBatch only after all messages in the batch are fully processed.

Should the batch need to be split up again, using mapConcat, care should be taken to associate the ConsumerMessage.CommittableOffsetBatch only with the last message. This scenario could occur if we created batches to more efficiently update a database and then needed to split up the batches to send individual messages to a Kafka producer flow.
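
As a minimal sketch of committing only after a whole group has been processed, re-using the imports and settings from the snippets above, assume a hypothetical writeBatchToDatabase function that persists a group of records and returns a Future, with an implicit ExecutionContext in scope; the offsets of the group are combined into a ConsumerMessage.CommittableOffsetBatch and emitted for committing only once the write completes:

Scala
Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic1))
  .groupedWithin(100, 1.second)
  .mapAsync(1) { batch =>
    // Fold the offsets of the whole group into a single committable batch.
    val offsets = batch.map(_.committableOffset).foldLeft(CommittableOffsetBatch.empty)(_.updated(_))
    // writeBatchToDatabase is hypothetical; the offset batch is emitted, and
    // therefore committed downstream, only after the entire group is written.
    writeBatchToDatabase(batch.map(_.record.value)).map(_ => offsets)
  }
  .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
  .run()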

Multiple Destinations

In the Conditional Message Processing section below we discuss how to handle producing to multiple Kafka topics, but here we consider writing to other types of persistent storage, or performing other side-effects, perhaps in addition to producing to Kafka topics.

To commit an offset or an offset batch only after the multiple effects have been performed, we will usually want to assemble our side-effecting flows in series, one after the other. This still allows the side effects to be performed concurrently on distinct messages, using mapAsync for example.
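
For illustration, here is a minimal sketch of such a serial arrangement, re-using the settings from the snippets above and assuming two hypothetical side-effecting functions, writeToDatabase and notifyService, that each return a Future (an implicit ExecutionContext is in scope):

Scala
Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic1))
  // First effect: runs concurrently on up to 4 messages, but emits in order.
  .mapAsync(4)(msg => writeToDatabase(msg.record.value).map(_ => msg))
  // Second effect: only once it completes is the offset passed on to be committed.
  .mapAsync(4)(msg => notifyService(msg.record.value).map(_ => msg.committableOffset))
  .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
  .run()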

Alternatively, we could split off the flow using alsoTo to perform the effects in distinct parallel flows. We would then use zip to bring the two flows back together and re-associate the matching committable offsets. This step is important to ensure that we only commit an offset once the effects from both flows are complete. This constrains the two flows to output the exact same sequence of committable offsets, so this approach may not be significantly more flexible than a serial arrangement.
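
The following is only a sketch of that shape, using a Broadcast stage in place of alsoTo so that the two branches can be zipped back together; dbFlow and cacheFlow are hypothetical flows that each perform their side effect and then emit the committable offset of the message they received:

Scala
import pekko.stream.FlowShape
import pekko.stream.scaladsl.{ Broadcast, Flow, GraphDSL, Zip }
import pekko.kafka.ConsumerMessage.CommittableMessage

// Broadcast each message to both side-effecting branches and zip their
// outputs, so an offset is only emitted once both effects have completed.
val parallelEffects =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[CommittableMessage[String, String]](2))
    val zip = builder.add(Zip[CommittableOffset, CommittableOffset]())
    broadcast.out(0) ~> dbFlow ~> zip.in0
    broadcast.out(1) ~> cacheFlow ~> zip.in1
    FlowShape(broadcast.in, zip.out)
  })

Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic1))
  .via(parallelEffects)
  // Both branches emit the same offsets in the same order, so either element
  // of the pair can be committed.
  .map { case (offset, _) => offset }
  .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
  .run()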

Non-Sequential Processing

Messages from committable sources should be processed in order, otherwise a larger offset for a partition could be committed before the work associated with a smaller offset has been completed.

Reordering would be acceptable if the original order was reconstituted before committing the offsets, but that is a fairly complex and possibly brittle process that we will not consider here.

Using mapAsync is safe since it preserves the order of messages. That is in contrast to mapAsyncUnordered, which would not be safe to use here. As indicated in the Pekko Streams documentation, almost all stages preserve input ordering.
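
As a small illustration, assuming a hypothetical processEvent function that returns a Future (and an implicit ExecutionContext in scope), mapAsync runs several calls concurrently yet emits results in the upstream order, so offsets reach the committer in order; swapping in mapAsyncUnordered would break that:

Scala
Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic1))
  // Up to 8 concurrent calls, but results are emitted in the original order.
  .mapAsync(8)(msg => processEvent(msg.record.value).map(_ => msg.committableOffset))
  .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
  .run()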

Using groupBy

Using groupBy followed at some point by a mergeSubstreams can reorder messages, so in general it is not safe with respect to the at-least-once guarantee.

However it can only lead to reordering between messages sent to different substreams, so it is possible to use groupBy and preserve at-least-once semantics as long as all messages from the same partition are sent to the same substream.

If a particular substream expects to see all messages regarding some entity, it then requires that writers to the source topic become responsible for placing messages about various entities in the appropriate partitions. If your application already has a requirement to preserve the order of messages about a particular entity within a Kafka topic, you will already need to ensure those messages go to the same partition since Kafka only preserves order information within a partition.
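
The following sketch illustrates that constraint by keying the substreams on the record's partition, so messages of the same partition are never reordered relative to each other; handleEntity is a hypothetical function returning a Future, and an implicit ExecutionContext is assumed in scope:

Scala
Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic1))
  // All messages of a given partition go to the same substream.
  .groupBy(64, msg => msg.record.partition)
  .mapAsync(1)(msg => handleEntity(msg.record.value).map(_ => msg.committableOffset))
  .mergeSubstreams
  .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
  .run()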

Conditional Message Processing

Most flows will require some messages to be handled differently from others. Unfortunately this is difficult to do while preserving the at-least-once guarantee because the order of messages must be maintained.

We cannot safely send the messages that require different handling off to a distinct flow: this other flow cannot commit on its own, and even if we merge it back into the main flow downstream, ordering will not be preserved. The reason the separate flow cannot commit on its own is that it only sees a subset of the committable messages; if it commits an offset, it cannot know that all prior offsets have been processed in the main flow.

This is a significant challenge. Below we suggest a few strategies to deal with some special cases of this general problem.

Publishing to Message-Dependent Topics

Since ProducerRecord contains the destination topic, it is possible to use a single producer flow to write to any number of topics. This preserves the ordering of messages coming from the committable source. Since the destination topics likely admit different types of messages, it will be necessary to serialize the messages to the appropriate input type for the common producer flow, which could be a byte array or a string.

In case a committable message should lead to the production of multiple messages, the ProducerMessage.MultiMessage is available. If no messages should be produced, the ProducerMessage.PassThroughMessage can be used.

Scala
Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic1))
  .map(msg => {
    val out: Envelope[String, String, CommittableOffset] =
      if (duplicate(msg.record.value))
        ProducerMessage.multi(
          immutable.Seq(
            new ProducerRecord(topic2, msg.record.key, msg.record.value),
            new ProducerRecord(topic3, msg.record.key, msg.record.value)),
          msg.committableOffset)
      else if (ignore(msg.record.value))
        ProducerMessage.passThrough(msg.committableOffset)
      else
        ProducerMessage.single(
          new ProducerRecord(topic4, msg.record.key, msg.record.value),
          msg.committableOffset)
    out
  })
  .via(Producer.flexiFlow(producerSettings))
  .map(_.passThrough)
  .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
  .run()
Java
import org.apache.pekko.Done;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.kafka.*;
import org.apache.pekko.kafka.ConsumerMessage.CommittableOffset;
import org.apache.pekko.kafka.ProducerMessage.Envelope;
import org.apache.pekko.kafka.javadsl.Committer;
import org.apache.pekko.kafka.javadsl.Consumer;
import org.apache.pekko.kafka.javadsl.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Consumer.committableSource(consumerSettings, Subscriptions.topics(topic1))
    .map(
        msg -> {
          final Envelope<String, String, CommittableOffset> produce;
          if (duplicate(msg.record().value())) {
            produce =
                ProducerMessage.multi(
                    Arrays.asList(
                        new ProducerRecord<>(topic2, msg.record().value()),
                        new ProducerRecord<>(topic3, msg.record().value())),
                    msg.committableOffset());
          } else if (ignore(msg.record().value())) {
            produce = ProducerMessage.passThrough(msg.committableOffset());
          } else {
            produce =
                ProducerMessage.single(
                    new ProducerRecord<>(topic4, msg.record().value()),
                    msg.committableOffset());
          }
          return produce;
        })
    .via(Producer.flexiFlow(producerSettings))
    .map(m -> m.passThrough())
    .toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
    .run(system);

Excluding Messages

Failure to deserialize a message is a particular case of conditional message processing. It is also likely that we would have no message to produce to Kafka when we encounter messages that fail to deserialize. As described above, the ProducerMessage.PassThroughMessage lets us pass the corresponding committable offset through the producer flow without producing anything to Kafka.

Why can’t we commit the offsets of bad messages as soon as we encounter them, instead of passing them downstream? Because the previous offsets, for messages that have deserialized successfully, may not have been committed yet. That is possible if the downstream flow includes a buffer or an asynchronous boundary, or performs batching. It is then likely that some previous messages would concurrently be making their way downstream to a final committing stage.

Note that here we assume that we take full control over the handling of messages that fail to deserialize. To do this, we should not ask for the deserialization to be performed by the committable source. We can instead create a ConsumerSettings parametrized by byte arrays. A subsequent map can deserialize and use ProducerMessage.PassThroughMessage to skip bad messages.
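
A minimal sketch of this arrangement follows, assuming an ActorSystem named system plus the producerSettings and committerSettings of the snippets above; bootstrapServers is a placeholder for the broker addresses, and parse is a hypothetical function returning an Option of the decoded value:

Scala
import pekko.kafka.ConsumerSettings
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Consume raw byte arrays so that deserialization is fully under our control.
val rawConsumerSettings =
  ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("group1")

Consumer
  .committableSource(rawConsumerSettings, Subscriptions.topics(topic1))
  .map { msg =>
    val out: Envelope[String, String, CommittableOffset] = parse(msg.record.value) match {
      case Some(value) =>
        ProducerMessage.single(new ProducerRecord[String, String](topic2, value), msg.committableOffset)
      case None =>
        // Failed to deserialize: produce nothing, but keep the offset so it is
        // still committed in order.
        ProducerMessage.passThrough(msg.committableOffset)
    }
    out
  }
  .via(Producer.flexiFlow(producerSettings))
  .map(_.passThrough)
  .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
  .run()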