Offset in Cassandra
The CassandraProjection has support for storing the offset in Cassandra.
The source of the envelopes can be events from Apache Pekko Persistence or any other SourceProvider
with supported offset types.
The envelope handler can integrate with anything, such as publishing to a message broker, or updating a read model in Cassandra.
The CassandraProjection
offers at-least-once and at-most-once processing semantics, but not exactly-once.
Dependencies
To use the Cassandra module of Apache Pekko Projections add the following dependency in your project:
- sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-cassandra" % "1.0.0"
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-projection-cassandra_${scala.binary.version}</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-projection-cassandra_${versions.ScalaBinary}:1.0.0"
}
Apache Pekko Projections requires Pekko 1.0.2 or later, see Pekko version.
Project Info: Apache Pekko Projections Cassandra
 | |
---|---|
Artifact | org.apache.pekko : pekko-projection-cassandra : 1.0.0 |
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 |
Scala versions | 3.3.3, 2.13.13, 2.12.19 |
JPMS module name | pekko.projection.cassandra |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-projection |
Transitive dependencies
The table below shows pekko-projection-cassandra’s direct dependencies and the second tab shows all libraries it depends on transitively.
at-least-once
The offset is stored after the envelope has been processed, giving at-least-once processing semantics. This means that if the projection is restarted from a previously stored offset, some envelopes may be processed more than once. Therefore, the Handler code must be idempotent.
- Scala
import scala.concurrent.duration._

import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.cassandra.scaladsl.CassandraProjection

val projection =
  CassandraProjection
    .atLeastOnce(
      projectionId = ProjectionId("shopping-carts", "carts-1"),
      sourceProvider,
      handler = () => new ShoppingCartHandler)
    .withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
- Java
import java.time.Duration;

import org.apache.pekko.projection.cassandra.javadsl.CassandraProjection;
import org.apache.pekko.projection.Projection;
import org.apache.pekko.projection.ProjectionId;

int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);

Projection<EventEnvelope<ShoppingCart.Event>> projection =
    CassandraProjection.atLeastOnce(
            ProjectionId.of("shopping-carts", "carts-1"),
            sourceProvider,
            () -> new ShoppingCartHandler())
        .withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
The offset is stored after a time window, or after a number of envelopes, whichever happens first. This window can be defined with withSaveOffset of the returned AtLeastOnceProjection. The default settings for the window are defined in the configuration section pekko.projection.at-least-once. There is a performance benefit of not storing the offset too often, but the drawback is that there can be more duplicates that will be processed again when the projection is restarted.
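As a sketch of what overriding the default window might look like in application.conf, assuming the setting names follow the parameters of withSaveOffset (check the reference configuration of your Pekko Projections version before relying on them):

```hocon
# Assumed setting names, mirroring withSaveOffset(afterEnvelopes, afterDuration)
pekko.projection.at-least-once {
  save-offset-after-envelopes = 100
  save-offset-after-duration = 500 ms
}
```

Values set programmatically with withSaveOffset apply to that projection only, whereas the configuration section provides the defaults.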
The ShoppingCartHandler
is shown below.
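To illustrate what idempotent handling can mean in practice, here is a minimal plain-Java sketch that does not use the Pekko Projections API. The class IdempotentCartView and its methods are hypothetical; it assumes every event carries a per-cart sequence number that increases monotonically, and it ignores events that were already applied.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical read model, not part of Pekko Projections. It remembers the
// highest sequence number applied per cart so redelivered events are ignored.
final class IdempotentCartView {
    private final Map<String, Long> lastAppliedSeqNr = new HashMap<>();
    private final Map<String, Integer> itemCount = new HashMap<>();

    // Returns true if the event changed the view, false if it was a duplicate.
    boolean applyItemAdded(String cartId, long seqNr) {
        long last = lastAppliedSeqNr.getOrDefault(cartId, 0L);
        if (seqNr <= last) {
            return false; // already applied, e.g. redelivered after a restart
        }
        itemCount.merge(cartId, 1, Integer::sum);
        lastAppliedSeqNr.put(cartId, seqNr);
        return true;
    }

    int items(String cartId) {
        return itemCount.getOrDefault(cartId, 0);
    }
}
```

With such a view, replaying the same event after a restart leaves the item count unchanged, which is the property an at-least-once handler needs.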
at-most-once
The offset for each envelope is stored before the envelope has been processed, giving at-most-once processing semantics. This means that if the projection is restarted from a previously stored offset, one envelope may not have been processed.
- Scala
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.cassandra.scaladsl.CassandraProjection

val projection =
  CassandraProjection.atMostOnce(
    projectionId = ProjectionId("shopping-carts", "carts-1"),
    sourceProvider,
    handler = () => new ShoppingCartHandler)
- Java
Projection<EventEnvelope<ShoppingCart.Event>> projection =
    CassandraProjection.atMostOnce(
        ProjectionId.of("shopping-carts", "carts-1"),
        sourceProvider,
        ShoppingCartHandler::new);
Since the offset must be stored for each envelope this is slower than at-least-once, which can batch offsets before storing.
The ShoppingCartHandler
is shown below.
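The difference between the two semantics comes down to the order of "store offset" and "run handler". The following plain-Java simulation is only an illustration under simplified assumptions (one offset write per envelope, a single crash, integer offsets); OffsetSemanticsDemo is a hypothetical name and the code is not how the library is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation, not the Pekko Projections implementation.
final class OffsetSemanticsDemo {

    // Processes envelopes 1..total and simulates one crash while handling
    // envelope crashAt. Returns the envelopes seen by the handler, in order.
    static List<Integer> run(boolean storeOffsetFirst, int total, int crashAt) {
        List<Integer> processed = new ArrayList<>();
        int storedOffset = 0;
        boolean crashed = false;
        int next = 1;
        while (next <= total) {
            if (storeOffsetFirst) {
                storedOffset = next;                // at-most-once: offset first
                if (next == crashAt && !crashed) {  // crash before the handler runs
                    crashed = true;
                    next = storedOffset + 1;        // restart from the stored offset
                    continue;
                }
                processed.add(next);
            } else {
                processed.add(next);                // at-least-once: handler first
                if (next == crashAt && !crashed) {  // crash before the offset is stored
                    crashed = true;
                    next = storedOffset + 1;        // restart from the stored offset
                    continue;
                }
                storedOffset = next;
            }
            next++;
        }
        return processed;
    }
}
```

Calling run(false, 5, 3) yields 1, 2, 3, 3, 4, 5 (envelope 3 is handled twice), while run(true, 5, 3) yields 1, 2, 4, 5 (envelope 3 is lost).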
groupedWithin
The envelopes can be grouped before processing, which can be useful for batch updates.
- Scala
val projection =
  CassandraProjection
    .groupedWithin(
      projectionId = ProjectionId("shopping-carts", "carts-1"),
      sourceProvider,
      handler = () => new GroupedShoppingCartHandler)
    .withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
- Java
int groupAfterEnvelopes = 20;
Duration groupAfterDuration = Duration.ofMillis(500);

Projection<EventEnvelope<ShoppingCart.Event>> projection =
    CassandraProjection.groupedWithin(
            ProjectionId.of("shopping-carts", "carts-1"),
            sourceProvider,
            GroupedShoppingCartHandler::new)
        .withGroup(groupAfterEnvelopes, groupAfterDuration);
The envelopes are grouped within a time window, or limited by a number of envelopes, whichever happens first. This window can be defined with withGroup of the returned GroupedProjection. The default settings for the window are defined in the configuration section pekko.projection.grouped.
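A sketch of overriding the default grouping window in application.conf, assuming the setting names mirror the parameters of withGroup (verify them against the reference configuration of your version):

```hocon
# Assumed setting names, mirroring withGroup(groupAfterEnvelopes, groupAfterDuration)
pekko.projection.grouped {
  group-after-envelopes = 20
  group-after-duration = 500 ms
}
```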
When using groupedWithin the handler is a Handler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] (Scala) or Handler<List<EventEnvelope<ShoppingCart.Event>>> (Java). The GroupedShoppingCartHandler is shown below.
It stores the offset in Cassandra immediately after the handler has processed the envelopes, but that is still with at-least-once processing semantics. This means that if the projection is restarted from a previously stored offset, the previous group of envelopes may be processed more than once.
Handler
It’s in the Handler that you implement the processing of each envelope. It’s essentially a function from Envelope to Future[Done] (Scala) or CompletionStage<Done> (Java). This means that the envelope handler can integrate with anything, such as publishing to a message broker, or updating a read model in Cassandra.
A handler that is consuming ShoppingCart.Event
from eventsByTag
can look like this:
- Scala
import scala.concurrent.duration._
import scala.concurrent.Future

import org.apache.pekko
import pekko.Done
import pekko.actor.typed.scaladsl.LoggerOps // provides info2 and debug2
import pekko.projection.scaladsl.Handler
import org.slf4j.LoggerFactory

class ShoppingCartHandler extends Handler[EventEnvelope[ShoppingCart.Event]] {
  private val logger = LoggerFactory.getLogger(getClass)

  override def process(envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = {
    envelope.event match {
      case ShoppingCart.CheckedOut(cartId, time) =>
        logger.info2("Shopping cart {} was checked out at {}", cartId, time)
        Future.successful(Done)

      case otherEvent =>
        logger.debug2("Shopping cart {} changed by {}", otherEvent.cartId, otherEvent)
        Future.successful(Done)
    }
  }
}
- Java
import org.apache.pekko.Done;
import org.apache.pekko.projection.javadsl.Handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ShoppingCartHandler extends Handler<EventEnvelope<ShoppingCart.Event>> {
  private Logger logger = LoggerFactory.getLogger(getClass());

  @Override
  public CompletionStage<Done> process(EventEnvelope<ShoppingCart.Event> envelope) {
    ShoppingCart.Event event = envelope.event();
    if (event instanceof ShoppingCart.CheckedOut) {
      ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
      logger.info(
          "Shopping cart {} was checked out at {}", checkedOut.cartId, checkedOut.eventTime);
      return CompletableFuture.completedFuture(Done.getInstance());
    } else {
      logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
      return CompletableFuture.completedFuture(Done.getInstance());
    }
  }
}
Such simple handlers can also be defined as plain functions (Scala) or lambdas (Java) via the helper Handler.apply (Scala) or Handler.fromFunction (Java) factory method.
Grouped handler
When using CassandraProjection.groupedWithin the handler is processing a Seq (Scala) or List (Java) of envelopes.
- Scala
import scala.collection.immutable

class GroupedShoppingCartHandler extends Handler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] {
  private val logger = LoggerFactory.getLogger(getClass)

  override def process(envelopes: immutable.Seq[EventEnvelope[ShoppingCart.Event]]): Future[Done] = {
    envelopes.map(_.event).foreach {
      case ShoppingCart.CheckedOut(cartId, time) =>
        logger.info2("Shopping cart {} was checked out at {}", cartId, time)

      case otherEvent =>
        logger.debug2("Shopping cart {} changed by {}", otherEvent.cartId, otherEvent)
    }
    Future.successful(Done)
  }
}
- Java
public class GroupedShoppingCartHandler
    extends Handler<List<EventEnvelope<ShoppingCart.Event>>> {
  private Logger logger = LoggerFactory.getLogger(getClass());

  @Override
  public CompletionStage<Done> process(List<EventEnvelope<ShoppingCart.Event>> envelopes) {
    envelopes.forEach(
        env -> {
          ShoppingCart.Event event = env.event();
          if (event instanceof ShoppingCart.CheckedOut) {
            ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
            logger.info(
                "Shopping cart {} was checked out at {}",
                checkedOut.cartId,
                checkedOut.eventTime);
          } else {
            logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
          }
        });
    return CompletableFuture.completedFuture(Done.getInstance());
  }
}
Stateful handler
The Handler
can be stateful, with variables and mutable data structures. It is invoked by the Projection
machinery one envelope at a time and visibility guarantees between the invocations are handled automatically, i.e. no volatile or other concurrency primitives are needed for managing the state.
The returned Future[Done] (Scala) or CompletionStage<Done> (Java) is to be completed when the processing of the envelope has finished. The handler will not be invoked with the next envelope until after the returned Future[Done] or CompletionStage<Done> has been completed.
- Scala
class WordCountHandler extends Handler[WordEnvelope] {
  private val logger = LoggerFactory.getLogger(getClass)
  private var state: Map[Word, Count] = Map.empty

  override def process(envelope: WordEnvelope): Future[Done] = {
    val word = envelope.word
    val newCount = state.getOrElse(word, 0) + 1
    logger.info("Word count for {} is {}", word, newCount)
    state = state.updated(word, newCount)
    Future.successful(Done)
  }
}
- Java
public class WordCountHandler extends Handler<WordEnvelope> {
  private final Logger logger = LoggerFactory.getLogger(getClass());
  private final Map<String, Integer> state = new HashMap<>();

  @Override
  public CompletionStage<Done> process(WordEnvelope envelope) {
    String word = envelope.word;
    int newCount = state.getOrDefault(word, 0) + 1;
    logger.info("Word count for {} is {}", word, newCount);
    state.put(word, newCount);
    return CompletableFuture.completedFuture(Done.getInstance());
  }
}
It is important that the Handler instance is not shared between several Projection instances, because then it would be invoked concurrently, which is not how it is intended to be used. Each Projection instance should use a new Handler instance. This is the reason why the handler parameter is a factory of the handler (() => in Scala, Supplier in Java). A new handler instance is also created when the projection is restarted.
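The effect of passing a factory rather than an instance can be seen in a small plain-Java sketch. CountingHandler and HandlerFactoryDemo are hypothetical stand-ins that do not use the Pekko Projections API; the point is only that each call to the Supplier yields an object with its own private state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a stateful handler; not a Pekko Projections class.
final class CountingHandler {
    private final Map<String, Integer> state = new HashMap<>();

    // Increments and returns the count for the given word.
    int process(String word) {
        return state.merge(word, 1, Integer::sum);
    }
}

final class HandlerFactoryDemo {
    // Returns {count seen by the first handler, count seen by the second}.
    static int[] demo() {
        Supplier<CountingHandler> factory = CountingHandler::new;
        CountingHandler first = factory.get();  // e.g. used by one projection instance
        CountingHandler second = factory.get(); // e.g. used by another, or after a restart
        first.process("abc");
        int countInFirst = first.process("abc");
        int countInSecond = second.process("abc");
        return new int[] {countInFirst, countInSecond};
    }
}
```

Because the two handlers never share the map, no synchronization is needed, and a handler created after a restart begins with empty in-memory state.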
However, the state must typically be loaded and updated by asynchronous operations and then it can be error prone to manage the state in variables of the Handler. For that purpose a StatefulHandler is provided.
Let us look at how a StatefulHandler
can be implemented in the context of a “word count” domain. The purpose is to process a stream of words and for each word keep track of how many times it has occurred.
Given an envelope and SourceProvider
for this example:
- Scala
type Word = String
type Count = Int

final case class WordEnvelope(offset: Long, word: Word)

class WordSource(implicit ec: ExecutionContext) extends SourceProvider[Long, WordEnvelope] {

  private val src = Source(
    List(WordEnvelope(1L, "abc"), WordEnvelope(2L, "def"), WordEnvelope(3L, "ghi"), WordEnvelope(4L, "abc")))

  override def source(offset: () => Future[Option[Long]]): Future[Source[WordEnvelope, NotUsed]] = {
    offset().map {
      case Some(o) => src.dropWhile(_.offset <= o)
      case _       => src
    }
  }

  override def extractOffset(env: WordEnvelope): Long = env.offset

  override def extractCreationTime(env: WordEnvelope): Long = 0L
}
- Java
public class WordEnvelope {
  public final Long offset;
  public final String word;

  public WordEnvelope(Long offset, String word) {
    this.offset = offset;
    this.word = word;
  }
}

class WordSource extends SourceProvider<Long, WordEnvelope> {

  private final Source<WordEnvelope, NotUsed> src =
      Source.from(
          Arrays.asList(
              new WordEnvelope(1L, "abc"),
              new WordEnvelope(2L, "def"),
              new WordEnvelope(3L, "ghi"),
              new WordEnvelope(4L, "abc")));

  @Override
  public CompletionStage<Source<WordEnvelope, NotUsed>> source(
      Supplier<CompletionStage<Optional<Long>>> offset) {
    return offset
        .get()
        .thenApply(
            o -> {
              if (o.isPresent()) return src.dropWhile(envelope -> envelope.offset <= o.get());
              else return src;
            });
  }

  @Override
  public Long extractOffset(WordEnvelope envelope) {
    return envelope.offset;
  }

  @Override
  public long extractCreationTime(WordEnvelope envelope) {
    return 0L;
  }
}
and a repository for the interaction with the database:
- Scala
trait WordCountRepository {
  def load(id: String, word: Word): Future[Count]
  def loadAll(id: String): Future[Map[Word, Count]]
  def save(id: String, word: Word, count: Count): Future[Done]
}
- Java
public interface WordCountRepository {
  CompletionStage<Integer> load(String id, String word);

  CompletionStage<Map<String, Integer>> loadAll(String id);

  CompletionStage<Done> save(String id, String word, int count);
}
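For experiments or unit tests, a repository of this shape could be backed by an in-memory map. The sketch below is an assumption-laden illustration: InMemoryWordCountRepository is a hypothetical name, the interface is repeated so the block compiles on its own, and the Done enum is a local stand-in for org.apache.pekko.Done.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

// Local stand-in for org.apache.pekko.Done so the sketch compiles without Pekko.
enum Done { INSTANCE }

// Same shape as the repository interface above, repeated for self-containment.
interface WordCountRepository {
    CompletionStage<Integer> load(String id, String word);
    CompletionStage<Map<String, Integer>> loadAll(String id);
    CompletionStage<Done> save(String id, String word, int count);
}

// Hypothetical in-memory implementation, keyed by projection id and word.
final class InMemoryWordCountRepository implements WordCountRepository {
    private final Map<String, Map<String, Integer>> counts = new ConcurrentHashMap<>();

    @Override
    public CompletionStage<Integer> load(String id, String word) {
        Map<String, Integer> forId = counts.getOrDefault(id, Collections.emptyMap());
        // Unknown words start at a count of zero.
        return CompletableFuture.completedFuture(forId.getOrDefault(word, 0));
    }

    @Override
    public CompletionStage<Map<String, Integer>> loadAll(String id) {
        // Return a mutable copy so callers cannot modify the stored counts.
        Map<String, Integer> copy = new HashMap<>(counts.getOrDefault(id, Collections.emptyMap()));
        return CompletableFuture.completedFuture(copy);
    }

    @Override
    public CompletionStage<Done> save(String id, String word, int count) {
        counts.computeIfAbsent(id, k -> new ConcurrentHashMap<>()).put(word, count);
        return CompletableFuture.completedFuture(Done.INSTANCE);
    }
}
```

A real implementation would issue asynchronous Cassandra queries instead of completing the futures immediately.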
The Projection can be defined as:
- Scala
val projection =
  CassandraProjection
    .atLeastOnce[Long, WordEnvelope](
      projectionId,
      sourceProvider = new WordSource,
      handler = () => new WordCountHandler(projectionId, repository))
- Java
Projection<WordEnvelope> projection =
    CassandraProjection.atLeastOnce(
        projectionId,
        new WordSource(),
        () -> new WordCountHandler(projectionId, repository));
The handler
can be implemented as follows.
A naive approach would be to have one row per word for storing the current count in the database. The handler could be implemented as a completely stateless handler that, for each processed envelope, loads the current count from the database, increments the count by 1 and saves it again. Typically there will be several instances of the Projection
with different ProjectionId.id
. Each Projection
instance would be responsible for processing a subset of all words. This stateless approach wouldn’t be very efficient and you would have to use optimistic database locking to make sure that one Projection
instance is not overwriting the stored value from another instance without reading the right value first.
Better would be that each Projection
instance is a single-writer so that it can keep the current word count in memory and only load it on startup or on demand.
A handler that is loading the state from the database when it’s starting up:
- Scala
import org.apache.pekko.projection.scaladsl.StatefulHandler

class WordCountHandler(projectionId: ProjectionId, repository: WordCountRepository)(
    implicit ec: ExecutionContext)
    extends StatefulHandler[Map[Word, Count], WordEnvelope] {

  override def initialState(): Future[Map[Word, Count]] =
    repository.loadAll(projectionId.id)

  override def process(state: Map[Word, Count], envelope: WordEnvelope): Future[Map[Word, Count]] = {
    val word = envelope.word
    val newCount = state.getOrElse(word, 0) + 1
    val newState = for {
      _ <- repository.save(projectionId.id, word, newCount)
    } yield state.updated(word, newCount)

    newState
  }
}
- Java
import org.apache.pekko.projection.javadsl.StatefulHandler;

public class WordCountHandler extends StatefulHandler<Map<String, Integer>, WordEnvelope> {
  private final ProjectionId projectionId;
  private final WordCountRepository repository;

  public WordCountHandler(ProjectionId projectionId, WordCountRepository repository) {
    this.projectionId = projectionId;
    this.repository = repository;
  }

  @Override
  public CompletionStage<Map<String, Integer>> initialState() {
    return repository.loadAll(projectionId.id());
  }

  @Override
  public CompletionStage<Map<String, Integer>> process(
      Map<String, Integer> state, WordEnvelope envelope) {
    String word = envelope.word;
    int newCount = state.getOrDefault(word, 0) + 1;
    CompletionStage<Map<String, Integer>> newState =
        repository
            .save(projectionId.id(), word, newCount)
            .thenApply(
                done -> {
                  state.put(word, newCount);
                  return state;
                });

    return newState;
  }
}
The StatefulHandler has two methods that need to be implemented.
- initialState - Invoked to load the initial state when the projection is started or if the previous process failed.
- process(state, envelope) - Invoked for each Envelope, one at a time. The state parameter is the completed value of the previously returned Future[State] (Scala) or CompletionStage<State> (Java), or the initialState.
If the previously returned Future[State] (Scala) or CompletionStage<State> (Java) failed, it will call initialState again and use that value.
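The way state is threaded from one invocation to the next, and reloaded after a failure, can be sketched in plain Java. This is a blocking simulation for illustration only, not the library's asynchronous implementation; StateThreadingSketch, runAll and demo are hypothetical names, and the sketch simply moves on to the next envelope after a failure, whereas in a real projection that depends on the configured error handling.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Blocking simulation of state threading; not the Pekko Projections implementation.
final class StateThreadingSketch {

    static <S, E> S runAll(
            Supplier<CompletionStage<S>> initialState,
            BiFunction<S, E, CompletionStage<S>> process,
            List<E> envelopes) {
        S state = initialState.get().toCompletableFuture().join();
        for (E envelope : envelopes) {
            try {
                // The completed value of the previous call becomes the next state.
                state = process.apply(state, envelope).toCompletableFuture().join();
            } catch (CompletionException failure) {
                // The previous process failed: load the state again.
                state = initialState.get().toCompletableFuture().join();
            }
        }
        return state;
    }

    // Sums integers, "persisting" each new sum; negative numbers fail.
    // Returns {final state, number of initialState invocations}.
    static int[] demo() {
        AtomicInteger db = new AtomicInteger(0);
        AtomicInteger loads = new AtomicInteger(0);
        Supplier<CompletionStage<Integer>> initialState = () -> {
            loads.incrementAndGet();
            return CompletableFuture.completedFuture(db.get());
        };
        BiFunction<Integer, Integer, CompletionStage<Integer>> process = (state, n) -> {
            CompletableFuture<Integer> result = new CompletableFuture<>();
            if (n < 0) {
                result.completeExceptionally(new IllegalArgumentException("negative: " + n));
            } else {
                db.set(state + n);
                result.complete(state + n);
            }
            return result;
        };
        int finalState = runAll(initialState, process, Arrays.asList(1, 2, -1, 3));
        return new int[] {finalState, loads.get()};
    }
}
```

In demo the third envelope fails, so the state is loaded a second time from the simulated database (value 3) before the last envelope is processed, giving a final state of 6.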
Another implementation would be a handler that is loading the current count for a word on demand, and thereafter caches it in the in-memory state:
- Scala
import org.apache.pekko.projection.scaladsl.StatefulHandler

class WordCountHandler(projectionId: ProjectionId, repository: WordCountRepository)(
    implicit ec: ExecutionContext)
    extends StatefulHandler[Map[Word, Count], WordEnvelope] {

  override def initialState(): Future[Map[Word, Count]] =
    Future.successful(Map.empty)

  override def process(state: Map[Word, Count], envelope: WordEnvelope): Future[Map[Word, Count]] = {
    val word = envelope.word

    val currentCount =
      state.get(word) match {
        case None =>
          repository.load(projectionId.id, word)
        case Some(count) =>
          Future.successful(count)
      }

    val newState = for {
      c <- currentCount
      newCount = c + 1
      _ <- repository.save(projectionId.id, word, newCount)
    } yield state.updated(word, newCount)

    newState
  }
}
- Java
import org.apache.pekko.projection.javadsl.StatefulHandler;

public class WordCountHandler extends StatefulHandler<Map<String, Integer>, WordEnvelope> {
  private final ProjectionId projectionId;
  private final WordCountRepository repository;

  public WordCountHandler(ProjectionId projectionId, WordCountRepository repository) {
    this.projectionId = projectionId;
    this.repository = repository;
  }

  @Override
  public CompletionStage<Map<String, Integer>> initialState() {
    return CompletableFuture.completedFuture(new HashMap<>());
  }

  @Override
  public CompletionStage<Map<String, Integer>> process(
      Map<String, Integer> state, WordEnvelope envelope) {
    String word = envelope.word;

    CompletionStage<Integer> currentCount;
    if (state.containsKey(word))
      currentCount = CompletableFuture.completedFuture(state.get(word));
    else currentCount = repository.load(projectionId.id(), word);

    CompletionStage<Map<String, Integer>> newState =
        currentCount.thenCompose(
            n -> {
              return repository
                  .save(projectionId.id(), word, n + 1)
                  .thenApply(
                      done -> {
                        state.put(word, n + 1);
                        return state;
                      });
            });

    return newState;
  }
}
Actor handler
A good alternative for advanced state management is to implement the handler as an actor, which is described in Processing with Actor.
Flow handler
An Apache Pekko Streams FlowWithContext
can be used instead of a handler for processing the envelopes, which is described in Processing with Apache Pekko Streams.
Handler lifecycle
You can override the start and stop methods of the Handler to implement initialization before the first envelope is processed and resource cleanup when the projection is stopped. Those methods are also called when the Projection is restarted after failure.
See also error handling.
Schema
The database schema for the offset storage table.
The partition field is used to distribute projection rows across Cassandra nodes while also allowing us to query all rows for a projection name. For most offset types we return only one row that matches the provided projection key, but the MergeableOffset requires all rows.
CREATE TABLE IF NOT EXISTS pekko_projection.offset_store (
projection_name text,
partition int,
projection_key text,
offset text,
manifest text,
last_updated timestamp,
PRIMARY KEY ((projection_name, partition), projection_key));
CREATE TABLE IF NOT EXISTS pekko_projection.projection_management (
projection_name text,
partition int,
projection_key text,
paused boolean,
last_updated timestamp,
PRIMARY KEY ((projection_name, partition), projection_key));
Offset types
The supported offset types of the CassandraProjection
are:
- org.apache.pekko.persistence.query.Offset types from events from Apache Pekko Persistence
- String
- Int (Scala) or Integer (Java)
- Long
- Any other type that has a configured Pekko Serializer is stored with base64 encoding of the serialized bytes.
The MergeableOffset that is used for messages from Kafka is not implemented for the CassandraProjection yet, see issue #97.
The schema can be created using the method CassandraProjection.createTablesIfNotExists. This is particularly useful when writing tests. For production environments, we recommend creating the schema before deploying the application.
Configuration
Make your edits/overrides in your application.conf.
The reference configuration file with the default values:
pekko.projection.cassandra {

  session-config-path = "pekko.projection.cassandra.session-config"

  session-config {
    # The implementation of `org.apache.pekko.stream.connectors.cassandra.CqlSessionProvider`
    # used for creating the `CqlSession`.
    # It may optionally have a constructor with a `ClassicActorSystemProvider` and `Config` parameters.
    session-provider = "org.apache.pekko.stream.connectors.cassandra.DefaultSessionProvider"

    # Configure Pekko Discovery by setting a service name
    service-discovery {
      name = ""
      lookup-timeout = 1 s
    }

    # The ExecutionContext to use for the session tasks and future composition.
    session-dispatcher = "pekko.actor.default-dispatcher"

    # Full config path to the Datastax Java driver's configuration section.
    # When connecting to more than one Cassandra cluster different session configuration can be
    # defined with this property.
    # See https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/reference/
    datastax-java-driver-config = "datastax-java-driver"
  }

  offset-store {
    keyspace = "pekko_projection"
    # the database table name for the offset store
    table = "offset_store"
    # the database table name for the projection management data
    management-table = "projection_management"
  }
}
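As an illustration, an application.conf that stores the offsets in a different keyspace could override only the relevant keys. The keyspace name shopping_projection is a made-up example; the tables from the Schema section would then have to be created in that keyspace.

```hocon
pekko.projection.cassandra.offset-store {
  # hypothetical keyspace name, replace with your own
  keyspace = "shopping_projection"
}
```

Settings that are not overridden keep the defaults from the reference configuration above.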
Cassandra driver configuration
All Cassandra driver settings are configured via its standard profile mechanism.
One important setting is to configure the database driver to retry the initial connection:
datastax-java-driver.advanced.reconnect-on-init = true
It is not enabled automatically as it is in the driver’s reference.conf and is not overridable in a profile.
It is possible to share the same Cassandra session as Apache Pekko Persistence Cassandra by setting the session-config-path
:
pekko.projection.cassandra {
session-config-path = "pekko.persistence.cassandra"
}
or share the same Cassandra session as Pekko Connectors Cassandra:
pekko.projection.cassandra {
session-config-path = "connectors.cassandra"
}
Cassandra driver overrides
# See reference configuration at
# https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/reference/
# (check which exact version Pekko Projections uses)
datastax-java-driver {

  # always set this to allow reconnection on startup if cassandra is down
  # not overridable profile so this plugin can't override it for you
  # advanced.reconnect-on-init = true

  profiles {
    pekko-projection-cassandra-profile {
      basic.request {
        consistency = QUORUM
        # the offset store does not use any counters or collections
        default-idempotence = true
      }
    }
  }
}
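A sketch of adjusting the profile from application.conf, for example to use a datacenter-local consistency level in a multi-datacenter cluster. The choice of LOCAL_QUORUM is only an example; whether it is appropriate depends on your deployment.

```hocon
datastax-java-driver.profiles.pekko-projection-cassandra-profile {
  # example value, chosen for illustration only
  basic.request.consistency = LOCAL_QUORUM
}
```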
Contact points configuration
The Cassandra server contact points can be defined with the Cassandra driver configuration:
datastax-java-driver {
basic.contact-points = ["127.0.0.1:9042"]
basic.load-balancing-policy.local-datacenter = "datacenter1"
}
Alternatively, Pekko Discovery can be used for finding the Cassandra server contact points as described in the Pekko Connectors Cassandra documentation.
Without any configuration it will use localhost:9042
as default.