Offset in Cassandra
The CassandraProjection has support for storing the offset in Cassandra.
The source of the envelopes can be events from Apache Pekko Persistence or any other SourceProvider with supported offset types.
The envelope handler can integrate with anything, such as publishing to a message broker, or updating a read model in Cassandra.
The CassandraProjection offers at-least-once and at-most-once processing semantics, but not exactly-once.
Dependencies¶
To use the Cassandra module of Apache Pekko Projections add the following dependency in your project:
sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-cassandra" % "1.1.0"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-projection-cassandra_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-projection-cassandra_${versions.ScalaBinary}:1.1.0"
}
Apache Pekko Projections requires Pekko 1.1.3 or later, see Pekko version.
Project Info: Apache Pekko Projections Cassandra | |
---|---|
Artifact | org.apache.pekko pekko-projection-cassandra 1.1.0 |
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 |
Scala versions | 2.13.16, 2.12.20, 3.3.5 |
JPMS module name | pekko.projection.cassandra |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-projection |
Transitive dependencies¶
The table below shows pekko-projection-cassandra’s direct dependencies and the second tab shows all libraries it depends on transitively.
at-least-once¶
The offset is stored after the envelope has been processed, giving at-least-once processing semantics. This means that if the projection is restarted from a previously stored offset, some envelopes may be processed more than once. Therefore, the Handler code must be idempotent.
Scala
import scala.concurrent.duration._
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.cassandra.scaladsl.CassandraProjection
val projection =
CassandraProjection
.atLeastOnce(
projectionId = ProjectionId("shopping-carts", "carts-1"),
sourceProvider,
handler = () => new ShoppingCartHandler)
.withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
Java
import java.time.Duration;
import org.apache.pekko.projection.cassandra.javadsl.CassandraProjection;
import org.apache.pekko.projection.Projection;
import org.apache.pekko.projection.ProjectionId;
int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);
Projection<EventEnvelope<ShoppingCart.Event>> projection =
CassandraProjection.atLeastOnce(
ProjectionId.of("shopping-carts", "carts-1"),
sourceProvider,
() -> new ShoppingCartHandler())
.withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
The offset is stored after a time window, or after a number of envelopes, whichever happens first. This window can be defined with withSaveOffset of the returned AtLeastOnceProjection. The default settings for the window are defined in configuration section pekko.projection.at-least-once. There is a performance benefit of not storing the offset too often, but the drawback is that more envelopes may be processed again as duplicates when the projection is restarted.
The ShoppingCartHandler is shown below.
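Because duplicates are possible after a restart, one common way to keep a handler idempotent is to remember the highest sequence number already applied per entity and to ignore anything at or below it. The following is a minimal, library-free Java sketch of that idea. The class name, the per-cart sequence number, and the in-memory map are assumptions made for illustration and are not part of Pekko Projections; a real handler would normally keep such a marker in the same store as the read model.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-alone sketch of duplicate detection; none of these names come from Pekko Projections. */
final class IdempotentCheckoutCounter {
    // Highest sequence number already applied per cart (hypothetical dedup marker).
    private final Map<String, Long> lastAppliedSeqNr = new HashMap<>();
    private int checkouts = 0;

    /** Applies the event unless this cart has already seen this sequence number or a later one. */
    boolean process(String cartId, long seqNr) {
        long last = lastAppliedSeqNr.getOrDefault(cartId, 0L);
        if (seqNr <= last) {
            return false; // duplicate delivered after a restart: ignore it
        }
        checkouts++;
        lastAppliedSeqNr.put(cartId, seqNr);
        return true;
    }

    int checkouts() {
        return checkouts;
    }

    public static void main(String[] args) {
        IdempotentCheckoutCounter handler = new IdempotentCheckoutCounter();
        handler.process("cart-1", 1L);
        handler.process("cart-2", 1L);
        // Simulated replay after a restart from an older stored offset.
        handler.process("cart-1", 1L);
        System.out.println(handler.checkouts()); // prints 2
    }
}
```

Processing the same envelope twice leaves the result unchanged, which is the property the at-least-once mode relies on.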
at-most-once¶
The offset for each envelope is stored before the envelope has been processed, giving at-most-once processing semantics. This means that if the projection is restarted from a previously stored offset, one envelope may not have been processed.
Scala
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.cassandra.scaladsl.CassandraProjection
val projection =
CassandraProjection.atMostOnce(
projectionId = ProjectionId("shopping-carts", "carts-1"),
sourceProvider,
handler = () => new ShoppingCartHandler)
Java
Projection<EventEnvelope<ShoppingCart.Event>> projection =
CassandraProjection.atMostOnce(
ProjectionId.of("shopping-carts", "carts-1"), sourceProvider, ShoppingCartHandler::new);
Since the offset must be stored for each envelope this is slower than at-least-once, which can batch offsets before storing.
The ShoppingCartHandler is shown below.
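The difference between the two semantics comes down to the order of two steps: storing the offset and running the handler. The following library-free Java sketch simulates a single crash between those two steps and counts how often each envelope ends up being processed. It is only a model under simplifying assumptions (one crash, offsets stored per envelope in both modes, hypothetical names); it does not use or mirror the Pekko Projections implementation.

```java
import java.util.Arrays;

/** Simulation of offset-store ordering; names and structure are illustrative only. */
final class DeliverySemanticsSketch {

    /**
     * Processes envelopes 1..n and crashes once, between the two steps of envelope crashAt.
     * Returns how many times each envelope was processed (index 0 is envelope 1).
     */
    static int[] run(int n, int crashAt, boolean storeOffsetFirst) {
        int[] processed = new int[n];
        int storedOffset = 0;
        boolean crashPending = true;
        int i = 1;
        while (i <= n) {
            boolean crashNow = crashPending && i == crashAt;
            if (storeOffsetFirst) {
                storedOffset = i;            // at-most-once: store the offset first
                if (crashNow) {
                    crashPending = false;
                    i = storedOffset + 1;    // restart from the stored offset
                    continue;
                }
                processed[i - 1]++;          // then run the handler
            } else {
                processed[i - 1]++;          // at-least-once: run the handler first
                if (crashNow) {
                    crashPending = false;
                    i = storedOffset + 1;    // restart from the stored offset
                    continue;
                }
                storedOffset = i;            // then store the offset
            }
            i++;
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(3, 2, true)));  // prints [1, 0, 1]
        System.out.println(Arrays.toString(run(3, 2, false))); // prints [1, 2, 1]
    }
}
```

With the offset stored first, envelope 2 is lost; with the handler run first, envelope 2 is processed twice.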
groupedWithin¶
The envelopes can be grouped before processing, which can be useful for batch updates.
Scala
val projection =
CassandraProjection
.groupedWithin(
projectionId = ProjectionId("shopping-carts", "carts-1"),
sourceProvider,
handler = () => new GroupedShoppingCartHandler)
.withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
Java
int groupAfterEnvelopes = 20;
Duration groupAfterDuration = Duration.ofMillis(500);
Projection<EventEnvelope<ShoppingCart.Event>> projection =
CassandraProjection.groupedWithin(
ProjectionId.of("shopping-carts", "carts-1"),
sourceProvider,
GroupedShoppingCartHandler::new)
.withGroup(groupAfterEnvelopes, groupAfterDuration);
The envelopes are grouped within a time window, or up to a number of envelopes, whichever happens first. This window can be defined with withGroup of the returned GroupedProjection. The default settings for the window are defined in configuration section pekko.projection.grouped.
When using groupedWithin the handler is a Handler<List<EventEnvelope<ShoppingCart.Event>>>. The GroupedShoppingCartHandler is shown below.
The projection stores the offset in Cassandra immediately after the handler has processed the envelopes, but that still gives at-least-once processing semantics. This means that if the projection is restarted from a previously stored offset, the previous group of envelopes may be processed more than once.
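To make the "number of envelopes or time window, whichever happens first" rule concrete, here is a small, deterministic Java sketch that splits a list of arrival timestamps into groups. It is a simplification under stated assumptions: the class and method names are hypothetical, and a group is closed when the next envelope arrives after the window has elapsed, whereas the real projection is timer driven and emits a group as soon as the window ends.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Deterministic model of count-or-time grouping; not the Pekko Streams implementation. */
final class GroupingSketch {

    /** Splits ascending arrival times (millis) into groups closed by size or by elapsed window. */
    static List<List<Long>> group(List<Long> arrivalMillis, int maxEnvelopes, long windowMillis) {
        List<List<Long>> groups = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long windowStart = 0L;
        for (long t : arrivalMillis) {
            if (current.isEmpty()) {
                windowStart = t;                  // the first envelope opens a new window
            } else if (t - windowStart >= windowMillis) {
                groups.add(current);              // window elapsed: close the group
                current = new ArrayList<>();
                windowStart = t;
            }
            current.add(t);
            if (current.size() == maxEnvelopes) { // size limit reached: close the group
                groups.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            groups.add(current);                  // flush the last partial group
        }
        return groups;
    }

    public static void main(String[] args) {
        // Three envelopes, limit 10, window 500 ms: the window closes the first group.
        System.out.println(group(Arrays.asList(0L, 100L, 700L), 10, 500L)); // prints [[0, 100], [700]]
    }
}
```

Each inner list corresponds to one invocation of a grouped handler.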
Handler¶
It’s in the Handler that you implement the processing of each envelope. It’s essentially a function from Envelope to CompletionStage<Done>. This means that the envelope handler can integrate with anything, such as publishing to a message broker, or updating a read model in Cassandra.
A handler that is consuming ShoppingCart.Event from eventsByTag can look like this:
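Scala
import scala.concurrent.duration._
import scala.concurrent.Future
import org.apache.pekko
import pekko.Done
// LoggerOps provides the info2/debug2 extension methods used below
import pekko.actor.typed.scaladsl.LoggerOps
import pekko.projection.scaladsl.Handler
import org.slf4j.LoggerFactory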
class ShoppingCartHandler extends Handler[EventEnvelope[ShoppingCart.Event]] {
private val logger = LoggerFactory.getLogger(getClass)
override def process(envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = {
envelope.event match {
case ShoppingCart.CheckedOut(cartId, time) =>
logger.info2("Shopping cart {} was checked out at {}", cartId, time)
Future.successful(Done)
case otherEvent =>
logger.debug2("Shopping cart {} changed by {}", otherEvent.cartId, otherEvent)
Future.successful(Done)
}
}
}
Java
import org.apache.pekko.Done;
import org.apache.pekko.projection.javadsl.Handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
public class ShoppingCartHandler extends Handler<EventEnvelope<ShoppingCart.Event>> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public CompletionStage<Done> process(EventEnvelope<ShoppingCart.Event> envelope) {
ShoppingCart.Event event = envelope.event();
if (event instanceof ShoppingCart.CheckedOut) {
ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
logger.info(
"Shopping cart {} was checked out at {}", checkedOut.cartId, checkedOut.eventTime);
return CompletableFuture.completedFuture(Done.getInstance());
} else {
logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
return CompletableFuture.completedFuture(Done.getInstance());
}
}
}
Such simple handlers can also be defined as plain lambdas via the helper Handler.fromFunction factory method.
Grouped handler¶
When using CassandraProjection.groupedWithin the handler is processing a List of envelopes.
Scala
import scala.collection.immutable
class GroupedShoppingCartHandler extends Handler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] {
private val logger = LoggerFactory.getLogger(getClass)
override def process(envelopes: immutable.Seq[EventEnvelope[ShoppingCart.Event]]): Future[Done] = {
envelopes.map(_.event).foreach {
case ShoppingCart.CheckedOut(cartId, time) =>
logger.info2("Shopping cart {} was checked out at {}", cartId, time)
case otherEvent =>
logger.debug2("Shopping cart {} changed by {}", otherEvent.cartId, otherEvent)
}
Future.successful(Done)
}
}
Java
public class GroupedShoppingCartHandler extends Handler<List<EventEnvelope<ShoppingCart.Event>>> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public CompletionStage<Done> process(List<EventEnvelope<ShoppingCart.Event>> envelopes) {
envelopes.forEach(
env -> {
ShoppingCart.Event event = env.event();
if (event instanceof ShoppingCart.CheckedOut) {
ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
logger.info(
"Shopping cart {} was checked out at {}",
checkedOut.cartId,
checkedOut.eventTime);
} else {
logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
}
});
return CompletableFuture.completedFuture(Done.getInstance());
}
}
Stateful handler¶
The Handler can be stateful, with variables and mutable data structures. It is invoked by the Projection machinery one envelope at a time and visibility guarantees between the invocations are handled automatically, i.e. no volatile or other concurrency primitives are needed for managing the state.
The returned CompletionStage<Done> is to be completed when the processing of the envelope has finished. The handler will not be invoked with the next envelope until after the returned CompletionStage<Done> has been completed.
Scala
class WordCountHandler extends Handler[WordEnvelope] {
private val logger = LoggerFactory.getLogger(getClass)
private var state: Map[Word, Count] = Map.empty
override def process(envelope: WordEnvelope): Future[Done] = {
val word = envelope.word
val newCount = state.getOrElse(word, 0) + 1
logger.info("Word count for {} is {}", word, newCount)
state = state.updated(word, newCount)
Future.successful(Done)
}
}
Java
public class WordCountHandler extends Handler<WordEnvelope> {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Map<String, Integer> state = new HashMap<>();
@Override
public CompletionStage<Done> process(WordEnvelope envelope) {
String word = envelope.word;
int newCount = state.getOrDefault(word, 0) + 1;
logger.info("Word count for {} is {}", word, newCount);
state.put(word, newCount);
return CompletableFuture.completedFuture(Done.getInstance());
}
}
It is important that the Handler instance is not shared between several Projection instances, because then it would be invoked concurrently, which is not how it is intended to be used. Each Projection instance should use a new Handler instance. This is the reason why the handler parameter is a factory (Supplier) of the handler. A new handler instance is also created when the projection is restarted.
However, the state must typically be loaded and updated by asynchronous operations, and then it can be error prone to manage the state in variables of the Handler. For that purpose a StatefulHandler is provided.
Let us look at how a StatefulHandler can be implemented in the context of a “word count” domain. The purpose is to process a stream of words and for each word keep track of how many times it has occurred.
Given an envelope and SourceProvider for this example:
Scala
type Word = String
type Count = Int
final case class WordEnvelope(offset: Long, word: Word)
class WordSource(implicit ec: ExecutionContext) extends SourceProvider[Long, WordEnvelope] {
private val src = Source(
List(WordEnvelope(1L, "abc"), WordEnvelope(2L, "def"), WordEnvelope(3L, "ghi"), WordEnvelope(4L, "abc")))
override def source(offset: () => Future[Option[Long]]): Future[Source[WordEnvelope, NotUsed]] = {
offset().map {
case Some(o) => src.dropWhile(_.offset <= o)
case _ => src
}
}
override def extractOffset(env: WordEnvelope): Long = env.offset
override def extractCreationTime(env: WordEnvelope): Long = 0L
}
Java
public class WordEnvelope {
public final Long offset;
public final String word;
public WordEnvelope(Long offset, String word) {
this.offset = offset;
this.word = word;
}
}
class WordSource extends SourceProvider<Long, WordEnvelope> {
private final Source<WordEnvelope, NotUsed> src =
Source.from(
Arrays.asList(
new WordEnvelope(1L, "abc"),
new WordEnvelope(2L, "def"),
new WordEnvelope(3L, "ghi"),
new WordEnvelope(4L, "abc")));
@Override
public CompletionStage<Source<WordEnvelope, NotUsed>> source(
Supplier<CompletionStage<Optional<Long>>> offset) {
return offset
.get()
.thenApply(
o -> {
if (o.isPresent()) return src.dropWhile(envelope -> envelope.offset <= o.get());
else return src;
});
}
@Override
public Long extractOffset(WordEnvelope envelope) {
return envelope.offset;
}
@Override
public long extractCreationTime(WordEnvelope envelope) {
return 0L;
}
}
and a repository for the interaction with the database:
Scala
trait WordCountRepository {
def load(id: String, word: Word): Future[Count]
def loadAll(id: String): Future[Map[Word, Count]]
def save(id: String, word: Word, count: Count): Future[Done]
}
Java
public interface WordCountRepository {
CompletionStage<Integer> load(String id, String word);
CompletionStage<Map<String, Integer>> loadAll(String id);
CompletionStage<Done> save(String id, String word, int count);
}
The Projection can be defined as:
Scala
val projection =
CassandraProjection
.atLeastOnce[Long, WordEnvelope](
projectionId,
sourceProvider = new WordSource,
handler = () => new WordCountHandler(projectionId, repository))
Java
Projection<WordEnvelope> projection =
CassandraProjection.atLeastOnce(
projectionId, new WordSource(), () -> new WordCountHandler(projectionId, repository));
The handler can be implemented as follows.
A naive approach would be to have one row per word for storing the current count in the database. The handler could be implemented as a completely stateless handler that, for each processed envelope, loads the current count from the database, increments it by 1, and saves it again. Typically there will be several instances of the Projection with different ProjectionId.id. Each Projection instance would be responsible for processing a subset of all words. This stateless approach wouldn’t be very efficient and you would have to use optimistic database locking to make sure that one Projection instance is not overwriting the stored value from another instance without reading the right value first.
It would be better for each Projection instance to be a single writer, so that it can keep the current word count in memory and only load it on startup or on demand.
A handler that is loading the state from the database when it’s starting up:
Scala
import org.apache.pekko.projection.scaladsl.StatefulHandler
class WordCountHandler(projectionId: ProjectionId, repository: WordCountRepository)(implicit ec: ExecutionContext)
extends StatefulHandler[Map[Word, Count], WordEnvelope] {
override def initialState(): Future[Map[Word, Count]] = repository.loadAll(projectionId.id)
override def process(state: Map[Word, Count], envelope: WordEnvelope): Future[Map[Word, Count]] = {
val word = envelope.word
val newCount = state.getOrElse(word, 0) + 1
val newState = for {
_ <- repository.save(projectionId.id, word, newCount)
} yield state.updated(word, newCount)
newState
}
}
Java
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.javadsl.StatefulHandler;
public class WordCountHandler extends StatefulHandler<Map<String, Integer>, WordEnvelope> {
private final ProjectionId projectionId;
private final WordCountRepository repository;
public WordCountHandler(ProjectionId projectionId, WordCountRepository repository) {
this.projectionId = projectionId;
this.repository = repository;
}
@Override
public CompletionStage<Map<String, Integer>> initialState() {
return repository.loadAll(projectionId.id());
}
@Override
public CompletionStage<Map<String, Integer>> process(
Map<String, Integer> state, WordEnvelope envelope) {
String word = envelope.word;
int newCount = state.getOrDefault(word, 0) + 1;
CompletionStage<Map<String, Integer>> newState =
repository
.save(projectionId.id(), word, newCount)
.thenApply(
done -> {
state.put(word, newCount);
return state;
});
return newState;
}
}
The StatefulHandler has two methods that need to be implemented.
- initialState - Invoked to load the initial state when the projection is started or if the previous process failed.
- process(state, envelope) - Invoked for each Envelope, one at a time. The state parameter is the completed value of the previously returned CompletionStage<State> or the initialState.
If the previously returned CompletionStage<State> failed it will call initialState again and use that value.
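The way state flows between the two methods can be sketched with plain java.util.concurrent types. The following is a minimal model, not the Pekko Projections implementation: the class name and the run method are hypothetical, and a failed envelope is simply skipped here, whereas a real projection applies its configured error handling. It shows that each process call receives the completed value of the previous one and that a failure causes initialState to be invoked again.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Library-free sketch of state threading; all names are assumptions for illustration. */
final class StateThreadingSketch {

    static <S, E> CompletionStage<S> run(
            Supplier<CompletionStage<S>> initialState,
            BiFunction<S, E, CompletionStage<S>> process,
            List<E> envelopes) {
        CompletionStage<S> state = initialState.get();
        for (E envelope : envelopes) {
            state = state
                // The next envelope is processed only after the previous stage has completed.
                .thenCompose(s -> process.apply(s, envelope))
                .handle((S s, Throwable failure) -> {
                    // On failure, discard the in-memory state and load it again.
                    CompletionStage<S> next =
                        (failure == null) ? CompletableFuture.completedFuture(s) : initialState.get();
                    return next;
                })
                .thenCompose(next -> next);
        }
        return state;
    }

    public static void main(String[] args) {
        // No database in this sketch, so the "loaded" state is always an empty map.
        Supplier<CompletionStage<Map<String, Integer>>> load = () -> {
            Map<String, Integer> empty = new HashMap<>();
            return CompletableFuture.completedFuture(empty);
        };
        BiFunction<Map<String, Integer>, String, CompletionStage<Map<String, Integer>>> count =
            (state, word) -> {
                if (word.isEmpty()) {
                    CompletableFuture<Map<String, Integer>> failed = new CompletableFuture<>();
                    failed.completeExceptionally(new IllegalArgumentException("empty word"));
                    return failed;
                }
                Map<String, Integer> next = new HashMap<>(state);
                next.merge(word, 1, Integer::sum);
                return CompletableFuture.completedFuture(next);
            };
        Map<String, Integer> result =
            run(load, count, Arrays.asList("abc", "abc", "", "abc")).toCompletableFuture().join();
        System.out.println(result); // prints {abc=1}
    }
}
```

The count drops back to 1 after the failure only because this sketch reloads an empty map; with a repository behind initialState, the reloaded value would be whatever was last saved.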
Another implementation would be a handler that is loading the current count for a word on demand, and thereafter caches it in the in-memory state:
Scala
import org.apache.pekko.projection.scaladsl.StatefulHandler
class WordCountHandler(projectionId: ProjectionId, repository: WordCountRepository)(implicit ec: ExecutionContext)
extends StatefulHandler[Map[Word, Count], WordEnvelope] {
override def initialState(): Future[Map[Word, Count]] =
Future.successful(Map.empty)
override def process(state: Map[Word, Count], envelope: WordEnvelope): Future[Map[Word, Count]] = {
val word = envelope.word
val currentCount =
state.get(word) match {
case None =>
repository.load(projectionId.id, word)
case Some(count) =>
Future.successful(count)
}
val newState = for {
c <- currentCount
newCount = c + 1
_ <- repository.save(projectionId.id, word, newCount)
} yield state.updated(word, newCount)
newState
}
}
Java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.javadsl.StatefulHandler;
public class WordCountHandler extends StatefulHandler<Map<String, Integer>, WordEnvelope> {
private final ProjectionId projectionId;
private final WordCountRepository repository;
public WordCountHandler(ProjectionId projectionId, WordCountRepository repository) {
this.projectionId = projectionId;
this.repository = repository;
}
@Override
public CompletionStage<Map<String, Integer>> initialState() {
return CompletableFuture.completedFuture(new HashMap<>());
}
@Override
public CompletionStage<Map<String, Integer>> process(
Map<String, Integer> state, WordEnvelope envelope) {
String word = envelope.word;
CompletionStage<Integer> currentCount;
if (state.containsKey(word))
currentCount = CompletableFuture.completedFuture(state.get(word));
else currentCount = repository.load(projectionId.id(), word);
CompletionStage<Map<String, Integer>> newState =
currentCount.thenCompose(
n -> {
return repository
.save(projectionId.id(), word, n + 1)
.thenApply(
done -> {
state.put(word, n + 1);
return state;
});
});
return newState;
}
}
Actor handler¶
A good alternative for advanced state management is to implement the handler as an actor, which is described in Processing with Actor.
Flow handler¶
An Apache Pekko Streams FlowWithContext can be used instead of a handler for processing the envelopes, which is described in Processing with Apache Pekko Streams.
Handler lifecycle¶
You can override the start and stop methods of the Handler to implement initialization before the first envelope is processed and resource cleanup when the projection is stopped. Those methods are also called when the Projection is restarted after failure.
See also error handling.
Schema¶
The database schema for the offset storage table.
The partition field is used to distribute projection rows across Cassandra nodes while also allowing us to query all rows for a projection name. For most offset types we return only one row that matches the provided projection key, but the MergeableOffset requires all rows.
CREATE TABLE IF NOT EXISTS pekko_projection.offset_store (
projection_name text,
partition int,
projection_key text,
offset text,
manifest text,
last_updated timestamp,
PRIMARY KEY ((projection_name, partition), projection_key));
CREATE TABLE IF NOT EXISTS pekko_projection.projection_management (
projection_name text,
partition int,
projection_key text,
paused boolean,
last_updated timestamp,
PRIMARY KEY ((projection_name, partition), projection_key));
Offset types¶
The supported offset types of the CassandraProjection are:
- org.apache.pekko.persistence.query.Offset types from events from Apache Pekko Persistence
- String
- Integer
- Long
- Any other type that has a configured Pekko Serializer is stored with base64 encoding of the serialized bytes.
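For the last case in the list, the idea of storing serialized bytes in a text column can be illustrated with the JDK's Base64 codec. This is only a sketch: the class and method names are hypothetical, a UTF-8 string stands in for the bytes that a Pekko Serializer would produce, and the use of the standard (non-URL) Base64 alphabet is an assumption rather than something stated here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Sketch of round-tripping serialized bytes through a text column; names are illustrative. */
final class Base64OffsetSketch {

    /** Encodes serialized offset bytes as text suitable for a CQL text column. */
    static String toText(byte[] serialized) {
        return Base64.getEncoder().encodeToString(serialized);
    }

    /** Decodes the stored text back into the serialized bytes. */
    static byte[] fromText(String stored) {
        return Base64.getDecoder().decode(stored);
    }

    public static void main(String[] args) {
        // Stand-in for serializer output; a real offset would be produced by a Pekko Serializer.
        byte[] serialized = "abc".getBytes(StandardCharsets.UTF_8);
        String stored = toText(serialized);
        System.out.println(stored); // prints YWJj
        System.out.println(new String(fromText(stored), StandardCharsets.UTF_8)); // prints abc
    }
}
```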
The MergeableOffset that is used for messages from Kafka is not implemented for the CassandraProjection yet, see issue #97.
The schema can be created using the method CassandraProjection.createTablesIfNotExists. This is particularly useful when writing tests. For production environments, we recommend creating the schema before deploying the application.
Configuration¶
Make your edits/overrides in your application.conf.
The reference configuration file with the default values:
pekko.projection.cassandra {
session-config-path = "pekko.projection.cassandra.session-config"
session-config {
# The implementation of `org.apache.pekko.stream.connectors.cassandra.CqlSessionProvider`
# used for creating the `CqlSession`.
# It may optionally have a constructor with an `ClassicActorSystemProvider` and `Config` parameters.
session-provider = "org.apache.pekko.stream.connectors.cassandra.DefaultSessionProvider"
# Configure Pekko Discovery by setting a service name
service-discovery {
name = ""
lookup-timeout = 1 s
}
# The ExecutionContext to use for the session tasks and future composition.
session-dispatcher = "pekko.actor.default-dispatcher"
# Full config path to the Datastax Java driver's configuration section.
# When connecting to more than one Cassandra cluster different session configuration can be
# defined with this property.
# and https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/reference/
datastax-java-driver-config = "datastax-java-driver"
}
offset-store {
keyspace = "pekko_projection"
# the database table name for the offset store
table = "offset_store"
# the database table name for the projection management data
management-table = "projection_management"
}
}
Cassandra driver configuration¶
All Cassandra driver settings are configured via the driver’s standard profile mechanism.
One important setting is to configure the database driver to retry the initial connection:
datastax-java-driver.advanced.reconnect-on-init = true
It is not enabled automatically as it is in the driver’s reference.conf and is not overridable in a profile.
It is possible to share the same Cassandra session as Apache Pekko Persistence Cassandra by setting the session-config-path:
pekko.projection.cassandra {
session-config-path = "pekko.persistence.cassandra"
}
or share the same Cassandra session as Pekko Connectors Cassandra:
pekko.projection.cassandra {
session-config-path = "connectors.cassandra"
}
Cassandra driver overrides¶
# See reference configuration at
# https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/reference/
# (check which exact version Pekko Projections uses)
datastax-java-driver {
# always set this to allow reconnection on startup if cassandra is down
# not overridable profile so this plugin can't override it for you
# advanced.reconnect-on-init = true
profiles {
pekko-projection-cassandra-profile {
basic.request {
consistency = QUORUM
# the offset store does not use any counters or collections
default-idempotence = true
}
}
}
}
Contact points configuration¶
The Cassandra server contact points can be defined with the Cassandra driver configuration:
datastax-java-driver {
basic.contact-points = ["127.0.0.1:9042"]
basic.load-balancing-policy.local-datacenter = "datacenter1"
}
Alternatively, Pekko Discovery can be used for finding the Cassandra server contact points as described in the Pekko Connectors Cassandra documentation.
Without any configuration it will use localhost:9042 as default.