Producer

A producer publishes messages to Kafka topics. The message itself carries the information about which topic and partition to publish to, so the same producer can publish to different topics.
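
For example, a ProducerRecord can name just the topic, or the topic plus an explicit partition (a minimal sketch using the Kafka client API; topic names are placeholders):

Scala
import org.apache.kafka.clients.producer.ProducerRecord

// topic only: Kafka's partitioner chooses the partition (by key, or by its default strategy without a key)
val toTopic = new ProducerRecord[String, String]("topicA", "key", "value")

// explicit partition: the record is sent to partition 0 of "topicB"
val toPartition = new ProducerRecord[String, String]("topicB", 0, "key", "value")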

The underlying implementation uses the KafkaProducer; see the KafkaProducer API for details.

Choosing a producer

Apache Pekko Connectors Kafka offers producer flows and sinks that connect to Kafka and write data. The tables below may help you to find the producer best suited for your use-case.

For use-cases that don’t benefit from Apache Pekko Streams, the Send Producer offers a Future-based (Scala) or CompletionStage-based (Java) send API.
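
As a rough sketch of what that looks like in Scala (assuming an implicit ActorSystem and producerSettings and topic defined as in the examples below; see the Send Producer page for the authoritative API):

Scala
import org.apache.pekko.kafka.scaladsl.SendProducer
import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
import scala.concurrent.Future

val sendProducer = SendProducer(producerSettings)
// send a single record and receive the RecordMetadata as a Future
val result: Future[RecordMetadata] =
  sendProducer.send(new ProducerRecord(topic, "key", "value"))
// close the producer when it is no longer needed
sendProducer.close()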

Producers

These factory methods are part of the Producer API.

Factory method   | May use shared producer | Stream element type | Pass-through | Context
plainSink        | Yes                     | ProducerRecord      | N/A          | N/A
flexiFlow        | Yes                     | Envelope            | Any          | N/A
flowWithContext  | Yes                     | Envelope            | No           | Any
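
For example, flowWithContext pairs with Consumer.sourceWithOffsetContext and Committer.sinkWithOffsetContext so that offsets travel in the stream context (a sketch assuming consumer and committer settings are in scope; topic names are placeholders):

Scala
import org.apache.pekko.kafka.{ ProducerMessage, Subscriptions }
import org.apache.pekko.kafka.scaladsl.{ Committer, Consumer, Producer }
import org.apache.pekko.kafka.scaladsl.Consumer.DrainingControl
import org.apache.kafka.clients.producer.ProducerRecord

val control =
  Consumer
    .sourceWithOffsetContext(consumerSettings, Subscriptions.topics("source-topic"))
    // the committable offset travels as context, so the envelope needs no pass-through
    .map(record => ProducerMessage.single(new ProducerRecord("target-topic", record.key, record.value)))
    .via(Producer.flowWithContext(producerSettings))
    .toMat(Committer.sinkWithOffsetContext(committerSettings))(DrainingControl.apply)
    .run()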

Committing producer sinks

These producers produce messages to Kafka and commit the offsets of incoming messages regularly.

Factory method                    | May use shared producer | Stream element type | Pass-through | Context
committableSink                   | Yes                     | Envelope            | Committable  | N/A
committableSinkWithOffsetContext  | Yes                     | Envelope            | Any          | Committable

For details on batched committing, see Consumer: Offset Storage in Kafka - committing.

Transactional producers

These factory methods are part of the Transactional API. For details see Transactions. Apache Pekko Connectors Kafka must manage the producer when using transactions.

Factory method         | May use shared producer | Stream element type | Pass-through
sink                   | No                      | Envelope            | N/A
flow                   | No                      | Envelope            | No
sinkWithOffsetContext  | No                      | Envelope            | N/A
flowWithOffsetContext  | No                      | Envelope            | No
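
As a sketch of how the transactional flow fits together (see Transactions for the authoritative examples; topic names and the transactional id are placeholders):

Scala
import org.apache.pekko.kafka.{ ProducerMessage, Subscriptions }
import org.apache.pekko.kafka.scaladsl.Consumer.DrainingControl
import org.apache.pekko.kafka.scaladsl.Transactional
import org.apache.kafka.clients.producer.ProducerRecord

val control =
  Transactional
    .source(consumerSettings, Subscriptions.topics("source-topic"))
    .map { msg =>
      // the PartitionOffset pass-through lets the offset be committed within the same transaction
      ProducerMessage.single(
        new ProducerRecord("sink-topic", msg.record.key, msg.record.value),
        msg.partitionOffset)
    }
    .toMat(Transactional.sink(producerSettings, "transactional-id"))(DrainingControl.apply)
    .run()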

Settings

When creating a producer stream, you need to pass in ProducerSettings that define things like:

  • bootstrap servers of the Kafka cluster (see Service discovery to defer the server configuration)
  • serializers for the keys and values
  • tuning parameters
Scala
val config = system.settings.config.getConfig("pekko.kafka.producer")
val producerSettings =
  ProducerSettings(config, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)
Java
final Config config = system.settings().config().getConfig("pekko.kafka.producer");
final ProducerSettings<String, String> producerSettings =
    ProducerSettings.create(config, new StringSerializer(), new StringSerializer())
        .withBootstrapServers("localhost:9092");

In addition to programmatic construction, ProducerSettings can also be created from configuration (application.conf).

When creating ProducerSettings with a classic or typed ActorSystem, it uses the config section pekko.kafka.producer. The format of these settings files is described in the Typesafe Config Documentation.

# Properties for org.apache.pekko.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout.
pekko.kafka.producer {
  # Config path of Apache Pekko Discovery method
  # "pekko.discovery" to use the Apache Pekko Discovery method configured for the ActorSystem
  discovery-method = pekko.discovery

  # Set a service name for use with Apache Pekko Discovery
  # https://pekko.apache.org/docs/pekko-connectors-kafka/current/discovery.html
  service-name = ""

  # Timeout for getting a reply from the discovery-method lookup
  resolve-timeout = 3 seconds

  # Tuning parameter of how many sends that can run in parallel.
  # In 2.0.0: changed the default from 100 to 10000
  parallelism = 10000

  # Duration to wait for `KafkaProducer.close` to finish.
  close-timeout = 60s

  # Call `KafkaProducer.close` when the stream is shutdown. This is important to override to false
  # when the producer instance is shared across multiple producer stages.
  close-on-producer-stop = true

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the producer stages. Some blocking may occur.
  # When this value is empty, the dispatcher configured for the stream
  # will be used.
  use-dispatcher = "pekko.kafka.default-dispatcher"

  # The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`
  # for exactly-once-semantics processing.
  eos-commit-interval = 100ms

  # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
  # can be defined in this configuration section.
  kafka-clients {
  }
}

ProducerSettings can also be created from any other Config section with the same layout as above.
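
For example, a custom section (the name our-kafka-producer below is only an example) can inherit the defaults and be used to build the settings:

Scala
// In application.conf (hypothetical section name):
// our-kafka-producer: ${pekko.kafka.producer} {
//   kafka-clients {
//     client.id = "our-producer"
//   }
// }
val customSettings =
  ProducerSettings(
    system.settings.config.getConfig("our-kafka-producer"),
    new StringSerializer,
    new StringSerializer)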

See Kafka’s KafkaProducer and ProducerConfig for more details regarding settings.
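
Individual ProducerConfig properties may also be set programmatically on the settings; for example (a sketch, the chosen properties are illustrative only):

Scala
import org.apache.kafka.clients.producer.ProducerConfig

// each withProperty entry corresponds to an entry in the kafka-clients config section
val tunedSettings = producerSettings
  .withProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
  .withProperty(ProducerConfig.LINGER_MS_CONFIG, "5")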

Producer as a Sink

Producer.plainSink is the easiest way to publish messages. The sink consumes the Kafka type ProducerRecord, which contains

  1. a topic name to which the record is being sent,
  2. an optional partition number,
  3. an optional key, and
  4. a value.
Scala
val done: Future[Done] =
  Source(1 to 100)
    .map(_.toString)
    .map(value => new ProducerRecord[String, String](topic, value))
    .runWith(Producer.plainSink(producerSettings))
The materialized value of the sink is a Future[Done] which is completed with Done when the stream completes, or with an exception in case an error occurs.
Java
CompletionStage<Done> done =
    Source.range(1, 100)
        .map(number -> number.toString())
        .map(value -> new ProducerRecord<String, String>(topic, value))
        .runWith(Producer.plainSink(producerSettings), system);
The materialized value of the sink is a CompletionStage<Done> which is completed with Done when the stream completes, or with an exception in case an error occurs.

Producing messages

Sinks and flows accept implementations of ProducerMessage.Envelope as input. They contain an extra field to pass through data, the so-called passThrough. Its value is passed through the flow and becomes available in the ProducerMessage.Results' passThrough(). It can for example hold a CommittableOffset or ConsumerMessage.CommittableOffsetBatch from a Consumer.committableSource that can be committed after publishing to Kafka.

Produce a single message to Kafka

To produce a single message to a Kafka topic, use the ProducerMessage.Message type as in

Scala
val single: ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] =
  ProducerMessage.single(
    new ProducerRecord("topicName", key, value),
    passThrough)
Java
ProducerMessage.Envelope<KeyType, ValueType, PassThroughType> message =
    ProducerMessage.single(new ProducerRecord<>("topicName", key, value), passThrough);

For flows, the ProducerMessage.Messages continue as Result elements containing:

  1. the original input message,
  2. the record metadata (Kafka RecordMetadata API), and
  3. access to the passThrough within the message.

Let one stream element produce multiple messages to Kafka

The ProducerMessage.MultiMessage contains a list of ProducerRecords to produce multiple messages to Kafka topics.

Scala
val multi: ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] =
  ProducerMessage.multi(
    immutable.Seq(
      new ProducerRecord("topicName", key, value),
      new ProducerRecord("anotherTopic", key, value)),
    passThrough)
Java
ProducerMessage.Envelope<KeyType, ValueType, PassThroughType> multiMessage =
    ProducerMessage.multi(
        Arrays.asList(
            new ProducerRecord<>("topicName", key, value),
            new ProducerRecord<>("anotherTopic", key, value)),
        passThrough);

For flows, the ProducerMessage.MultiMessages continue as MultiResult elements containing:

  1. a list of ProducerMessage.MultiResultPart with
    1. the original input message,
    2. the record metadata (Kafka RecordMetadata API), and
  2. the passThrough data.

Let a stream element pass through, without producing a message to Kafka

The ProducerMessage.PassThroughMessage allows an element to pass through a Kafka flow without producing a new message to a Kafka topic. This is primarily useful with Kafka commit offsets and transactions, so that these can be committed without producing new messages.

Scala
val ptm: ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] =
  ProducerMessage.passThrough(
    passThrough)
Java
ProducerMessage.Envelope<KeyType, ValueType, PassThroughType> ptm =
    ProducerMessage.passThrough(passThrough);

For flows, the ProducerMessage.PassThroughMessages continue as ProducerMessage.PassThroughResult elements containing the passThrough data.

Producer as a Flow

Producer.flexiFlow allows the stream to continue after publishing messages to Kafka. It accepts implementations of ProducerMessage.Envelope as input, which continue in the flow as implementations of ProducerMessage.Results.

Scala
val done = Source(1 to 100)
  .map { number =>
    val partition = 0
    val value = number.toString
    ProducerMessage.single(
      new ProducerRecord(topic, partition, "key", value),
      number)
  }
  .via(Producer.flexiFlow(producerSettings))
  .map {
    case ProducerMessage.Result(metadata, ProducerMessage.Message(record, passThrough)) =>
      s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}"

    case ProducerMessage.MultiResult(parts, passThrough) =>
      parts
        .map {
          case MultiResultPart(metadata, record) =>
            s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}"
        }
        .mkString(", ")

    case ProducerMessage.PassThroughResult(passThrough) =>
      s"passed through"
  }
  .runWith(Sink.foreach(println(_)))
Java
CompletionStage<Done> done =
    Source.range(1, 100)
        .map(
            number -> {
              int partition = 0;
              String value = String.valueOf(number);
              ProducerMessage.Envelope<String, String, Integer> msg =
                  ProducerMessage.single(
                      new ProducerRecord<>(topic, partition, "key", value), number);
              return msg;
            })
        .via(Producer.flexiFlow(producerSettings))
        .map(
            result -> {
              if (result instanceof ProducerMessage.Result) {
                ProducerMessage.Result<String, String, Integer> res =
                    (ProducerMessage.Result<String, String, Integer>) result;
                ProducerRecord<String, String> record = res.message().record();
                RecordMetadata meta = res.metadata();
                return meta.topic()
                    + "/"
                    + meta.partition()
                    + " "
                    + res.offset()
                    + ": "
                    + record.value();
              } else if (result instanceof ProducerMessage.MultiResult) {
                ProducerMessage.MultiResult<String, String, Integer> res =
                    (ProducerMessage.MultiResult<String, String, Integer>) result;
                return res.getParts().stream()
                    .map(
                        part -> {
                          RecordMetadata meta = part.metadata();
                          return meta.topic()
                              + "/"
                              + meta.partition()
                              + " "
                              + part.metadata().offset()
                              + ": "
                              + part.record().value();
                        })
                    .reduce((acc, s) -> acc + ", " + s);
              } else {
                return "passed through";
              }
            })
        .runWith(Sink.foreach(System.out::println), system);

Connecting a Producer to a Consumer

The passThrough can for example hold a Committable that can be committed after publishing to Kafka.

Scala
val control =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic1, topic2))
    .map { msg =>
      ProducerMessage.single(
        new ProducerRecord(targetTopic, msg.record.key, msg.record.value),
        msg.committableOffset
      )
    }
    .toMat(Producer.committableSink(producerSettings, committerSettings))(DrainingControl.apply)
    .run()
Java
Consumer.DrainingControl<Done> control =
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topic1, topic2))
        .map(
            msg ->
                ProducerMessage.<String, String, ConsumerMessage.Committable>single(
                    new ProducerRecord<>(targetTopic, msg.record().key(), msg.record().value()),
                    msg.committableOffset()))
        .toMat(
            Producer.committableSink(producerSettings, committerSettings),
            Consumer::createDrainingControl)
        .run(system);

Sharing the KafkaProducer instance

The underlying KafkaProducer is thread-safe, and sharing a single producer instance across streams will generally be faster than having multiple instances. You cannot share a KafkaProducer with the Transactional flows and sinks.

To create a KafkaProducer from the Kafka connector settings described above, ProducerSettings contains the factory methods createKafkaProducerAsync (Scala) / createKafkaProducerCompletionStage (Java) and createKafkaProducer (which blocks when the settings are enriched asynchronously).

Scala
val config = system.settings.config.getConfig("pekko.kafka.producer")
val producerSettings =
  ProducerSettings(config, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)
val kafkaProducer: Future[org.apache.kafka.clients.producer.Producer[String, String]] =
  producerSettings.createKafkaProducerAsync()

// using the kafka producer

kafkaProducer.foreach(p => p.close())
Java
final Config config = system.settings().config().getConfig("pekko.kafka.producer");
final ProducerSettings<String, String> producerSettings =
    ProducerSettings.create(config, new StringSerializer(), new StringSerializer())
        .withBootstrapServers("localhost:9092");
final org.apache.kafka.clients.producer.Producer<String, String> kafkaProducer =
    producerSettings.createKafkaProducer();

The KafkaProducer instance (or the Future / CompletionStage of one) is passed as a parameter to ProducerSettings using the methods withProducer and withProducerFactory.

Scala
// create a producer
val kafkaProducer = producerSettings.createKafkaProducer()
val settingsWithProducer = producerSettings.withProducer(kafkaProducer)

val done = Source(1 to 100)
  .map(_.toString)
  .map(value => new ProducerRecord[String, String](topic, value))
  .runWith(Producer.plainSink(settingsWithProducer))

// close the producer after use
kafkaProducer.close()
Java
ProducerSettings<String, String> settingsWithProducer =
    producerSettings.withProducer(kafkaProducer);

CompletionStage<Done> done =
    Source.range(1, 100)
        .map(number -> number.toString())
        .map(value -> new ProducerRecord<String, String>(topic, value))
        .runWith(Producer.plainSink(settingsWithProducer), system);

Accessing KafkaProducer metrics

By passing an explicit reference to a KafkaProducer (as shown in the previous section) its metrics become accessible. Refer to the Kafka MetricName API for more details.

Scala
val metrics: java.util.Map[org.apache.kafka.common.MetricName, _ <: org.apache.kafka.common.Metric] =
  kafkaProducer.metrics() // observe metrics
Java
Map<org.apache.kafka.common.MetricName, ? extends org.apache.kafka.common.Metric> metrics =
    kafkaProducer.metrics(); // observe metrics