Transactions

Kafka Transactions provide guarantees that messages in a consume-transform-produce workflow (consumed from a source topic, transformed, and produced to a destination topic) are processed exactly once or not at all. This is achieved through coordination between the Kafka consumer group coordinator, transaction coordinator, and the consumer and producer clients used in the user application. The Kafka producer marks messages that are consumed from the source topic as “committed” only once the transformed messages are successfully produced to the sink.

For full details on how transactions are achieved in Kafka you may wish to review the Kafka Improvement Proposal KIP-98: Exactly Once Delivery and Transactional Messaging and its associated design document.

Transactional Source

The Transactional.source emits a ConsumerMessage.TransactionalMessage which contains topic, partition, and offset information required by the producer during the commit process. Unlike with ConsumerMessage.CommittableMessage, the user is not responsible for committing transactions; this is handled by a Transactional.flow or Transactional.sink.

This source overrides the Kafka consumer property isolation.level to read_committed, so that only committed messages can be consumed.

A consumer group ID must be provided.

Only use this source if you intend to connect it to a Transactional.flow or Transactional.sink.
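
For example, here is a minimal sketch of consumer settings that could back a Transactional.source; the bootstrap server, group id, and topic name are placeholders:

Scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.{ ConsumerSettings, Subscriptions }
import org.apache.pekko.kafka.scaladsl.Transactional

implicit val system: ActorSystem = ActorSystem("transactions")

// group.id is mandatory for Transactional.source; isolation.level is
// overridden to read_committed by the source itself.
val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("transactional-workflow-group")

val transactionalSource =
  Transactional.source(consumerSettings, Subscriptions.topics("source-topic"))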

Transactional Sink and Flow

The Transactional.sink is similar to the Producer.committableSink in that messages will be automatically committed as part of a transaction. A Transactional.flow or Transactional.sink is required when connecting a consumer to a producer to achieve a transactional workflow.

They override producer properties enable.idempotence to true and max.in.flight.requests.per.connection to 1 as required by the Kafka producer to enable transactions.

A transactional.id must be defined and unique for each instance of the application.
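For example, here is a minimal sketch of producer settings and a sink; deriving the transactional.id from an INSTANCE_ID environment variable is only one illustrative way of keeping it unique per application instance:

Scala
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.ProducerSettings
import org.apache.pekko.kafka.scaladsl.Transactional

implicit val system: ActorSystem = ActorSystem("transactions")

val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

// transactional.id must be unique per running application instance.
val transactionalId = s"my-transactional-app-${sys.env.getOrElse("INSTANCE_ID", "0")}"

val transactionalSink = Transactional.sink(producerSettings, transactionalId)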

Consume-Transform-Produce Workflow

Kafka transactions are handled transparently to the user. The Transactional.source will enforce that a consumer group id is specified and the Transactional.flow or Transactional.sink will enforce that a transactional.id is specified. All other Kafka consumer and producer properties required to enable transactions are overridden.

Transactions are committed on an interval which can be controlled with the producer config pekko.kafka.producer.eos-commit-interval, similar to how exactly-once works with Kafka Streams. The default value is 100ms. The larger the commit interval, the more records will need to be reprocessed in the event of a failure that aborts the transaction.
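
As a sketch, and assuming ProducerSettings.withEosCommitInterval is available in your version, the interval can also be set programmatically instead of through configuration:

Scala
import scala.concurrent.duration._
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.ProducerSettings

implicit val system: ActorSystem = ActorSystem("transactions")

// A longer interval batches more records per transaction, at the cost of
// more reprocessing if a transaction is aborted.
val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")
    .withEosCommitInterval(250.millis)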

When the stream is materialized the producer will initialize the transaction for the provided transactional.id and a transaction will begin. Every commit interval (eos-commit-interval) we check if there are any offsets available to commit. If offsets exist then we suspend backpressured demand while we drain all outstanding messages that have not yet been successfully acknowledged (if any) and then commit the transaction. After the commit succeeds a new transaction is begun and we re-initialize demand for upstream messages.

Messages are also drained from the stream when the consumer gets a rebalance of partitions. In that case, the consumer will wait in the onPartitionsRevoked callback until all of the messages have been drained from the stream and the transaction is committed before allowing the rebalance to continue. The total time the consumer will wait for draining is controlled by the pekko.kafka.consumer.commit-timeout setting, and the interval between checks is controlled by pekko.kafka.consumer.eos-draining-check-interval.
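
These settings can be overridden in configuration; as a sketch, here they are overlaid on the defaults before building the ConsumerSettings (the values shown are purely illustrative):

Scala
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.ConsumerSettings

implicit val system: ActorSystem = ActorSystem("transactions")

// Overlay draining-related settings on the reference.conf defaults.
val consumerConfig = ConfigFactory
  .parseString("""
    commit-timeout = 20s
    eos-draining-check-interval = 50ms
  """)
  .withFallback(system.settings.config.getConfig("pekko.kafka.consumer"))

val consumerSettings =
  ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("transactional-workflow-group")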

To gracefully shut down the stream and commit the current transaction you must call shutdown() on the Consumer.Control materialized value to await all produced message acknowledgements and commit the final transaction.

Simple Example

Scala
val control =
  Transactional
    .source(consumerSettings, Subscriptions.topics(sourceTopic))
    .via(businessFlow)
    .map { msg =>
      ProducerMessage.single(new ProducerRecord(sinkTopic, msg.record.key, msg.record.value), msg.partitionOffset)
    }
    .toMat(Transactional.sink(producerSettings, transactionalId))(DrainingControl.apply)
    .run()

// ...

control.drainAndShutdown()
Java
Consumer.DrainingControl<Done> control =
    Transactional.source(consumerSettings, Subscriptions.topics(sourceTopic))
        .via(business())
        .map(
            msg ->
                ProducerMessage.single(
                    new ProducerRecord<>(targetTopic, msg.record().key(), msg.record().value()),
                    msg.partitionOffset()))
        .toMat(
            Transactional.sink(producerSettings, transactionalId),
            Consumer::createDrainingControl)
        .run(system);

// ...

control.drainAndShutdown(ec);

Recovery From Failure

When any stage in the stream fails the whole stream will be torn down. In the general case it’s desirable to allow transient errors to fail the whole stream because they cannot be recovered from within the application. Transient errors can be caused by network partitions, Kafka broker failures, ProducerFencedExceptions from other application instances, and so on. When the stream encounters a transient error, the current transaction is aborted before the stream is torn down. Any produced messages that were not committed will not be available to downstream consumers as long as those consumers are configured with isolation.level = read_committed.

For transient errors we can choose to rely on the Kafka producer’s configuration to retry, or we can handle them ourselves at the Apache Pekko Streams or application layer. Using a RestartSource we can back off connection attempts so that we don’t hammer the Kafka cluster in a tight loop.

Scala
val innerControl = new AtomicReference[Control](Consumer.NoopControl)

val stream = RestartSource.onFailuresWithBackoff(
  RestartSettings(
    minBackoff = 1.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2)) { () =>
  Transactional
    .source(consumerSettings, Subscriptions.topics(sourceTopic))
    .via(businessFlow)
    .map { msg =>
      ProducerMessage.single(new ProducerRecord(sinkTopic, msg.record.key, msg.record.value), msg.partitionOffset)
    }
    // side effect out the `Control` materialized value because it can't be propagated through the `RestartSource`
    .mapMaterializedValue(c => innerControl.set(c))
    .via(Transactional.flow(producerSettings, transactionalId))
}

stream.runWith(Sink.ignore)

// Add shutdown hook to respond to SIGTERM and gracefully shutdown stream
sys.ShutdownHookThread {
  Await.result(innerControl.get.shutdown(), 10.seconds)
}
Java
AtomicReference<Consumer.Control> innerControl =
    new AtomicReference<>(Consumer.createNoopControl());

Source<ProducerMessage.Results<String, String, ConsumerMessage.PartitionOffset>, NotUsed>
    stream =
        RestartSource.onFailuresWithBackoff(
            RestartSettings.create(
                java.time.Duration.ofSeconds(3), // min backoff
                java.time.Duration.ofSeconds(30), // max backoff
                0.2), // adds 20% "noise" to vary the intervals slightly
            () ->
                Transactional.source(consumerSettings, Subscriptions.topics(sourceTopic))
                    .via(business())
                    .map(
                        msg ->
                            ProducerMessage.single(
                                new ProducerRecord<>(
                                    targetTopic, msg.record().key(), msg.record().value()),
                                msg.partitionOffset()))
                    // side effect out the `Control` materialized value because it can't be
                    // propagated through the `RestartSource`
                    .mapMaterializedValue(
                        control -> {
                          innerControl.set(control);
                          return control;
                        })
                    .via(Transactional.flow(producerSettings, transactionalId)));

CompletionStage<Done> streamCompletion = stream.runWith(Sink.ignore(), system);

// Add shutdown hook to respond to SIGTERM and gracefully shutdown stream
Runtime.getRuntime().addShutdownHook(new Thread(() -> innerControl.get().shutdown()));

Caveats

There are several scenarios that this library’s implementation of Kafka transactions does not automatically account for.

All of the scenarios covered in the At-Least-Once Delivery documentation (Multiple Effects per Commit, Non-Sequential Processing, and Conditional Message Processing) are applicable to transactional scenarios as well.

Only one application instance per transactional.id is allowed. If two application instances with the same transactional.id are run at the same time then the instance that registers with Kafka’s transaction coordinator second will throw a ProducerFencedException so it doesn’t interfere with transactions in progress by the first instance. To distribute multiple transactional workflows for the same subscription the user must manually subdivide the subscription across multiple instances of the application. This may be handled internally in future versions.
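
One way to subdivide the subscription, sketched here, is to give each instance a fixed partition range via Subscriptions.assignment together with its own transactional.id; the INSTANCE_ID variable and partition arithmetic below are only illustrative assumptions about how instances are deployed:

Scala
import org.apache.kafka.common.TopicPartition
import org.apache.pekko.kafka.Subscriptions

// Hypothetical: each deployed instance is given its own partition range
// and a matching transactional.id, e.g. via an environment variable.
val instanceId = sys.env.getOrElse("INSTANCE_ID", "0").toInt
val partitionsPerInstance = 2

val assignedPartitions = (0 until partitionsPerInstance)
  .map(i => new TopicPartition("source-topic", instanceId * partitionsPerInstance + i))
  .toSet

val subscription = Subscriptions.assignment(assignedPartitions)
val transactionalId = s"my-transactional-app-$instanceId"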

Any state in the transformation logic is not part of a transaction. It’s left to the user to rebuild state when applying stateful operations with transactions. It’s possible to encode state into messages produced to topics during a transaction. For example, you could produce messages to a topic that represents an event log as part of a transaction. This event log can be replayed to reconstitute the correct state before the stateful stream resumes consuming again at startup.
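
A sketch of this pattern using ProducerMessage.multi, so that the result and a state event are produced in the same transaction; transform, stateChangeFor, and the topic names are hypothetical, and consumerSettings, producerSettings, and transactionalId are assumed to be defined as in the earlier examples:

Scala
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.pekko.kafka.{ ProducerMessage, Subscriptions }
import org.apache.pekko.kafka.scaladsl.Transactional

// Placeholder business logic and state-event encoding.
def transform(value: String): String = value.toUpperCase
def stateChangeFor(result: String): String = s"""{"result":"$result"}"""

// Both records are produced within the same transaction, so the
// "state-events" log only contains events for committed results.
Transactional
  .source(consumerSettings, Subscriptions.topics("source-topic"))
  .map { msg =>
    val result = transform(msg.record.value)
    ProducerMessage.multi(
      List(
        new ProducerRecord("sink-topic", msg.record.key, result),
        new ProducerRecord("state-events", msg.record.key, stateChangeFor(result))),
      msg.partitionOffset)
  }
  .runWith(Transactional.sink(producerSettings, transactionalId))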

Any side effects that occur in the transformation logic are not part of a transaction (e.g. writes to a database).

The exactly-once semantics are guaranteed only when your flow consumes from and produces to the same Kafka cluster. Producing to partitions from a 3rd-party source or consuming partitions from one Kafka cluster and producing to another Kafka cluster is not supported.

Further Reading

For more information on exactly once and transactions in Kafka please consult the following resources.