Send Producer
A producer publishes messages to Kafka topics. The message itself contains information about what topic and partition to publish to, so you can publish to different topics with the same producer.
The Apache Pekko Connectors Kafka SendProducer does not integrate with Apache Pekko Streams. Instead, it offers a wrapper of the Apache Kafka KafkaProducer to send data to Kafka topics in a per-element fashion with a Future-based (Scala) or CompletionStage-based (Java) API.
It supports the same settings as the Apache Pekko Connectors Producer flows and sinks, and it supports service discovery.
After use, the SendProducer must be closed via the asynchronous close() method.
Producing
The Send Producer offers methods for sending
- ProducerRecord with send
- ProducerMessage.Envelope with sendEnvelope (similar to Producer.flexiFlow)
After use, the Send Producer should be closed with close().
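Because both sending and closing are asynchronous, closing can be chained onto the completion of a send rather than awaited. The following is a minimal Java sketch of that idea using only the standard CompletionStage API. It is not part of the connector: `CloseAfterSend`, `closeAfter`, and `wrap` are hypothetical names, and the stages in `main` are stand-ins for what `producer.send(...)` and `producer.close()` would return.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

// Hypothetical helper, not part of the connector API.
final class CloseAfterSend {

  // Asynchronous counterpart of try/finally: runs `close` once `send` has completed,
  // whether it succeeded or failed, and then passes on the outcome of `send`.
  static <T> CompletionStage<T> closeAfter(
      CompletionStage<T> send, Supplier<? extends CompletionStage<?>> close) {
    CompletionStage<CompletionStage<T>> nested =
        send.handle(
            (value, sendFailure) -> {
              CompletionStage<T> afterClose =
                  close.get().handle(
                      (ignored, closeFailure) -> {
                        // A failed send takes precedence over a failed close.
                        if (sendFailure != null) throw wrap(sendFailure);
                        if (closeFailure != null) throw wrap(closeFailure);
                        return value;
                      });
              return afterClose;
            });
    // Flatten the stage-of-a-stage into a single stage.
    return nested.thenCompose(stage -> stage);
  }

  // Avoids wrapping an exception that is already a CompletionException.
  private static CompletionException wrap(Throwable t) {
    return t instanceof CompletionException ? (CompletionException) t : new CompletionException(t);
  }

  public static void main(String[] args) {
    // Stand-in for the stage returned by producer.send(...).
    CompletionStage<String> send = CompletableFuture.completedFuture("metadata");
    // The supplier stands in for `() -> producer.close()`.
    closeAfter(send, () -> CompletableFuture.completedFuture("closed"))
        .thenAccept(result -> System.out.println("sent: " + result));
  }
}
```

Taking the close action as a `Supplier` delays the call until the send has finished, so the producer is not closed while the send is still in flight.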
ProducerRecord
Produce a ProducerRecord to a topic.
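- Scala

    import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
    import org.apache.pekko.kafka.scaladsl.SendProducer
    import scala.concurrent.{ Await, Future }
    import scala.concurrent.duration._

    // producerDefaults (the producer settings) and topic1 are defined elsewhere;
    // an implicit actor system must be in scope
    val producer = SendProducer(producerDefaults)
    try {
      val send: Future[RecordMetadata] = producer
        .send(new ProducerRecord(topic1, "key", "value"))
      // Blocking here for illustration only, you need to handle the future result
      Await.result(send, 2.seconds)
    } finally {
      Await.result(producer.close(), 1.minute)
    }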
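- Java

    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.pekko.kafka.javadsl.SendProducer;

    // producerSettings, system and topic are defined elsewhere
    SendProducer<String, String> producer = new SendProducer<>(producerSettings, system);
    try {
      CompletionStage<RecordMetadata> result =
          producer.send(new ProducerRecord<>(topic, "key", "value"));
      // Blocking here for illustration only, you need to handle the future result
      RecordMetadata recordMetadata = result.toCompletableFuture().get(2, TimeUnit.SECONDS);
    } finally {
      producer.close().toCompletableFuture().get(1, TimeUnit.MINUTES);
    }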
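The samples block on the result for illustration only. One way to handle the result without blocking is to attach a callback to the returned stage. The following is a minimal sketch using only the standard CompletionStage API; `SendResultHandling` and `describe` are hypothetical names, the `CompletableFuture` values stand in for the stage returned by `producer.send(...)`, and a `String` is used in place of `RecordMetadata` so the sketch runs without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical helper, not part of the connector API.
final class SendResultHandling {

  // Maps the outcome of a send to a status line without blocking the caller.
  // `sendResult` stands in for the CompletionStage returned by producer.send(...).
  static CompletionStage<String> describe(CompletionStage<String> sendResult) {
    return sendResult.handle(
        (metadata, failure) ->
            failure == null ? "sent: " + metadata : "failed: " + failure.getMessage());
  }

  public static void main(String[] args) {
    // Stand-in for a send that succeeded.
    CompletableFuture<String> ok = CompletableFuture.completedFuture("topic1-0@42");
    // Stand-in for a send that failed.
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("broker unavailable"));

    describe(ok).thenAccept(System.out::println);
    describe(failed).thenAccept(System.out::println);
  }
}
```

`handle` is used here because it observes both the success and the failure case in one place; `thenApply` combined with `exceptionally` would be an equivalent choice.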
Envelope
The ProducerMessage.Envelope can be used to send one record, or a list of ProducerRecords, to produce a single or multiple messages to Kafka topics. The envelope can also carry an arbitrary pass-through value, which will be attached to the result.
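- Scala

    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.pekko.kafka.ProducerMessage
    import org.apache.pekko.kafka.scaladsl.SendProducer
    import scala.collection.immutable
    import scala.concurrent.{ Await, Future }
    import scala.concurrent.duration._

    // producerDefaults (the producer settings) and topic1 are defined elsewhere;
    // an implicit actor system must be in scope
    val producer = SendProducer(producerDefaults)
    try {
      // Three records plus the pass-through value "context"
      val envelope: ProducerMessage.Envelope[String, String, String] =
        ProducerMessage.multi(
          immutable.Seq(
            new ProducerRecord(topic1, "key", "value1"),
            new ProducerRecord(topic1, "key", "value2"),
            new ProducerRecord(topic1, "key", "value3")),
          "context")
      val send: Future[ProducerMessage.Results[String, String, String]] =
        producer.sendEnvelope(envelope)
    } finally {
      Await.result(producer.close(), 1.minute)
    }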
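- Java

    import java.util.Arrays;
    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.pekko.kafka.ProducerMessage;
    import org.apache.pekko.kafka.javadsl.SendProducer;

    // producerSettings, system and topic are defined elsewhere
    SendProducer<String, String> producer = new SendProducer<>(producerSettings, system);
    try {
      // Three records plus the pass-through value "context"
      ProducerMessage.Envelope<String, String, String> envelope =
          ProducerMessage.multi(
              Arrays.asList(
                  new ProducerRecord<>(topic, "key", "value1"),
                  new ProducerRecord<>(topic, "key", "value2"),
                  new ProducerRecord<>(topic, "key", "value3")),
              "context");
      CompletionStage<ProducerMessage.Results<String, String, String>> send =
          producer.sendEnvelope(envelope);
      // Blocking here for illustration only, you need to handle the future result
      ProducerMessage.Results<String, String, String> result =
          send.toCompletableFuture().get(2, TimeUnit.SECONDS);
    } finally {
      producer.close().toCompletableFuture().get(1, TimeUnit.MINUTES);
    }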
After successful sending, a ProducerMessage.Message will return a Result element containing:
- the original input message,
- the record metadata (Kafka RecordMetadata API), and
- access to the passThrough within the message.
A ProducerMessage.MultiMessage will return a MultiResult containing:
- a list of ProducerMessage.MultiResultPart with
  - the original input message,
  - the record metadata (Kafka RecordMetadata API), and
- the passThrough data.