Send Producer
A producer publishes messages to Kafka topics. The message itself contains information about what topic and partition to publish to so you can publish to different topics with the same producer.
The Apache Pekko Connectors Kafka SendProducerSendProducer does not integrate with Apache Pekko Streams. Instead, it offers a wrapper of the Apache Kafka KafkaProducer to send data to Kafka topics in a per-element fashion with a Future-basedCompletionStage-based API.
It supports the same settings as Apache Pekko Connectors ProducerProducer flows and sinks and supports service discovery.
After use, the Producer needs to be properly closed via the asynchronous close() method.
Producing
The Send Producer offers methods for sending
ProducerRecordwithsendProducerMessage.EnvelopeProducerMessage.EnvelopewithsendEnvelope(similar toProducer.flexiFlow)
After use, the Send Producer should be closed with close().
ProducerRecord
Produce a ProducerRecord to a topic.
- Scala
-
source
val producer = SendProducer(producerDefaults) try { val send: Future[RecordMetadata] = producer .send(new ProducerRecord(topic1, "key", "value")) // Blocking here for illustration only, you need to handle the future result Await.result(send, 2.seconds) } finally { Await.result(producer.close(), 1.minute) } - Java
-
source
SendProducer<String, String> producer = new SendProducer<>(producerSettings, system); try { CompletionStage<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, "key", "value")); // Blocking here for illustration only, you need to handle the future result RecordMetadata recordMetadata = result.toCompletableFuture().get(2, TimeUnit.SECONDS); } finally { producer.close().toCompletableFuture().get(1, TimeUnit.MINUTES); }
Envelope
The ProducerMessage.EnvelopeProducerMessage.Envelope can be used to send one record, or a list of of ProducerRecords to produce a single or multiple messages to Kafka topics. The envelope can be used to pass through an arbitrary value which will be attached to the result.
- Scala
-
source
val producer = SendProducer(producerDefaults) try { val envelope: ProducerMessage.Envelope[String, String, String] = ProducerMessage.multi(immutable.Seq( new ProducerRecord(topic1, "key", "value1"), new ProducerRecord(topic1, "key", "value2"), new ProducerRecord(topic1, "key", "value3")), "context") val send: Future[ProducerMessage.Results[String, String, String]] = producer.sendEnvelope(envelope) } finally { Await.result(producer.close(), 1.minute) } - Java
-
source
SendProducer<String, String> producer = new SendProducer<>(producerSettings, system); try { ProducerMessage.Envelope<String, String, String> envelope = ProducerMessage.multi( Arrays.asList( new ProducerRecord<>(topic, "key", "value1"), new ProducerRecord<>(topic, "key", "value2"), new ProducerRecord<>(topic, "key", "value3")), "context"); CompletionStage<ProducerMessage.Results<String, String, String>> send = producer.sendEnvelope(envelope); // Blocking here for illustration only, you need to handle the future result ProducerMessage.Results<String, String, String> result = send.toCompletableFuture().get(2, TimeUnit.SECONDS); } finally { producer.close().toCompletableFuture().get(1, TimeUnit.MINUTES); }
After successful sending, a ProducerMessage.MessageProducerMessage.Message will return a ResultResult element containing:
- the original input message,
- the record metadata (Kafka
RecordMetadataAPI), and - access to the
passThroughwithin the message.
A ProducerMessage.MultiMessageProducerMessage.MultiMessage will return a MultiResultMultiResult containing:
- a list of
ProducerMessage.MultiResultPartProducerMessage.MultiResultPartwith- the original input message,
- the record metadata (Kafka
RecordMetadataAPI), and
- the
passThroughdata.