Offset in Cassandra

The CassandraProjection has support for storing the offset in Cassandra.

The source of the envelopes can be events from Apache Pekko Persistence or any other SourceProvider with supported offset types.

The envelope handler can integrate with anything, such as publishing to a message broker, or updating a read model in Cassandra.

The CassandraProjection offers at-least-once and at-most-once processing semantics, but not exactly-once.

Dependencies

To use the Cassandra module of Apache Pekko Projections, add the following dependency to your project:

sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-cassandra" % "1.0.0"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-projection-cassandra_${scala.binary.version}</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-projection-cassandra_${versions.ScalaBinary}:1.0.0"
}

Apache Pekko Projections requires Pekko 1.0.2 or later, see Pekko version.

Project Info: Apache Pekko Projections Cassandra
Artifact: org.apache.pekko / pekko-projection-cassandra / 1.0.0
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17
Scala versions: 3.3.3, 2.13.13, 2.12.19
JPMS module name: pekko.projection.cassandra
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-projection


at-least-once

The offset is stored after the envelope has been processed, giving at-least-once processing semantics. This means that if the projection is restarted from a previously stored offset, some envelopes may be processed more than once. Therefore, the Handler code must be idempotent.
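
To illustrate why idempotency matters, here is a minimal, JDK-only sketch that does not use the Pekko API. It assumes a hypothetical read model keyed by cart id: upserting the checkout time is idempotent, so replaying envelopes after a restart leaves it unchanged, while a naive counter counts the replayed envelopes twice. CartEnvelope, CartReadModel and AtLeastOnceReplay are illustrative names only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical envelope: an offset and the checkout of one shopping cart.
final class CartEnvelope {
  final long offset;
  final String cartId;
  final long checkedOutAt;

  CartEnvelope(long offset, String cartId, long checkedOutAt) {
    this.offset = offset;
    this.cartId = cartId;
    this.checkedOutAt = checkedOutAt;
  }
}

// Hypothetical read model with one idempotent and one non-idempotent update.
final class CartReadModel {
  // Idempotent: an upsert keyed by cart id, so a replay writes the same value again.
  final Map<String, Long> checkoutTimes = new HashMap<>();
  // Not idempotent: every replay increments the counter once more.
  int naiveCheckoutCounter = 0;

  void process(CartEnvelope env) {
    checkoutTimes.put(env.cartId, env.checkedOutAt);
    naiveCheckoutCounter++;
  }
}

final class AtLeastOnceReplay {
  static CartReadModel run() {
    List<CartEnvelope> envelopes = List.of(
        new CartEnvelope(1, "cart-a", 100),
        new CartEnvelope(2, "cart-b", 200),
        new CartEnvelope(3, "cart-c", 300));
    CartReadModel model = new CartReadModel();
    // First run: all envelopes are processed, but only offset 1 was stored before a crash.
    envelopes.forEach(model::process);
    long storedOffset = 1;
    // Restart: resume after the stored offset, so offsets 2 and 3 are processed again.
    for (CartEnvelope env : envelopes) {
      if (env.offset > storedOffset) model.process(env);
    }
    return model;
  }
}
```

After the simulated restart the upserted checkout times are still correct, while the naive counter reports five checkouts for three carts.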

Scala
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.cassandra.scaladsl.CassandraProjection
import scala.concurrent.duration._

val projection =
  CassandraProjection
    .atLeastOnce(
      projectionId = ProjectionId("shopping-carts", "carts-1"),
      sourceProvider,
      handler = () => new ShoppingCartHandler)
    .withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
Java
import java.time.Duration;
import org.apache.pekko.projection.Projection;
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.cassandra.javadsl.CassandraProjection;
import org.apache.pekko.projection.eventsourced.EventEnvelope;

int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);

Projection<EventEnvelope<ShoppingCart.Event>> projection =
    CassandraProjection.atLeastOnce(
            ProjectionId.of("shopping-carts", "carts-1"),
            sourceProvider,
            () -> new ShoppingCartHandler())
        .withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);

The offset is stored after a time window or after a number of envelopes, whichever happens first. This window can be defined with withSaveOffset of the returned AtLeastOnceProjection. The default settings for the window are defined in the configuration section pekko.projection.at-least-once. Storing the offset less often improves performance, but the drawback is that more envelopes may be processed again (duplicates) when the projection is restarted.
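
The save-offset window can be modelled with a small, simplified sketch in plain Java; it is not the Pekko implementation. SaveOffsetWindow is a hypothetical class, and as an assumption for brevity it only checks the time limit when an envelope has been processed, whereas a real implementation would also use a timer.

```java
import java.time.Duration;
import java.util.OptionalLong;

// Hypothetical, simplified model of withSaveOffset(afterEnvelopes, afterDuration):
// it decides after each processed envelope whether the latest offset should be stored.
// Assumption: the time limit is only checked when an envelope has been processed.
final class SaveOffsetWindow {
  private final int afterEnvelopes;
  private final long afterMillis;
  private int pending = 0;            // processed envelopes whose offset is not stored yet
  private long windowStartMillis = 0; // when the first pending envelope was processed

  SaveOffsetWindow(int afterEnvelopes, Duration afterDuration) {
    this.afterEnvelopes = afterEnvelopes;
    this.afterMillis = afterDuration.toMillis();
  }

  // Returns the offset to store if the window is complete, otherwise empty.
  OptionalLong onProcessed(long offset, long nowMillis) {
    if (pending == 0) windowStartMillis = nowMillis;
    pending++;
    boolean countReached = pending >= afterEnvelopes;
    boolean timeReached = nowMillis - windowStartMillis >= afterMillis;
    if (countReached || timeReached) {
      pending = 0; // start a new window
      return OptionalLong.of(offset);
    }
    return OptionalLong.empty();
  }
}
```

A larger window means fewer offset writes, but more envelopes that may be processed again after a restart.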

The ShoppingCartHandler is shown below.

at-most-once

The offset for each envelope is stored before the envelope has been processed, giving at-most-once processing semantics. This means that if the projection is restarted from a previously stored offset, one envelope may not have been processed.
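
The ordering that makes this at-most-once can be shown with a minimal simulation in plain Java. AtMostOnceDemo is hypothetical and does not use Pekko: the offset is stored before the envelope is processed, so a crash between the two steps skips that envelope on restart.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of at-most-once: store the offset first, then process.
final class AtMostOnceDemo {
  long storedOffset = 0;                            // simulated offset store
  final List<String> processed = new ArrayList<>(); // simulated side effects

  // Resumes after the stored offset. crashAtOffset simulates a failure that happens
  // after the offset was stored but before the envelope was processed.
  void run(List<String> words, long crashAtOffset) {
    for (long offset = storedOffset + 1; offset <= words.size(); offset++) {
      storedOffset = offset; // 1. store the offset before processing
      if (offset == crashAtOffset) {
        return; // 2. crash: the envelope at this offset is never processed
      }
      processed.add(words.get((int) offset - 1)); // 3. process the envelope
    }
  }
}
```

Running it once with a crash at offset 2 and then again without a crash processes "a", "c" and "d"; "b" is lost.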

Scala
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.cassandra.scaladsl.CassandraProjection

val projection =
  CassandraProjection.atMostOnce(
    projectionId = ProjectionId("shopping-carts", "carts-1"),
    sourceProvider,
    handler = () => new ShoppingCartHandler)
Java
Projection<EventEnvelope<ShoppingCart.Event>> projection =
    CassandraProjection.atMostOnce(
        ProjectionId.of("shopping-carts", "carts-1"), sourceProvider, ShoppingCartHandler::new);

Since the offset must be stored for each envelope, this is slower than at-least-once, which can batch offsets before storing.

The ShoppingCartHandler is shown below.

groupedWithin

The envelopes can be grouped before processing, which can be useful for batch updates.

Scala
val projection =
  CassandraProjection
    .groupedWithin(
      projectionId = ProjectionId("shopping-carts", "carts-1"),
      sourceProvider,
      handler = () => new GroupedShoppingCartHandler)
    .withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
Java
int groupAfterEnvelopes = 20;
Duration groupAfterDuration = Duration.ofMillis(500);

Projection<EventEnvelope<ShoppingCart.Event>> projection =
    CassandraProjection.groupedWithin(
            ProjectionId.of("shopping-carts", "carts-1"),
            sourceProvider,
            GroupedShoppingCartHandler::new)
        .withGroup(groupAfterEnvelopes, groupAfterDuration);

The envelopes are grouped within a time window or up to a number of envelopes, whichever happens first. This window can be defined with withGroup of the returned GroupedProjection. The default settings for the window are defined in the configuration section pekko.projection.grouped.
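
A simplified sketch of the grouping rule in plain Java follows. GroupWithin.group is a hypothetical helper that takes arrival times in milliseconds alongside the envelopes. As an assumption, the time limit is only checked when an envelope arrives (an open group whose window has expired is emitted before the new envelope is added), while a real stream stage would emit on a timer.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of withGroup(groupAfterEnvelopes, groupAfterDuration).
// arrivalMillis.get(i) is the arrival time of envelopes.get(i).
final class GroupWithin {
  static <T> List<List<T>> group(List<Long> arrivalMillis, List<T> envelopes,
                                 int groupAfterEnvelopes, Duration groupAfterDuration) {
    long limitMillis = groupAfterDuration.toMillis();
    List<List<T>> groups = new ArrayList<>();
    List<T> current = new ArrayList<>();
    long windowStart = 0;
    for (int i = 0; i < envelopes.size(); i++) {
      long now = arrivalMillis.get(i);
      // The time window expired before this envelope arrived: emit the open group.
      if (!current.isEmpty() && now - windowStart >= limitMillis) {
        groups.add(current);
        current = new ArrayList<>();
      }
      if (current.isEmpty()) windowStart = now; // a new window starts with this envelope
      current.add(envelopes.get(i));
      // The count limit is reached: emit the group immediately.
      if (current.size() >= groupAfterEnvelopes) {
        groups.add(current);
        current = new ArrayList<>();
      }
    }
    if (!current.isEmpty()) groups.add(current); // emit the last, partial group
    return groups;
  }
}
```

With a limit of 3 envelopes or 500 ms, three envelopes arriving within 20 ms form one full group, and envelopes arriving more than 500 ms apart each end up in their own group.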

When using groupedWithin, the handler is a Handler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] (Handler<List<EventEnvelope<ShoppingCart.Event>>> in Java). The GroupedShoppingCartHandler is shown below.

It stores the offset in Cassandra immediately after the handler has processed the envelopes, but this still gives at-least-once processing semantics. This means that if the projection is restarted from a previously stored offset, the previous group of envelopes may be processed more than once.

Handler

It’s in the Handler that you implement the processing of each envelope. It’s essentially a function from Envelope to Future[Done] (CompletionStage<Done> in Java). This means that the envelope handler can integrate with anything, such as publishing to a message broker, or updating a read model in Cassandra.

A handler that is consuming ShoppingCart.Event from eventsByTag can look like this:

Scala
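import scala.concurrent.duration._
import scala.concurrent.Future

import org.apache.pekko
import pekko.Done
import pekko.actor.typed.scaladsl.LoggerOps // provides info2/debug2
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.scaladsl.Handler
import org.slf4j.LoggerFactory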

class ShoppingCartHandler extends Handler[EventEnvelope[ShoppingCart.Event]] {
  private val logger = LoggerFactory.getLogger(getClass)

  override def process(envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = {
    envelope.event match {
      case ShoppingCart.CheckedOut(cartId, time) =>
        logger.info2("Shopping cart {} was checked out at {}", cartId, time)
        Future.successful(Done)

      case otherEvent =>
        logger.debug2("Shopping cart {} changed by {}", otherEvent.cartId, otherEvent)
        Future.successful(Done)
    }
  }
}
Java
import org.apache.pekko.Done;
import org.apache.pekko.projection.eventsourced.EventEnvelope;
import org.apache.pekko.projection.javadsl.Handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ShoppingCartHandler extends Handler<EventEnvelope<ShoppingCart.Event>> {
  private Logger logger = LoggerFactory.getLogger(getClass());

  @Override
  public CompletionStage<Done> process(EventEnvelope<ShoppingCart.Event> envelope) {
    ShoppingCart.Event event = envelope.event();
    if (event instanceof ShoppingCart.CheckedOut) {
      ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
      logger.info(
          "Shopping cart {} was checked out at {}", checkedOut.cartId, checkedOut.eventTime);
      return CompletableFuture.completedFuture(Done.getInstance());
    } else {
      logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
      return CompletableFuture.completedFuture(Done.getInstance());
    }
  }
}
Hint

Such simple handlers can also be defined as plain functions (lambdas in Java) via the helper Handler.apply (Handler.fromFunction in Java) factory method.

Grouped handler

When using CassandraProjection.groupedWithin, the handler processes a Seq (List in Java) of envelopes.

Scala
import scala.collection.immutable

class GroupedShoppingCartHandler extends Handler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] {
  private val logger = LoggerFactory.getLogger(getClass)

  override def process(envelopes: immutable.Seq[EventEnvelope[ShoppingCart.Event]]): Future[Done] = {
    envelopes.map(_.event).foreach {
      case ShoppingCart.CheckedOut(cartId, time) =>
        logger.info2("Shopping cart {} was checked out at {}", cartId, time)

      case otherEvent =>
        logger.debug2("Shopping cart {} changed by {}", otherEvent.cartId, otherEvent)
    }
    Future.successful(Done)
  }
}
Java
public class GroupedShoppingCartHandler extends Handler<List<EventEnvelope<ShoppingCart.Event>>> {
  private Logger logger = LoggerFactory.getLogger(getClass());

  @Override
  public CompletionStage<Done> process(List<EventEnvelope<ShoppingCart.Event>> envelopes) {
    envelopes.forEach(
        env -> {
          ShoppingCart.Event event = env.event();
          if (event instanceof ShoppingCart.CheckedOut) {
            ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
            logger.info(
                "Shopping cart {} was checked out at {}",
                checkedOut.cartId,
                checkedOut.eventTime);
          } else {
            logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
          }
        });
    return CompletableFuture.completedFuture(Done.getInstance());
  }
}

Stateful handler

The Handler can be stateful, with variables and mutable data structures. It is invoked by the Projection machinery one envelope at a time and visibility guarantees between the invocations are handled automatically, i.e. no volatile or other concurrency primitives are needed for managing the state.

The returned Future[Done] (CompletionStage<Done> in Java) is to be completed when the processing of the envelope has finished. The handler will not be invoked with the next envelope until after the returned Future[Done] (CompletionStage<Done>) has been completed.
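
The one-at-a-time guarantee can be sketched with plain CompletionStage chaining. SequentialDriver and CountingHandler are hypothetical and only model the behavior described above: each invocation is started with thenCompose after the previous stage has completed, and because completion of a CompletableFuture happens-before its dependent stages run, the ordinary int field needs no volatile even though the updates run on pool threads.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical driver: invokes the handler for one envelope at a time.
final class SequentialDriver {
  static <E> CompletionStage<Void> runAll(
      List<E> envelopes, Function<E, CompletionStage<Void>> handler) {
    CompletionStage<Void> chain = CompletableFuture.completedFuture(null);
    for (E envelope : envelopes) {
      // thenCompose defers the next invocation until the previous stage has completed.
      chain = chain.thenCompose(ignored -> handler.apply(envelope));
    }
    return chain;
  }
}

// Hypothetical stateful handler with an ordinary mutable field, updated on pool threads.
final class CountingHandler {
  private final ExecutorService pool;
  int count = 0; // not volatile: sequential invocation makes this safe

  CountingHandler(ExecutorService pool) {
    this.pool = pool;
  }

  CompletionStage<Void> process(String word) {
    return CompletableFuture.runAsync(() -> count++, pool);
  }
}
```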

Scala
class WordCountHandler extends Handler[WordEnvelope] {
  private val logger = LoggerFactory.getLogger(getClass)
  private var state: Map[Word, Count] = Map.empty

  override def process(envelope: WordEnvelope): Future[Done] = {
    val word = envelope.word
    val newCount = state.getOrElse(word, 0) + 1
    logger.info("Word count for {} is {}", word, newCount)
    state = state.updated(word, newCount)
    Future.successful(Done)
  }
}
Java
public class WordCountHandler extends Handler<WordEnvelope> {
  private final Logger logger = LoggerFactory.getLogger(getClass());
  private final Map<String, Integer> state = new HashMap<>();

  @Override
  public CompletionStage<Done> process(WordEnvelope envelope) {
    String word = envelope.word;
    int newCount = state.getOrDefault(word, 0) + 1;
    logger.info("Word count for {} is {}", word, newCount);
    state.put(word, newCount);
    return CompletableFuture.completedFuture(Done.getInstance());
  }
}
Note

It is important that the Handler instance is not shared between several Projection instances, because it would then be invoked concurrently, which is not how it is intended to be used. Each Projection instance should use a new Handler instance. This is the reason why the handler parameter is a factory of the handler (() => in Scala, a Supplier in Java). A new handler instance is also created when the projection is restarted.
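
A minimal sketch of why a factory is used: FakeProjection is a hypothetical stand-in (not the Pekko API) that calls the Supplier on every start, so two projections, or a restart of the same projection, never share a handler instance. InMemoryWordCounter is likewise illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a projection runner (NOT the Pekko API). Each call to
// start() models starting or restarting the projection and asks the factory for a
// fresh handler, so handler instances are never shared.
final class FakeProjection<H> {
  private final Supplier<H> handlerFactory;
  final List<H> createdHandlers = new ArrayList<>();

  FakeProjection(Supplier<H> handlerFactory) {
    this.handlerFactory = handlerFactory;
  }

  H start() {
    H handler = handlerFactory.get(); // new instance per start or restart
    createdHandlers.add(handler);
    return handler;
  }
}

// A stateful handler whose mutable map must not be shared between projections.
final class InMemoryWordCounter {
  final Map<String, Integer> state = new HashMap<>();
}
```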

However, the state must typically be loaded and updated by asynchronous operations, and then it can be error-prone to manage the state in variables of the Handler. For that purpose a StatefulHandler is provided.

Let us look at how a StatefulHandler can be implemented in the context of a “word count” domain. The purpose is to process a stream of words and for each word keep track of how many times it has occurred.

Given an envelope and SourceProvider for this example:

Scala
type Word = String
type Count = Int

final case class WordEnvelope(offset: Long, word: Word)

class WordSource(implicit ec: ExecutionContext) extends SourceProvider[Long, WordEnvelope] {

  private val src = Source(
    List(WordEnvelope(1L, "abc"), WordEnvelope(2L, "def"), WordEnvelope(3L, "ghi"), WordEnvelope(4L, "abc")))

  override def source(offset: () => Future[Option[Long]]): Future[Source[WordEnvelope, NotUsed]] = {
    offset().map {
      case Some(o) => src.dropWhile(_.offset <= o)
      case _       => src
    }
  }

  override def extractOffset(env: WordEnvelope): Long = env.offset

  override def extractCreationTime(env: WordEnvelope): Long = 0L
}
Java
public class WordEnvelope {
  public final Long offset;
  public final String word;

  public WordEnvelope(Long offset, String word) {
    this.offset = offset;
    this.word = word;
  }
}

class WordSource extends SourceProvider<Long, WordEnvelope> {

  private final Source<WordEnvelope, NotUsed> src =
      Source.from(
          Arrays.asList(
              new WordEnvelope(1L, "abc"),
              new WordEnvelope(2L, "def"),
              new WordEnvelope(3L, "ghi"),
              new WordEnvelope(4L, "abc")));

  @Override
  public CompletionStage<Source<WordEnvelope, NotUsed>> source(
      Supplier<CompletionStage<Optional<Long>>> offset) {
    return offset
        .get()
        .thenApply(
            o -> {
              if (o.isPresent()) return src.dropWhile(envelope -> envelope.offset <= o.get());
              else return src;
            });
  }

  @Override
  public Long extractOffset(WordEnvelope envelope) {
    return envelope.offset;
  }

  @Override
  public long extractCreationTime(WordEnvelope envelope) {
    return 0L;
  }
}

and a repository for the interaction with the database:

Scala
trait WordCountRepository {
  def load(id: String, word: Word): Future[Count]
  def loadAll(id: String): Future[Map[Word, Count]]
  def save(id: String, word: Word, count: Count): Future[Done]
}
Java
public interface WordCountRepository {
  CompletionStage<Integer> load(String id, String word);

  CompletionStage<Map<String, Integer>> loadAll(String id);

  CompletionStage<Done> save(String id, String word, int count);
}

The Projection can be defined as:

Scala
val projection =
  CassandraProjection
    .atLeastOnce[Long, WordEnvelope](
      projectionId,
      sourceProvider = new WordSource,
      handler = () => new WordCountHandler(projectionId, repository))
Java
Projection<WordEnvelope> projection =
    CassandraProjection.atLeastOnce(
        projectionId, new WordSource(), () -> new WordCountHandler(projectionId, repository));

The handler can be implemented as follows.

A naive approach would be to have one row per word for storing the current count in the database. The handler could be implemented as a completely stateless handler that, for each processed envelope, loads the current count from the database, increments it by 1 and saves it again. Typically there will be several instances of the Projection with different ProjectionId.id values, and each Projection instance would be responsible for processing a subset of all words. This stateless approach wouldn’t be very efficient, and you would have to use optimistic database locking to make sure that one Projection instance does not overwrite the value stored by another instance without first reading the current value.
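
The optimistic locking mentioned above can be sketched as a read, increment and conditional write loop. VersionedCounts is a hypothetical in-memory stand-in for a table with a version column; in Cassandra a conditional write would typically be a lightweight transaction (UPDATE ... IF ...), which adds its own latency. The sketch shows a stale write being rejected and retried.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory table: one row per word with a version column.
// writeIfVersion mimics a conditional ("optimistic lock") update.
final class VersionedCounts {
  static final class Row {
    final int count;
    final long version;

    Row(int count, long version) {
      this.count = count;
      this.version = version;
    }
  }

  private final Map<String, Row> rows = new HashMap<>();

  synchronized Row read(String word) {
    return rows.getOrDefault(word, new Row(0, 0));
  }

  // Succeeds only if nobody else wrote the row since expectedVersion was read.
  synchronized boolean writeIfVersion(String word, int newCount, long expectedVersion) {
    Row current = rows.getOrDefault(word, new Row(0, 0));
    if (current.version != expectedVersion) return false;
    rows.put(word, new Row(newCount, expectedVersion + 1));
    return true;
  }

  // The stateless handler step: load, increment, conditionally save, retry on conflict.
  int increment(String word) {
    while (true) {
      Row row = read(word);
      if (writeIfVersion(word, row.count + 1, row.version)) return row.count + 1;
    }
  }
}
```

Every increment costs a read plus a conditional write, which is why the single-writer approach below is more efficient.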

A better approach is for each Projection instance to be a single writer, so that it can keep the current word count in memory and only load it on startup or on demand.

A handler that is loading the state from the database when it’s starting up:

Scala
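import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import org.apache.pekko.projection.ProjectionId
import org.apache.pekko.projection.scaladsl.StatefulHandler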

class WordCountHandler(projectionId: ProjectionId, repository: WordCountRepository)(implicit ec: ExecutionContext)
    extends StatefulHandler[Map[Word, Count], WordEnvelope] {

  override def initialState(): Future[Map[Word, Count]] = repository.loadAll(projectionId.id)

  override def process(state: Map[Word, Count], envelope: WordEnvelope): Future[Map[Word, Count]] = {
    val word = envelope.word
    val newCount = state.getOrElse(word, 0) + 1
    val newState = for {
      _ <- repository.save(projectionId.id, word, newCount)
    } yield state.updated(word, newCount)

    newState
  }
}
Java
import java.util.Map;
import java.util.concurrent.CompletionStage;

import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.javadsl.StatefulHandler;

public class WordCountHandler extends StatefulHandler<Map<String, Integer>, WordEnvelope> {
  private final ProjectionId projectionId;
  private final WordCountRepository repository;

  public WordCountHandler(ProjectionId projectionId, WordCountRepository repository) {
    this.projectionId = projectionId;
    this.repository = repository;
  }

  @Override
  public CompletionStage<Map<String, Integer>> initialState() {
    return repository.loadAll(projectionId.id());
  }

  @Override
  public CompletionStage<Map<String, Integer>> process(
      Map<String, Integer> state, WordEnvelope envelope) {
    String word = envelope.word;
    int newCount = state.getOrDefault(word, 0) + 1;
    CompletionStage<Map<String, Integer>> newState =
        repository
            .save(projectionId.id(), word, newCount)
            .thenApply(
                done -> {
                  state.put(word, newCount);
                  return state;
                });

    return newState;
  }
}

The StatefulHandler has two methods that need to be implemented.

  • initialState - Invoked to load the initial state when the projection is started or if the previous process failed.
  • process(state, envelope) - Invoked for each Envelope, one at a time. The state parameter is the completed value of the previously returned Future[State] (CompletionStage<State> in Java) or the initialState.

If the previously returned Future[State] (CompletionStage<State> in Java) failed, initialState is called again and its value is used.
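
To make this contract concrete, here is a plain-Java sketch. StatefulStep mirrors the two methods with CompletionStage, and StatefulDriver is a hypothetical driver, not the Pekko implementation. As an assumption, the driver simply retries the failed envelope once after reloading the state; in a real projection, what happens to the failed envelope depends on the configured error handling. DbWordCounter fails once on the word "boom" after corrupting its in-memory map, to show that the reloaded initialState replaces it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

// Hypothetical mirror of the two StatefulHandler methods (not the Pekko class).
interface StatefulStep<S, E> {
  CompletionStage<S> initialState();
  CompletionStage<S> process(S state, E envelope);
}

// Hypothetical driver: threads the state through process and reloads it on failure.
final class StatefulDriver {
  static <S, E> S run(StatefulStep<S, E> handler, List<E> envelopes) {
    S state = handler.initialState().toCompletableFuture().join();
    for (E envelope : envelopes) {
      try {
        state = handler.process(state, envelope).toCompletableFuture().join();
      } catch (CompletionException failed) {
        // The returned stage failed: call initialState again, then retry once (assumption).
        state = handler.initialState().toCompletableFuture().join();
        state = handler.process(state, envelope).toCompletableFuture().join();
      }
    }
    return state;
  }
}

// Example handler; the "database" is a plain map.
final class DbWordCounter implements StatefulStep<Map<String, Integer>, String> {
  final Map<String, Integer> db = new HashMap<>();
  int initialStateCalls = 0;
  private boolean failed = false;

  @Override
  public CompletionStage<Map<String, Integer>> initialState() {
    initialStateCalls++;
    return CompletableFuture.completedFuture(new HashMap<>(db)); // copy of persisted counts
  }

  @Override
  public CompletionStage<Map<String, Integer>> process(Map<String, Integer> state, String word) {
    int newCount = state.getOrDefault(word, 0) + 1;
    if (word.equals("boom") && !failed) {
      failed = true;
      state.put("abc", -1); // the in-memory state is now wrong
      return CompletableFuture.failedFuture(new RuntimeException("simulated save failure"));
    }
    db.put(word, newCount);    // save to the "database" first
    state.put(word, newCount); // then update the in-memory state
    return CompletableFuture.completedFuture(state);
  }
}
```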

Another implementation would be a handler that is loading the current count for a word on demand, and thereafter caches it in the in-memory state:

Scala
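import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import org.apache.pekko.projection.ProjectionId
import org.apache.pekko.projection.scaladsl.StatefulHandler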

class WordCountHandler(projectionId: ProjectionId, repository: WordCountRepository)(implicit ec: ExecutionContext)
    extends StatefulHandler[Map[Word, Count], WordEnvelope] {

  override def initialState(): Future[Map[Word, Count]] =
    Future.successful(Map.empty)

  override def process(state: Map[Word, Count], envelope: WordEnvelope): Future[Map[Word, Count]] = {
    val word = envelope.word

    val currentCount =
      state.get(word) match {
        case None =>
          repository.load(projectionId.id, word)
        case Some(count) =>
          Future.successful(count)
      }

    val newState = for {
      c <- currentCount
      newCount = c + 1
      _ <- repository.save(projectionId.id, word, newCount)
    } yield state.updated(word, newCount)

    newState
  }

}
Java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.javadsl.StatefulHandler;

public class WordCountHandler extends StatefulHandler<Map<String, Integer>, WordEnvelope> {
  private final ProjectionId projectionId;
  private final WordCountRepository repository;

  public WordCountHandler(ProjectionId projectionId, WordCountRepository repository) {
    this.projectionId = projectionId;
    this.repository = repository;
  }

  @Override
  public CompletionStage<Map<String, Integer>> initialState() {
    return CompletableFuture.completedFuture(new HashMap<>());
  }

  @Override
  public CompletionStage<Map<String, Integer>> process(
      Map<String, Integer> state, WordEnvelope envelope) {
    String word = envelope.word;

    CompletionStage<Integer> currentCount;
    if (state.containsKey(word))
      currentCount = CompletableFuture.completedFuture(state.get(word));
    else currentCount = repository.load(projectionId.id(), word);

    CompletionStage<Map<String, Integer>> newState =
        currentCount.thenCompose(
            n -> {
              return repository
                  .save(projectionId.id(), word, n + 1)
                  .thenApply(
                      done -> {
                        state.put(word, n + 1);
                        return state;
                      });
            });

    return newState;
  }
}

Actor handler

A good alternative for advanced state management is to implement the handler as an actor, which is described in Processing with Actor.

Flow handler

An Apache Pekko Streams FlowWithContext can be used instead of a handler for processing the envelopes, which is described in Processing with Apache Pekko Streams.

Handler lifecycle

You can override the start and stop methods of the Handler to implement initialization before the first envelope is processed and resource cleanup when the projection is stopped. Those methods are also called when the Projection is restarted after a failure.

See also error handling.

Schema

The database schema for the offset storage table.

Note

The partition field is used to distribute projection rows across Cassandra nodes while still allowing all rows for a projection name to be queried. For most offset types only the one row that matches the provided projection key is read, but the MergeableOffset requires all rows.

CREATE TABLE IF NOT EXISTS pekko_projection.offset_store (
  projection_name text,
  partition int,
  projection_key text,
  offset text,
  manifest text,
  last_updated timestamp,
  PRIMARY KEY ((projection_name, partition), projection_key));

CREATE TABLE IF NOT EXISTS pekko_projection.projection_management (
  projection_name text,
  partition int,
  projection_key text,
  paused boolean,
  last_updated timestamp,
  PRIMARY KEY ((projection_name, partition), projection_key));
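
The partitioning described in the note above can be illustrated with a hypothetical sketch. The partition count (5) and the hash function are assumptions for illustration only, not values taken from Pekko Projections. A single projection key maps to one partition, while reading all rows for a projection name has to visit every partition. The CQL strings are concatenated only for readability; real code should use bound statements.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the partition column in offset_store.
final class OffsetPartitioning {
  static final int PARTITIONS = 5; // assumed value, for illustration only

  // Deterministic mapping from a projection key to a partition in [0, PARTITIONS).
  static int partitionFor(String projectionKey) {
    return Math.floorMod(projectionKey.hashCode(), PARTITIONS);
  }

  // Reading the row for one key only needs that key's partition.
  static String selectOne(String name, String key) {
    return "SELECT offset, manifest FROM pekko_projection.offset_store"
        + " WHERE projection_name = '" + name + "' AND partition = " + partitionFor(key)
        + " AND projection_key = '" + key + "'";
  }

  // Reading all rows for a projection name visits every partition.
  static List<String> selectAll(String name) {
    List<String> queries = new ArrayList<>();
    for (int p = 0; p < PARTITIONS; p++) {
      queries.add("SELECT projection_key, offset, manifest FROM pekko_projection.offset_store"
          + " WHERE projection_name = '" + name + "' AND partition = " + p);
    }
    return queries;
  }
}
```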

Offset types

The supported offset types of the CassandraProjection are:

  • org.apache.pekko.persistence.query.Offset types from events from Apache Pekko Persistence
  • String
  • Int (Integer in Java)
  • Long
  • Any other type that has a configured Pekko Serializer is stored with base64 encoding of the serialized bytes.
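
Because the offset and manifest columns are both text, each offset type needs a textual encoding plus a manifest that says how to decode it. The following is a hypothetical sketch of that idea; OffsetCodec and the manifest strings STR, INT and LNG are illustrative assumptions, not necessarily what Pekko writes. Serialized bytes for other types are stored as base64 text, as described in the list above.

```java
import java.util.Base64;

// Hypothetical encoder for the text columns (offset, manifest) of offset_store.
final class OffsetCodec {
  static final class Stored {
    final String offset;
    final String manifest;

    Stored(String offset, String manifest) {
      this.offset = offset;
      this.manifest = manifest;
    }
  }

  static Stored encode(Object offset) {
    if (offset instanceof String) return new Stored((String) offset, "STR");
    if (offset instanceof Integer) return new Stored(offset.toString(), "INT");
    if (offset instanceof Long) return new Stored(offset.toString(), "LNG");
    throw new IllegalArgumentException("Unsupported offset type: " + offset.getClass());
  }

  // For other types, a serializer produces bytes; they are stored as base64 text.
  static Stored encodeSerialized(byte[] serializedBytes, String serializerManifest) {
    return new Stored(Base64.getEncoder().encodeToString(serializedBytes), serializerManifest);
  }

  static Object decode(Stored stored) {
    switch (stored.manifest) {
      case "STR": return stored.offset;
      case "INT": return Integer.valueOf(stored.offset);
      case "LNG": return Long.valueOf(stored.offset);
      default: return Base64.getDecoder().decode(stored.offset); // raw serialized bytes
    }
  }
}
```
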
Note

The MergeableOffset that is used for messages from Kafka is not implemented for the CassandraProjection yet, see issue #97.

The schema can be created using the method CassandraProjection.createTablesIfNotExists. This is particularly useful when writing tests. For production environments, we recommend creating the schema before deploying the application.

Configuration

Make your edits/overrides in your application.conf.

The reference configuration file with the default values:

pekko.projection.cassandra {

  session-config-path = "pekko.projection.cassandra.session-config"

  session-config {
    # The implementation of `org.apache.pekko.stream.connectors.cassandra.CqlSessionProvider`
    # used for creating the `CqlSession`.
    # It may optionally have a constructor with an `ClassicActorSystemProvider` and `Config` parameters.
    session-provider = "org.apache.pekko.stream.connectors.cassandra.DefaultSessionProvider"

    # Configure Pekko Discovery by setting a service name
    service-discovery {
      name = ""
      lookup-timeout = 1 s
    }

    # The ExecutionContext to use for the session tasks and future composition.
    session-dispatcher = "pekko.actor.default-dispatcher"

    # Full config path to the Datastax Java driver's configuration section.
    # When connecting to more than one Cassandra cluster different session configuration can be
    # defined with this property.
    # See https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/reference/
    datastax-java-driver-config = "datastax-java-driver"
  }

  offset-store {
    keyspace = "pekko_projection"
    # the database table name for the offset store
    table = "offset_store"
    # the database table name for the projection management data
    management-table = "projection_management"
  }
}

Cassandra driver configuration

All Cassandra driver settings are configured via the driver's standard profile mechanism.

One important setting is to configure the database driver to retry the initial connection:

datastax-java-driver.advanced.reconnect-on-init = true

It is not enabled automatically: the setting is defined in the driver’s own reference.conf and cannot be overridden in a profile, so Pekko Projections cannot enable it for you.

It is possible to share the same Cassandra session as Apache Pekko Persistence Cassandra by setting the session-config-path:

pekko.projection.cassandra {
  session-config-path = "pekko.persistence.cassandra"
}

or share the same Cassandra session as Pekko Connectors Cassandra:

pekko.projection.cassandra {
  session-config-path = "connectors.cassandra"
}

Cassandra driver overrides

# See reference configuration at
# https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/reference/
# (check which exact version Pekko Projections uses)
datastax-java-driver {

  # always set this to allow reconnection on startup if cassandra is down
  # not overridable profile so this plugin can't override it for you
  # advanced.reconnect-on-init = true

  profiles {
    pekko-projection-cassandra-profile {
      basic.request {
        consistency = QUORUM
        # the offset store does not use any counters or collections
        default-idempotence = true
      }
    }
  }
}

Contact points configuration

The Cassandra server contact points can be defined with the Cassandra driver configuration:

datastax-java-driver {
  basic.contact-points = ["127.0.0.1:9042"]
  basic.load-balancing-policy.local-datacenter = "datacenter1"
}

Alternatively, Pekko Discovery can be used for finding the Cassandra server contact points as described in the Pekko Connectors Cassandra documentation.

Without any configuration, localhost:9042 is used by default.