Send Producer

A producer publishes messages to Kafka topics. The message itself contains information about what topic and partition to publish to so you can publish to different topics with the same producer.

The Apache Pekko Connectors Kafka SendProducer does not integrate with Apache Pekko Streams. Instead, it wraps the Apache Kafka KafkaProducer to send data to Kafka topics in a per-element fashion, with a Future-based (Scala) or CompletionStage-based (Java) API.

It supports the same settings as Apache Pekko Connectors Producer flows and sinks and supports service discovery.

After use, the Producer needs to be properly closed via the asynchronous close() method.

Producing

The Send Producer offers methods for sending a ProducerRecord via send, and a ProducerMessage.Envelope via sendEnvelope.

After use, the Send Producer should be closed with close().
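
Because close() is asynchronous, it can be chained onto pending work instead of being blocked on. The following is a minimal sketch of that idea using only the Java standard library; the class name CloseAfterUse and the helper closeAfter are hypothetical and not part of the connector. In real code, `work` would be the stage returned by a send and `close` would be `producer::close`. Errors raised by the close step itself are ignored here, which may not be what you want in production.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class CloseAfterUse {

  // Hypothetical helper: runs `close` once `work` has finished (successfully or not)
  // and then completes with the original outcome of `work`.
  // Failures of the close step itself are ignored in this sketch.
  public static <T> CompletionStage<T> closeAfter(
      CompletionStage<T> work, Supplier<CompletionStage<?>> close) {
    CompletableFuture<T> result = new CompletableFuture<>();
    work.whenComplete((value, error) -> {
      CompletionStage<?> closing;
      try {
        closing = close.get();
      } catch (RuntimeException closeFailure) {
        // close could not even be started; still report the outcome of `work`
        closing = CompletableFuture.completedFuture(null);
      }
      closing.whenComplete((done, closeError) -> {
        if (error != null) {
          result.completeExceptionally(error);
        } else {
          result.complete(value);
        }
      });
    });
    return result;
  }

  public static void main(String[] args) {
    // Stand-ins: an already completed "send" and a close action that only sets a flag.
    AtomicBoolean closed = new AtomicBoolean(false);
    CompletionStage<String> work = CompletableFuture.completedFuture("metadata");
    closeAfter(work, () -> {
          closed.set(true);
          return CompletableFuture.<Void>completedFuture(null);
        })
        .thenAccept(value -> System.out.println(value + ", closed=" + closed.get()));
  }
}
```

The helper keeps the result of the send as the result of the combined stage, so callers can continue composing on it after the producer has been closed.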

ProducerRecord

Produce a ProducerRecord to a topic.

Scala
val producer = SendProducer(producerDefaults)
try {
  val send: Future[RecordMetadata] = producer
    .send(new ProducerRecord(topic1, "key", "value"))
  // Blocking here for illustration only, you need to handle the future result
  Await.result(send, 2.seconds)
} finally {
  Await.result(producer.close(), 1.minute)
}
Java
SendProducer<String, String> producer = new SendProducer<>(producerSettings, system);
try {
  CompletionStage<RecordMetadata> result =
      producer.send(new ProducerRecord<>(topic, "key", "value"));
  // Blocking here for illustration only, you need to handle the future result
  RecordMetadata recordMetadata = result.toCompletableFuture().get(2, TimeUnit.SECONDS);
} finally {
  producer.close().toCompletableFuture().get(1, TimeUnit.MINUTES);
}
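
The examples above block on the result for illustration only. One way to handle the result without blocking is to compose on the returned CompletionStage. The sketch below uses only the Java standard library and a stand-in stage in place of a real send; the class name SendResultHandling and the helpers describeOutcome and rootMessage are hypothetical, and the Long value merely stands in for data you might read from RecordMetadata.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

public class SendResultHandling {

  // Hypothetical helper: maps the outcome of an asynchronous send to a message
  // without blocking. `describe` renders a successful result as text.
  public static <T> CompletionStage<String> describeOutcome(
      CompletionStage<T> send, Function<? super T, String> describe) {
    return send
        .thenApply(result -> "sent: " + describe.apply(result))
        .exceptionally(error -> "failed: " + rootMessage(error));
  }

  // Stage composition may wrap the original failure in a CompletionException;
  // unwrap it so the message of the underlying cause is reported.
  public static String rootMessage(Throwable error) {
    Throwable cause = error;
    while (cause instanceof CompletionException && cause.getCause() != null) {
      cause = cause.getCause();
    }
    return cause.getMessage();
  }

  public static void main(String[] args) {
    // Stand-in for a successful send: a stage already completed with an offset.
    CompletionStage<Long> ok = CompletableFuture.completedFuture(42L);
    describeOutcome(ok, offset -> "offset " + offset).thenAccept(System.out::println);

    // Stand-in for a failed send.
    CompletableFuture<Long> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("broker unavailable"));
    describeOutcome(failed, offset -> "offset " + offset).thenAccept(System.out::println);
  }
}
```

With a real producer, the first argument would be the stage returned by `producer.send(...)`, and the function would read whatever it needs from the RecordMetadata.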

Envelope

The ProducerMessage.Envelope can be used to send one record, or a list of ProducerRecords, to produce a single or multiple messages to Kafka topics. The envelope can also carry an arbitrary pass-through value, which is attached to the result.

Scala
val producer = SendProducer(producerDefaults)
try {
  val envelope: ProducerMessage.Envelope[String, String, String] =
    ProducerMessage.multi(immutable.Seq(
        new ProducerRecord(topic1, "key", "value1"),
        new ProducerRecord(topic1, "key", "value2"),
        new ProducerRecord(topic1, "key", "value3")),
      "context")
  val send: Future[ProducerMessage.Results[String, String, String]] = producer.sendEnvelope(envelope)
} finally {
  Await.result(producer.close(), 1.minute)
}
Java
SendProducer<String, String> producer = new SendProducer<>(producerSettings, system);
try {
  ProducerMessage.Envelope<String, String, String> envelope =
      ProducerMessage.multi(
          Arrays.asList(
              new ProducerRecord<>(topic, "key", "value1"),
              new ProducerRecord<>(topic, "key", "value2"),
              new ProducerRecord<>(topic, "key", "value3")),
          "context");
  CompletionStage<ProducerMessage.Results<String, String, String>> send =
      producer.sendEnvelope(envelope);
  // Blocking here for illustration only, you need to handle the future result
  ProducerMessage.Results<String, String, String> result =
      send.toCompletableFuture().get(2, TimeUnit.SECONDS);
} finally {
  producer.close().toCompletableFuture().get(1, TimeUnit.MINUTES);
}

After successful sending, a ProducerMessage.Message will return a Result element containing:

  1. the original input message,
  2. the record metadata (Kafka RecordMetadata API), and
  3. access to the passThrough within the message.

A ProducerMessage.MultiMessage will return a MultiResult containing:

  1. a list of ProducerMessage.MultiResultPart with
    1. the original input message,
    2. the record metadata (Kafka RecordMetadata API), and
  2. the passThrough data.