Messages from and to Apache Kafka
A typical source for Projections is messages from Kafka. Apache Pekko Projections supports integration with Kafka using Pekko Connectors Kafka.
The KafkaSourceProvider uses consumer group assignments from Kafka and can resume from offsets stored in a database.
Apache Pekko Projections can store the offsets from Kafka in a relational database with JDBC or with Slick.
The JdbcProjection envelope handler is run by the projection. This means that the target database operations can run in the same transaction as the storage of the offset, so when used with exactly-once the offsets are persisted in the same transaction as the projected model (see Committing offset outside Kafka). It also offers at-least-once semantics.
Storage of Kafka offsets is not implemented for Cassandra yet, see issue #97.
A Projection can also send messages to Kafka.
Dependencies
To use the Kafka module of Apache Pekko Projections add the following dependency in your project:
libraryDependencies += "org.apache.pekko" %% "pekko-projection-kafka" % "1.1.0"
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-projection-kafka_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
  </dependency>
</dependencies>
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-projection-kafka_${versions.ScalaBinary}:1.1.0"
}
Apache Pekko Projections requires Pekko 1.1.3 or later, see Pekko version.
Project Info: Apache Pekko Projections Kafka | |
---|---|
Artifact | org.apache.pekko : pekko-projection-kafka : 1.1.0 |
JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17 |
Scala versions | 2.13.16, 2.12.20, 3.3.5 |
JPMS module name | pekko.projection.kafka |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-projection |
Transitive dependencies
The table below shows pekko-projection-kafka’s direct dependencies and the second tab shows all libraries it depends on transitively.
KafkaSourceProvider
A SourceProvider defines the source of the envelopes that the Projection will process. A SourceProvider for messages from Kafka can be defined with the KafkaSourceProvider like this:
import java.lang.{ Long => JLong }
import org.apache.pekko.kafka.ConsumerSettings
import org.apache.pekko.projection.MergeableOffset
import org.apache.pekko.projection.kafka.scaladsl.KafkaSourceProvider
import org.apache.pekko.projection.scaladsl.SourceProvider
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
val bootstrapServers = "localhost:9092"
val groupId = "group-wordcount"
val topicName = "words"
val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId(groupId)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val sourceProvider: SourceProvider[MergeableOffset[JLong], ConsumerRecord[String, String]] =
  KafkaSourceProvider(system, consumerSettings, Set(topicName))

import java.util.Collections;
import org.apache.pekko.kafka.ConsumerSettings;
import org.apache.pekko.projection.MergeableOffset;
import org.apache.pekko.projection.javadsl.SourceProvider;
import org.apache.pekko.projection.kafka.javadsl.KafkaSourceProvider;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
String bootstrapServers = "localhost:9092";
String groupId = "group-wordcount";
String topicName = "words";
ConsumerSettings<String, String> consumerSettings =
    ConsumerSettings.create(system, new StringDeserializer(), new StringDeserializer())
        .withBootstrapServers(bootstrapServers)
        .withGroupId(groupId)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
SourceProvider<MergeableOffset<Long>, ConsumerRecord<String, String>> sourceProvider =
    KafkaSourceProvider.create(system, consumerSettings, Collections.singleton(topicName));
Please consult the Pekko Connectors Kafka documentation for specifics around the ConsumerSettings. The KafkaSourceProvider uses Consumer.plainPartitionedManualOffsetSource.
The Projection can then be defined as:
val sessionProvider = new HibernateSessionFactory
val projectionId = ProjectionId("WordCount", "wordcount-1")
val projection =
  JdbcProjection.exactlyOnce(
    projectionId,
    sourceProvider,
    () => sessionProvider.newInstance(),
    handler = () => new WordCountJdbcHandler(wordRepository))

final HibernateSessionFactory sessionProvider = new HibernateSessionFactory();
ProjectionId projectionId = ProjectionId.of("WordCount", "wordcount-1");
ExactlyOnceProjection<MergeableOffset<Long>, ConsumerRecord<String, String>> projection =
    JdbcProjection.exactlyOnce(
        projectionId,
        sourceProvider,
        sessionProvider::newInstance,
        () -> new WordCountJdbcHandler(wordRepository),
        system);
and the WordCountJdbcHandler:
class WordCountJdbcHandler(val wordRepository: WordRepository)
    extends JdbcHandler[ConsumerRecord[String, String], HibernateJdbcSession] {
  @throws[Exception]
  override def process(session: HibernateJdbcSession, envelope: ConsumerRecord[String, String]): Unit = {
    val word = envelope.value
    wordRepository.increment(session.entityManager, word)
  }
}

public class WordCountJdbcHandler
    extends JdbcHandler<ConsumerRecord<String, String>, HibernateJdbcSession> {
  private Logger logger = LoggerFactory.getLogger(getClass());
  private WordRepository wordRepository;
  public WordCountJdbcHandler(WordRepository wordRepository) {
    this.wordRepository = wordRepository;
  }
  @Override
  public void process(HibernateJdbcSession session, ConsumerRecord<String, String> envelope)
      throws Exception {
    String word = envelope.value();
    wordRepository.increment(session.entityManager, word);
  }
}
Where the WordRepository is an implementation of:
trait WordRepository {
  def increment(entityManager: EntityManager, word: String): Unit
}

interface WordRepository {
  void increment(EntityManager entityManager, String word);
}
Committing offset outside Kafka
The KafkaSourceProvider described above stores the Kafka offsets in a database. The main advantage of storing the offsets in a database is that exactly-once processing semantics can be achieved if the target database operations of the projection can be run in the same transaction as the storage of the offset.
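To make the "same transaction" idea concrete, here is a minimal in-memory sketch. It is not the Pekko Projections API: the class `ExactlyOnceSketch` and its methods are hypothetical, and a copied map stands in for a database transaction. The point it illustrates is only that when the projected model and the offset become visible in one step, a redelivered record can be recognised and skipped.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model, not library code: the word counts and the
// stored offset are replaced together, so a failure leaves both untouched.
final class ExactlyOnceSketch {
    // Committed state: the projected model plus the offset of the last processed record.
    private Map<String, Integer> counts = new HashMap<>();
    private long storedOffset = -1L;

    /** Processes one record; returns false if the record was already processed. */
    boolean process(long offset, String word) {
        if (offset <= storedOffset) {
            return false; // redelivery: must not be counted twice
        }
        if (word == null) {
            // "Rollback": nothing has been changed yet, committed state stays as it was.
            throw new IllegalArgumentException("null word at offset " + offset);
        }
        // Work on a copy, which plays the role of an open transaction.
        Map<String, Integer> tx = new HashMap<>(counts);
        tx.merge(word, 1, Integer::sum);
        // "Commit": model and offset are swapped in together.
        counts = tx;
        storedOffset = offset;
        return true;
    }

    int count(String word) {
        return counts.getOrDefault(word, 0);
    }

    long storedOffset() {
        return storedOffset;
    }

    public static void main(String[] args) {
        ExactlyOnceSketch db = new ExactlyOnceSketch();
        db.process(0L, "abc");
        db.process(1L, "def");
        db.process(1L, "def"); // redelivered after a restart, ignored
        System.out.println("def=" + db.count("def") + ", offset=" + db.storedOffset());
        // prints "def=1, offset=1"
    }
}
```

With a real relational database the copy-and-swap would be replaced by a single transaction that updates the model tables and the offset table before committing.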
However, there is a caveat when choosing exactly-once. When a Kafka consumer group rebalance occurs, it’s possible that some messages from revoked partitions are still in flight and have not yet been committed to the offset store. Projections will attempt to filter out such messages, but this cannot be guaranteed in all cases.
To mitigate that risk, you can increase the value of pekko.projection.kafka.read-offset-delay (defaults to 500 ms). This delay adds a buffer of time between when the Kafka Source Provider starts up, or when it’s assigned a new partition, and when it retrieves the map of partitions to offsets, giving any projections running in parallel a chance to drain in-flight messages.
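The filtering mentioned in this section can be pictured with the following sketch. It only illustrates the general idea of dropping records from partitions that are no longer assigned; it does not show how Pekko Projections implements it, and `RevokedPartitionFilterSketch`, `Rec` and `dropRevoked` are hypothetical names. A record that passes such a check just before its partition is revoked would still get through, which is one way to see why the filtering cannot be fully guaranteed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in types; none of these names come from Pekko Projections.
final class RevokedPartitionFilterSketch {

    /** Minimal stand-in for a consumed record. */
    static final class Rec {
        final int partition;
        final long offset;
        final String value;

        Rec(int partition, long offset, String value) {
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    /** Keeps only the records whose partition is still assigned to this consumer. */
    static List<Rec> dropRevoked(List<Rec> inFlight, Set<Integer> assigned) {
        List<Rec> kept = new ArrayList<>();
        for (Rec r : inFlight) {
            if (assigned.contains(r.partition)) {
                kept.add(r);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Rec> inFlight = Arrays.asList(
            new Rec(0, 10L, "abc"), new Rec(1, 7L, "def"), new Rec(0, 11L, "ghi"));
        // After the rebalance this consumer only owns partition 0.
        Set<Integer> assigned = new HashSet<>(Arrays.asList(0));
        for (Rec r : dropRevoked(inFlight, assigned)) {
            System.out.println(r.partition + "@" + r.offset + " " + r.value);
        }
    }
}
```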
Committing offset in Kafka
When committing the offsets back to Kafka, the Pekko Connectors Kafka committableSource can be used, and Apache Pekko Projections is not needed for that usage.
Sending to Kafka
To send events to Kafka, one can use the SendProducer or the Producer.flowWithContext method in Pekko Connectors Kafka.
Sending to Kafka using the SendProducer
An async Handler that sends to Kafka may look like this:
class WordPublisher(topic: String, sendProducer: SendProducer[String, String])(implicit ec: ExecutionContext)
    extends Handler[WordEnvelope] {
  // `infoN` below requires: import org.apache.pekko.actor.typed.scaladsl.LoggerOps
  private val logger = LoggerFactory.getLogger(getClass)
  override def process(envelope: WordEnvelope): Future[Done] = {
    val word = envelope.word
    // using the word as the key and `DefaultPartitioner` will select partition based on the key
    // so that same word always ends up in same partition
    val key = word
    val producerRecord = new ProducerRecord(topic, key, word)
    val result = sendProducer.send(producerRecord).map { recordMetadata =>
      logger.infoN("Published word [{}] to topic/partition {}/{}", word, topic, recordMetadata.partition)
      Done
    }
    result
  }
}

class WordPublisher extends Handler<WordEnvelope> {
  private final Logger logger = LoggerFactory.getLogger(getClass());
  private final String topic;
  private final SendProducer<String, String> sendProducer;
  public WordPublisher(String topic, SendProducer<String, String> sendProducer) {
    this.topic = topic;
    this.sendProducer = sendProducer;
  }
  @Override
  public CompletionStage<Done> process(WordEnvelope envelope) {
    String word = envelope.word;
    // using the word as the key and `DefaultPartitioner` will select partition based on the key
    // so that same word always ends up in same partition
    String key = word;
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, word);
    CompletionStage<RecordMetadata> result = sendProducer.send(producerRecord);
    CompletionStage<Done> done =
        result.thenApply(
            recordMetadata -> {
              logger.info(
                  "Published word [{}] to topic/partition {}/{}",
                  word,
                  topic,
                  recordMetadata.partition());
              return Done.getInstance();
            });
    return done;
  }
}
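The code comments above rely on key-based partitioning: equal keys map to the same partition. The sketch below shows why that holds for any deterministic hash. It is a simplification under stated assumptions: Kafka's own partitioner hashes the serialized key bytes with murmur2, whereas `String.hashCode` is used here as a stand-in, so the partition numbers it produces will not match a real cluster. `KeyPartitionSketch` and `partitionFor` are hypothetical names.

```java
// Hypothetical illustration of key-based partition selection; not Kafka's partitioner.
final class KeyPartitionSketch {

    /** Maps a key to a partition in [0, numPartitions) using a deterministic hash. */
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3;
        for (String word : new String[] {"abc", "def", "ghi", "abc"}) {
            System.out.println(word + " -> partition " + partitionFor(word, partitions));
        }
    }
}
```

Both occurrences of "abc" land on the same partition, which is what preserves per-word ordering for consumers of the topic.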
The SendProducer is constructed with:
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.pekko
import pekko.kafka.ProducerSettings
import pekko.kafka.scaladsl.SendProducer
val bootstrapServers = "localhost:9092"
val topicName = "words"
private val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)
import pekko.actor.typed.scaladsl.adapter._ // FIXME might not be needed in later Pekko Connectors Kafka version?
private val sendProducer = SendProducer(producerSettings)(system.toClassic)

import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pekko.kafka.ProducerSettings;
import org.apache.pekko.kafka.javadsl.SendProducer;
String bootstrapServers = "localhost:9092";
String topicName = "words";
ProducerSettings<String, String> producerSettings =
    ProducerSettings.create(system, new StringSerializer(), new StringSerializer())
        .withBootstrapServers(bootstrapServers);
// FIXME classicSystem might not be needed in later Pekko Connectors Kafka version?
SendProducer<String, String> sendProducer =
    new SendProducer<>(producerSettings, system.classicSystem());
Please consult the Pekko Connectors Kafka documentation for specifics around the ProducerSettings and SendProducer.
The Projection is defined as:
val sourceProvider = new WordSource
val sessionProvider = new HibernateSessionFactory
val projectionId = ProjectionId("PublishWords", "words")
val projection =
  JdbcProjection
    .atLeastOnceAsync(
      projectionId,
      sourceProvider,
      () => sessionProvider.newInstance(),
      handler = () => new WordPublisher(topicName, sendProducer))

WordSource sourceProvider = new WordSource();
HibernateSessionFactory sessionProvider = new HibernateSessionFactory();
ProjectionId projectionId = ProjectionId.of("PublishWords", "words");
Projection<WordEnvelope> projection =
    JdbcProjection.atLeastOnceAsync(
        projectionId,
        sourceProvider,
        sessionProvider::newInstance,
        () -> new WordPublisher(topicName, sendProducer),
        system);
where the SourceProvider in this example is:
type Word = String
type Count = Int
final case class WordEnvelope(offset: Long, word: Word)
class WordSource(implicit ec: ExecutionContext) extends SourceProvider[Long, WordEnvelope] {
  private val src = Source(
    List(WordEnvelope(1L, "abc"), WordEnvelope(2L, "def"), WordEnvelope(3L, "ghi"), WordEnvelope(4L, "abc")))
  override def source(offset: () => Future[Option[Long]]): Future[Source[WordEnvelope, NotUsed]] = {
    offset()
      .map {
        case Some(o) => src.dropWhile(_.offset <= o)
        case _       => src
      }
      .map(_.throttle(1, 1.second))
  }
  override def extractOffset(env: WordEnvelope): Long = env.offset
  override def extractCreationTime(env: WordEnvelope): Long = 0L
}

public class WordEnvelope {
  public final Long offset;
  public final String word;
  public WordEnvelope(Long offset, String word) {
    this.offset = offset;
    this.word = word;
  }
}
class WordSource extends SourceProvider<Long, WordEnvelope> {
  private final Source<WordEnvelope, NotUsed> src =
      Source.from(
          Arrays.asList(
              new WordEnvelope(1L, "abc"),
              new WordEnvelope(2L, "def"),
              new WordEnvelope(3L, "ghi"),
              new WordEnvelope(4L, "abc")));
  @Override
  public CompletionStage<Source<WordEnvelope, NotUsed>> source(
      Supplier<CompletionStage<Optional<Long>>> offset) {
    return offset
        .get()
        .thenApply(
            o -> {
              if (o.isPresent())
                return src.dropWhile(envelope -> envelope.offset <= o.get())
                    .throttle(1, Duration.ofSeconds(1));
              else return src.throttle(1, Duration.ofSeconds(1));
            });
  }
  @Override
  public Long extractOffset(WordEnvelope envelope) {
    return envelope.offset;
  }
  @Override
  public long extractCreationTime(WordEnvelope envelope) {
    return 0L;
  }
}
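The resume logic in `WordSource` (skip every envelope up to and including the stored offset, or start from the beginning when no offset is stored) can be tried out without streams. The following is a plain-collections sketch of the same `dropWhile` behaviour; `ResumeFromOffsetSketch`, `Envelope` and `remaining` are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical list-based model of the dropWhile logic used by WordSource.
final class ResumeFromOffsetSketch {

    static final class Envelope {
        final long offset;
        final String word;

        Envelope(long offset, String word) {
            this.offset = offset;
            this.word = word;
        }
    }

    /** Returns the envelopes still to be processed given the last stored offset. */
    static List<Envelope> remaining(List<Envelope> all, Optional<Long> stored) {
        List<Envelope> out = new ArrayList<>();
        // dropWhile semantics: skip the leading run with offset <= stored, then keep everything.
        boolean dropping = stored.isPresent();
        for (Envelope e : all) {
            if (dropping && e.offset <= stored.get()) {
                continue;
            }
            dropping = false;
            out.add(e);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Envelope> all = Arrays.asList(
            new Envelope(1L, "abc"), new Envelope(2L, "def"),
            new Envelope(3L, "ghi"), new Envelope(4L, "abc"));
        // Offset 2 was stored before a restart, so processing resumes at offset 3.
        for (Envelope e : remaining(all, Optional.of(2L))) {
            System.out.println(e.offset + " " + e.word);
        }
    }
}
```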
Sending to Kafka using a Producer Flow
Alternatively, we can define the same projection using Producer.flowWithContext in combination with atLeastOnceFlow.
The WordSource emits WordEnvelopes, so we build a flow that takes each emitted WordEnvelope and maps it into an Apache Pekko Connectors Kafka ProducerMessage. The ProducerMessage factory methods can be used to produce a single message, multiple messages, or to pass a message through without producing anything. The ProducerMessage then passes through Producer.flowWithContext, which publishes it to the Kafka topic, and finally we map the result to Done.
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.pekko
import pekko.kafka.ProducerSettings
import org.apache.kafka.clients.producer.ProducerRecord
import pekko.kafka.ProducerMessage
import pekko.kafka.scaladsl.Producer
import pekko.stream.scaladsl.FlowWithContext
import pekko.projection.ProjectionContext
val bootstrapServers = "localhost:9092"
val topicName = "words"
private val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)
val producerFlow =
  FlowWithContext[WordEnvelope, ProjectionContext]
    .map(wordEnv => ProducerMessage.single(new ProducerRecord(topicName, wordEnv.word, wordEnv.word)))
    .via(Producer.flowWithContext(producerSettings))
    .map(_ => Done)

import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pekko.kafka.ProducerSettings;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pekko.kafka.ProducerMessage;
import org.apache.pekko.kafka.javadsl.Producer;
import org.apache.pekko.stream.javadsl.FlowWithContext;
import org.apache.pekko.projection.ProjectionContext;
String bootstrapServers = "localhost:9092";
String topicName = "words";
ProducerSettings<String, String> producerSettings =
    ProducerSettings.create(system, new StringSerializer(), new StringSerializer())
        .withBootstrapServers(bootstrapServers);
FlowWithContext<WordEnvelope, ProjectionContext, Done, ProjectionContext, NotUsed>
    producerFlow =
        FlowWithContext.<WordEnvelope, ProjectionContext>create()
            .map(
                wordEnv ->
                    ProducerMessage.single(
                        new ProducerRecord<String, String>(
                            topicName, wordEnv.word, wordEnv.word)))
            .via(Producer.flowWithContext(producerSettings))
            .map(__ -> Done.getInstance());
The resulting flow is then used in the atLeastOnceFlow factory method to build the Projection.
val sourceProvider = new WordSource
val sessionProvider = new HibernateSessionFactory
val projectionId = ProjectionId("PublishWords", "words")
val projection =
  JdbcProjection
    .atLeastOnceFlow(projectionId, sourceProvider, () => sessionProvider.newInstance(), producerFlow)

WordSource sourceProvider = new WordSource();
HibernateSessionFactory sessionProvider = new HibernateSessionFactory();
ProjectionId projectionId = ProjectionId.of("PublishWords", "words");
Projection<WordEnvelope> projection =
    JdbcProjection.atLeastOnceFlow(
        projectionId, sourceProvider, sessionProvider::newInstance, producerFlow, system);
Mergeable Offset
The offset type for a projection is determined by the SourceProvider that’s used. Apache Pekko Projections supports a variety of offset types. In most cases an event is associated with a single offset row in the projection implementation’s offset store, but the KafkaSourceProvider uses a special type of offset called a MergeableOffset.
MergeableOffset allows us to read and write a map of offsets to the projection offset store. This is required because a subscription to consume from Kafka normally spans more than one Kafka partition (see the Apache Kafka documentation for more information on Kafka’s partitioning model). To begin consuming from Kafka using offsets stored in a projection’s offset store, we must provide the Kafka Consumer with a map of topic partitions to offsets (a java.util.Map<org.apache.kafka.common.TopicPartition, java.lang.Long>). The Kafka offset map is modelled as multiple rows in the projection offset table, where each row includes the projection name, a surrogate projection key that represents the Kafka topic partition, and the offset as a java.lang.Long. When a projection with KafkaSourceProvider is started, or when a Kafka consumer group rebalance occurs, we read all the rows from the offset table for a projection name. When an offset is committed, we persist one or more rows of the Kafka offset map back to the projection offset table.
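As a rough model of the row-per-partition layout described above, the sketch below keeps one entry per topic partition and overlays newly committed entries on the stored ones. It is an illustration only, not the library's offset store: `OffsetMapSketch`, `Row`, `merge` and `toRows` are hypothetical names, and the `"words-0"` style key is an assumed encoding of topic and partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of an offset map stored as one row per topic partition.
final class OffsetMapSketch {

    /** One row of the (assumed) offset table. */
    static final class Row {
        final String projectionName;
        final String projectionKey; // surrogate key standing for a topic partition
        final long offset;

        Row(String projectionName, String projectionKey, long offset) {
            this.projectionName = projectionName;
            this.projectionKey = projectionKey;
            this.offset = offset;
        }
    }

    /** Overlays newly committed entries on the stored map; untouched partitions keep their offset. */
    static Map<String, Long> merge(Map<String, Long> stored, Map<String, Long> committed) {
        Map<String, Long> merged = new TreeMap<>(stored);
        merged.putAll(committed);
        return merged;
    }

    /** Flattens an offset map into rows that share the same projection name. */
    static List<Row> toRows(String projectionName, Map<String, Long> offsets) {
        List<Row> rows = new ArrayList<>();
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            rows.add(new Row(projectionName, e.getKey(), e.getValue()));
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, Long> stored = new TreeMap<>();
        stored.put("words-0", 5L);
        stored.put("words-1", 9L);
        Map<String, Long> committed = new TreeMap<>();
        committed.put("words-1", 12L); // only partition 1 made progress
        for (Row r : toRows("WordCount", merge(stored, committed))) {
            System.out.println(r.projectionName + " " + r.projectionKey + " " + r.offset);
        }
    }
}
```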
Configuration
Make your edits/overrides in your application.conf.
The reference configuration file with the default values:
pekko.projection.kafka {
  # The time to wait before retrieving the last saved offsets. Due to the asynchronous nature of Pekko Streams,
  # when a Kafka Consumer Group rebalance occurs it's possible that some messages from a revoked partitions
  # are still in-flight and have not yet been committed to the offset store. Projections will attempt to
  # filter out such messages, but it's not possible to guarantee it all the time. This delay adds a small
  # buffer of time between when the Kafka Source Provider starts up, or when it's assigned a new partition,
  # to retrieve the map of partitions to offsets to give any projections running in parallel a chance
  # to drain in-flight messages.
  read-offset-delay = 500 ms
}
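An override in application.conf might look like the fragment below. The value of 2 seconds is an arbitrary illustration, not a recommendation; a suitable delay depends on how long in-flight messages take to drain in your deployment.

```hocon
# application.conf (example override; 2 s is an illustrative value)
pekko.projection.kafka {
  read-offset-delay = 2 s
}
```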