Producer
A producer publishes messages to Kafka topics. The message itself contains information about what topic and partition to publish to, so you can publish to different topics with the same producer.
The underlying implementation uses the KafkaProducer; see the KafkaProducer API for details.
Choosing a producer
Apache Pekko Connectors Kafka offers producer flows and sinks that connect to Kafka and write data. The tables below may help you find the producer best suited for your use case.
For use cases that don’t benefit from Apache Pekko Streams, the Send Producer offers a Future-based (Scala) or CompletionStage-based (Java) send API.
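As a rough sketch of that API (assuming a producerSettings value and an implicit ActorSystem are in scope; see the Send Producer documentation for authoritative usage):

```scala
import scala.concurrent.Future
import org.apache.pekko.kafka.scaladsl.SendProducer
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}

// create a stand-alone producer from the connector settings
val producer = SendProducer(producerSettings)
// send a single record; the Future completes when Kafka acknowledges the write
val result: Future[RecordMetadata] =
  producer.send(new ProducerRecord("topicName", "key", "value"))
// close the producer when it is no longer needed
producer.close()
```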
Producers
These factory methods are part of the Producer API.
| Factory method | May use shared producer | Stream element type | Pass-through | Context |
|---|---|---|---|---|
| plainSink | Yes | ProducerRecord | N/A | N/A |
| flexiFlow | Yes | Envelope | Any | N/A |
| flowWithContext | Yes | Envelope | No | Any |
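As a sketch of the context variant: Producer.flowWithContext pairs with Consumer.sourceWithOffsetContext so that the committable offset travels as stream context and can be committed by a Committer sink (consumerSettings, committerSettings, and the topic names here are assumed to be defined elsewhere):

```scala
import org.apache.pekko.kafka.{ProducerMessage, Subscriptions}
import org.apache.pekko.kafka.scaladsl.{Committer, Consumer, Producer}
import org.apache.pekko.kafka.scaladsl.Consumer.DrainingControl
import org.apache.kafka.clients.producer.ProducerRecord

val control = Consumer
  .sourceWithOffsetContext(consumerSettings, Subscriptions.topics("source-topic"))
  // the CommittableOffset travels as context, not as part of the element
  .map(record => ProducerMessage.single(new ProducerRecord("target-topic", record.key, record.value)))
  .via(Producer.flowWithContext(producerSettings))
  .toMat(Committer.sinkWithOffsetContext(committerSettings))(DrainingControl.apply)
  .run()
```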
Committing producer sinks
These producers produce messages to Kafka and commit the offsets of incoming messages regularly.
| Factory method | May use shared producer | Stream element type | Pass-through | Context |
|---|---|---|---|---|
| committableSink | Yes | Envelope | Committable | N/A |
| committableSinkWithOffsetContext | Yes | Envelope | Any | Committable |
For details about batched committing, see Consumer: Offset Storage in Kafka - committing. A complete example that connects a committable source to a committing producer sink is shown in Connecting a Producer to a Consumer below.
Transactional producers
These factory methods are part of the Transactional API. For details, see Transactions. Apache Pekko Connectors Kafka must manage the producer when using transactions.
| Factory method | May use shared producer | Stream element type | Pass-through |
|---|---|---|---|
| sink | No | Envelope | N/A |
| flow | No | Envelope | No |
| sinkWithOffsetContext | No | Envelope | N/A |
| flowWithOffsetContext | No | Envelope | No |
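As a sketch of a transactional pass-through stream (the settings and topic names are assumed to be defined elsewhere, and the transactional ID must be unique per application instance; see Transactions for the guarantees and constraints):

```scala
import org.apache.pekko.kafka.{ProducerMessage, Subscriptions}
import org.apache.pekko.kafka.scaladsl.Transactional
import org.apache.pekko.kafka.scaladsl.Consumer.DrainingControl
import org.apache.kafka.clients.producer.ProducerRecord

val control = Transactional
  .source(consumerSettings, Subscriptions.topics("source-topic"))
  .map { msg =>
    // the partition offset rides along as pass-through and is committed within the transaction
    ProducerMessage.single(
      new ProducerRecord("sink-topic", msg.record.key, msg.record.value),
      msg.partitionOffset)
  }
  .toMat(Transactional.sink(producerSettings, "transactional-id"))(DrainingControl.apply)
  .run()
```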
Settings
When creating a producer stream you need to pass in ProducerSettings that define things like:
- bootstrap servers of the Kafka cluster (see Service discovery to defer the server configuration)
- serializers for the keys and values
- tuning parameters
Scala:
```scala
val config = system.settings.config.getConfig("pekko.kafka.producer")
val producerSettings = ProducerSettings(config, new StringSerializer, new StringSerializer)
  .withBootstrapServers(bootstrapServers)
```
Java:
```java
final Config config = system.settings().config().getConfig("pekko.kafka.producer");
final ProducerSettings<String, String> producerSettings =
    ProducerSettings.create(config, new StringSerializer(), new StringSerializer())
        .withBootstrapServers("localhost:9092");
```
In addition to programmatic construction, ProducerSettings can also be created from configuration (application.conf).
When creating ProducerSettings with a classic ActorSystem or typed ActorSystem it uses the config section pekko.kafka.producer. The format of these settings files is described in the Typesafe Config Documentation.
```hocon
# Properties for org.apache.pekko.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout.
pekko.kafka.producer {
  # Config path of Apache Pekko Discovery method
  # "pekko.discovery" to use the Apache Pekko Discovery method configured for the ActorSystem
  discovery-method = pekko.discovery

  # Set a service name for use with Apache Pekko Discovery
  # https://pekko.apache.org/docs/pekko-connectors-kafka/current/discovery.html
  service-name = ""

  # Timeout for getting a reply from the discovery-method lookup
  resolve-timeout = 3 seconds

  # Tuning parameter of how many sends that can run in parallel.
  # In 2.0.0: changed the default from 100 to 10000
  parallelism = 10000

  # Duration to wait for `KafkaProducer.close` to finish.
  close-timeout = 60s

  # Call `KafkaProducer.close` when the stream is shutdown. This is important to override to false
  # when the producer instance is shared across multiple producer stages.
  close-on-producer-stop = true

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the producer stages. Some blocking may occur.
  # When this value is empty, the dispatcher configured for the stream
  # will be used.
  use-dispatcher = "pekko.kafka.default-dispatcher"

  # The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`
  # for exactly-once-semantics processing.
  eos-commit-interval = 100ms

  # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
  # can be defined in this configuration section.
  kafka-clients {
  }
}
```
ProducerSettings can also be created from any other Config section with the same layout as above.
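For example, a custom section may inherit the defaults and override only what differs (the section name our-kafka-producer is made up for this sketch):

```hocon
# application.conf
our-kafka-producer: ${pekko.kafka.producer} {
  kafka-clients {
    bootstrap.servers = "kafka-host:9092"
  }
}
```

```scala
val customSettings = ProducerSettings(
  system.settings.config.getConfig("our-kafka-producer"),
  new StringSerializer,
  new StringSerializer)
```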
See Kafka’s KafkaProducer and ProducerConfig API documentation for more details regarding settings.
Producer as a Sink
Producer.plainSink is the easiest way to publish messages. The sink consumes the Kafka type ProducerRecord which contains
- a topic name to which the record is being sent,
- an optional partition number,
- an optional key, and
- a value.
Scala:
```scala
val done: Future[Done] = Source(1 to 100)
  .map(_.toString)
  .map(value => new ProducerRecord[String, String](topic, value))
  .runWith(Producer.plainSink(producerSettings))
```
The materialized value of the sink is a Future[Done] which is completed with Done when the stream completes, or with an exception in case an error occurs.

Java:
```java
CompletionStage<Done> done =
    Source.range(1, 100)
        .map(number -> number.toString())
        .map(value -> new ProducerRecord<String, String>(topic, value))
        .runWith(Producer.plainSink(producerSettings), system);
```
The materialized value of the sink is a CompletionStage<Done> which is completed with Done when the stream completes, or with an exception in case an error occurs.
Producing messages
Sinks and flows accept implementations of ProducerMessage.Envelope as input. They contain an extra field to pass through data, the so-called passThrough. Its value is passed through the flow and becomes available in the Results’ passThrough(). It can for example hold a ConsumerMessage.CommittableOffset or ConsumerMessage.CommittableOffsetBatch from a Consumer.committableSource that can be committed after publishing to Kafka.
Produce a single message to Kafka
To create one message to a Kafka topic, use the ProducerMessage.Message type as in
Scala:
```scala
val single: ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] =
  ProducerMessage.single(
    new ProducerRecord("topicName", key, value),
    passThrough)
```
Java:
```java
ProducerMessage.Envelope<KeyType, ValueType, PassThroughType> message =
    ProducerMessage.single(new ProducerRecord<>("topicName", key, value), passThrough);
```
For flows the ProducerMessage.Messages continue as Result elements containing:
- the original input message,
- the record metadata (Kafka RecordMetadata API), and
- access to the passThrough within the message.
Let one stream element produce multiple messages to Kafka
The ProducerMessage.MultiMessage contains a list of ProducerRecords to produce multiple messages to Kafka topics.
Scala:
```scala
val multi: ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] =
  ProducerMessage.multi(
    immutable.Seq(
      new ProducerRecord("topicName", key, value),
      new ProducerRecord("anotherTopic", key, value)),
    passThrough)
```
Java:
```java
ProducerMessage.Envelope<KeyType, ValueType, PassThroughType> multiMessage =
    ProducerMessage.multi(
        Arrays.asList(
            new ProducerRecord<>("topicName", key, value),
            new ProducerRecord<>("anotherTopic", key, value)),
        passThrough);
```
For flows the ProducerMessage.MultiMessages continue as MultiResult elements containing:
- a list of ProducerMessage.MultiResultPart with
  - the original input message,
  - the record metadata (Kafka RecordMetadata API), and
  - the passThrough data.
Let a stream element pass through, without producing a message to Kafka
The ProducerMessage.PassThroughMessage allows a stream element to pass through a Kafka flow without producing a new message to a Kafka topic. This is primarily useful with Kafka commit offsets and transactions, so that these can be committed without producing new messages.
Scala:
```scala
val ptm: ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] =
  ProducerMessage.passThrough(passThrough)
```
Java:
```java
ProducerMessage.Envelope<KeyType, ValueType, PassThroughType> ptm =
    ProducerMessage.passThrough(passThrough);
```
For flows the ProducerMessage.PassThroughMessages continue as ProducerMessage.PassThroughResult elements containing the passThrough data.
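For example, a stream can publish only selected records while still committing every offset; this sketch (settings and topic names assumed to be defined elsewhere) skips records with empty values:

```scala
import org.apache.pekko.kafka.{ConsumerMessage, ProducerMessage, Subscriptions}
import org.apache.pekko.kafka.scaladsl.{Consumer, Producer}
import org.apache.pekko.kafka.scaladsl.Consumer.DrainingControl
import org.apache.kafka.clients.producer.ProducerRecord

val control = Consumer
  .committableSource(consumerSettings, Subscriptions.topics("source-topic"))
  .map { msg =>
    if (msg.record.value.nonEmpty)
      ProducerMessage.single(
        new ProducerRecord("target-topic", msg.record.key, msg.record.value),
        msg.committableOffset)
    else
      // nothing is published for this element, but its offset is still committed
      ProducerMessage.passThrough[String, String, ConsumerMessage.CommittableOffset](
        msg.committableOffset)
  }
  .toMat(Producer.committableSink(producerSettings, committerSettings))(DrainingControl.apply)
  .run()
```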
Producer as a Flow
Producer.flexiFlow allows the stream to continue after publishing messages to Kafka. It accepts implementations of ProducerMessage.Envelope as input, which continue in the flow as implementations of ProducerMessage.Results.
Scala:
```scala
val done = Source(1 to 100)
  .map { number =>
    val partition = 0
    val value = number.toString
    ProducerMessage.single(
      new ProducerRecord(topic, partition, "key", value),
      number)
  }
  .via(Producer.flexiFlow(producerSettings))
  .map {
    case ProducerMessage.Result(metadata, ProducerMessage.Message(record, passThrough)) =>
      s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}"

    case ProducerMessage.MultiResult(parts, passThrough) =>
      parts
        .map {
          case MultiResultPart(metadata, record) =>
            s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}"
        }
        .mkString(", ")

    case ProducerMessage.PassThroughResult(passThrough) =>
      s"passed through"
  }
  .runWith(Sink.foreach(println(_)))
```
Java:
```java
CompletionStage<Done> done =
    Source.range(1, 100)
        .map(
            number -> {
              int partition = 0;
              String value = String.valueOf(number);
              ProducerMessage.Envelope<String, String, Integer> msg =
                  ProducerMessage.single(
                      new ProducerRecord<>(topic, partition, "key", value), number);
              return msg;
            })
        .via(Producer.flexiFlow(producerSettings))
        .map(
            result -> {
              if (result instanceof ProducerMessage.Result) {
                ProducerMessage.Result<String, String, Integer> res =
                    (ProducerMessage.Result<String, String, Integer>) result;
                ProducerRecord<String, String> record = res.message().record();
                RecordMetadata meta = res.metadata();
                return meta.topic() + "/" + meta.partition() + " " + res.offset() + ": " + record.value();
              } else if (result instanceof ProducerMessage.MultiResult) {
                ProducerMessage.MultiResult<String, String, Integer> res =
                    (ProducerMessage.MultiResult<String, String, Integer>) result;
                return res.getParts().stream()
                    .map(
                        part -> {
                          RecordMetadata meta = part.metadata();
                          return meta.topic() + "/" + meta.partition() + " "
                              + part.metadata().offset() + ": " + part.record().value();
                        })
                    .reduce((acc, s) -> acc + ", " + s);
              } else {
                return "passed through";
              }
            })
        .runWith(Sink.foreach(System.out::println), system);
```
Connecting a Producer to a Consumer
The passThrough can for example hold a Committable that can be committed after publishing to Kafka.
Scala:
```scala
val control = Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic1, topic2))
  .map { msg =>
    ProducerMessage.single(
      new ProducerRecord(targetTopic, msg.record.key, msg.record.value),
      msg.committableOffset)
  }
  .toMat(Producer.committableSink(producerSettings, committerSettings))(DrainingControl.apply)
  .run()
```
Java:
```java
Consumer.DrainingControl<Done> control =
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topic1, topic2))
        .map(
            msg ->
                ProducerMessage.<String, String, ConsumerMessage.Committable>single(
                    new ProducerRecord<>(targetTopic, msg.record().key(), msg.record().value()),
                    msg.committableOffset()))
        .toMat(
            Producer.committableSink(producerSettings, committerSettings),
            Consumer::createDrainingControl)
        .run(system);
```
Sharing the KafkaProducer instance
The underlying KafkaProducer is thread-safe, and sharing a single producer instance across streams will generally be faster than having multiple instances. You cannot share a KafkaProducer with the Transactional flows and sinks.
To create a KafkaProducer from the Kafka connector settings described above, ProducerSettings contains the factory methods createKafkaProducerAsync (Scala) or createKafkaProducerCompletionStage (Java), and the blocking createKafkaProducer (which waits for any asynchronous enrichment of the settings to complete).
Scala:
```scala
val config = system.settings.config.getConfig("pekko.kafka.producer")
val producerSettings = ProducerSettings(config, new StringSerializer, new StringSerializer)
  .withBootstrapServers(bootstrapServers)
val kafkaProducer: Future[org.apache.kafka.clients.producer.Producer[String, String]] =
  producerSettings.createKafkaProducerAsync()

// using the kafka producer

kafkaProducer.foreach(p => p.close())
```
Java:
```java
final Config config = system.settings().config().getConfig("pekko.kafka.producer");
final ProducerSettings<String, String> producerSettings =
    ProducerSettings.create(config, new StringSerializer(), new StringSerializer())
        .withBootstrapServers("localhost:9092");
final org.apache.kafka.clients.producer.Producer<String, String> kafkaProducer =
    producerSettings.createKafkaProducer();
```
The KafkaProducer instance (or the Future/CompletionStage of one) is passed as a parameter to ProducerSettings using the methods withProducer and withProducerFactory.
Scala:
```scala
// create a producer
val kafkaProducer = producerSettings.createKafkaProducer()
val settingsWithProducer = producerSettings.withProducer(kafkaProducer)

val done = Source(1 to 100)
  .map(_.toString)
  .map(value => new ProducerRecord[String, String](topic, value))
  .runWith(Producer.plainSink(settingsWithProducer))

// close the producer after use
kafkaProducer.close()
```
Java:
```java
ProducerSettings<String, String> settingsWithProducer =
    producerSettings.withProducer(kafkaProducer);

CompletionStage<Done> done =
    Source.range(1, 100)
        .map(number -> number.toString())
        .map(value -> new ProducerRecord<String, String>(topic, value))
        .runWith(Producer.plainSink(settingsWithProducer), system);
```
Accessing KafkaProducer metrics
By passing an explicit reference to a KafkaProducer (as shown in the previous section), its metrics become accessible. Refer to the Kafka MetricName API for more details.
Scala:
```scala
val metrics: java.util.Map[org.apache.kafka.common.MetricName, _ <: org.apache.kafka.common.Metric] =
  kafkaProducer.metrics() // observe metrics
```
Java:
```java
Map<org.apache.kafka.common.MetricName, ? extends org.apache.kafka.common.Metric> metrics =
    kafkaProducer.metrics(); // observe metrics
```
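For example, a single metric such as the producer's record-send-rate can be looked up by its name and group (both defined by the Kafka client; a minimal sketch):

```scala
import scala.jdk.CollectionConverters._

// find and print the current record send rate, if the client reports it
kafkaProducer.metrics().asScala.collectFirst {
  case (name, metric) if name.group == "producer-metrics" && name.name == "record-send-rate" =>
    metric.metricValue()
}.foreach(rate => println(s"record-send-rate: $rate"))
```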