Build a Projection handler
When building a projection there are several non-functional requirements to consider. What technology will you project into? What message delivery semantics are acceptable for the system? Is the target technology compatible with the chosen Source Provider so that exactly-once message delivery is possible? Does runtime state need to be maintained in the projection while it’s running? It’s up to you to answer these questions, but you must verify that the answers are compatible with each other.
In this guide we will create a Projection that represents shopping cart item popularity. We will persist our Projection to Cassandra with at-least-once semantics. The Projection itself will be represented as a Cassandra table.
To proceed we must add the Cassandra Projection library to our project:
libraryDependencies += "org.apache.pekko" %% "pekko-projection-cassandra" % "1.1.0"
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-projection-cassandra_${versions.ScalaBinary}:1.1.0"
}
It’s the user’s responsibility to implement the means to project into the target system; the Projection itself will only manage the persistence of the offset (though it is possible to enlist your projection into transactions when using projection implementations that support exactly-once, such as JDBC). This guide encapsulates its data access layer in a repository called ItemPopularityProjectionRepository. The repository will manage a Cassandra table called item_popularity. Each row in item_popularity contains a shopping cart item id and a count that represents how often that item was added to or removed from all shopping carts.
The example will persist the item popularity count with a Cassandra counter data type. Because we are using at-least-once semantics, an event may be delivered more than once, so it’s not possible to guarantee that item count updates occur idempotently. However, since the count is only a rough metric of how popular an item is, it’s not critical to have a totally accurate figure.
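To make the trade-off concrete, here is a minimal plain-Java sketch of why a counter update is not idempotent under at-least-once delivery. It does not use Pekko or Cassandra: the class `AtLeastOnceReplayDemo`, the `Delta` type, and the in-memory map and offset field are all hypothetical stand-ins, and the "crash" is simulated by returning early before the offset is saved.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simulates at-least-once delivery: the offset is saved only after the update has run. */
final class AtLeastOnceReplayDemo {

  /** Hypothetical event: add `delta` to the count of `itemId`. */
  static final class Delta {
    final String itemId;
    final long delta;

    Delta(String itemId, long delta) {
      this.itemId = itemId;
      this.delta = delta;
    }
  }

  /** Stand-in for the counter table: item id -> count. */
  final Map<String, Long> counts = new HashMap<>();

  /** Stand-in for the offset store: index of the next event to process. */
  int savedOffset = 0;

  /**
   * Processes events starting at the saved offset. If crashBeforeSavingOffsetAt equals the
   * index of an event, the run stops after that event's update but before its offset is saved.
   */
  void run(List<Delta> events, int crashBeforeSavingOffsetAt) {
    for (int i = savedOffset; i < events.size(); i++) {
      Delta e = events.get(i);
      counts.merge(e.itemId, e.delta, Long::sum); // non-idempotent counter update
      if (i == crashBeforeSavingOffsetAt) {
        return; // simulated crash: update applied, offset not saved
      }
      savedOffset = i + 1; // the offset is stored only after the update succeeded
    }
  }

  public static void main(String[] args) {
    List<Delta> events = List.of(new Delta("apple", 2), new Delta("apple", 3));
    AtLeastOnceReplayDemo demo = new AtLeastOnceReplayDemo();
    demo.run(events, 1);  // crash after applying the second event
    demo.run(events, -1); // restart: the second event is delivered again
    System.out.println(demo.counts.get("apple")); // prints 8, although the true total is 5
  }
}
```

The second event is applied twice because the restart resumes from the last saved offset. For a rough popularity metric this over-count is tolerable; a projection that needs exact figures would have to choose idempotent updates or exactly-once semantics instead.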
Add the ItemPopularityProjectionRepository to your project:
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import org.apache.pekko
import pekko.Done
import pekko.stream.connectors.cassandra.scaladsl.CassandraSession

trait ItemPopularityProjectionRepository {
  def update(itemId: String, delta: Int): Future[Done]
  def getItem(itemId: String): Future[Option[Long]]
}

object ItemPopularityProjectionRepositoryImpl {
  val Keyspace = "pekko_projection"
  val PopularityTable = "item_popularity"
}

class ItemPopularityProjectionRepositoryImpl(session: CassandraSession)(implicit val ec: ExecutionContext)
    extends ItemPopularityProjectionRepository {
  import ItemPopularityProjectionRepositoryImpl._

  // Adds `delta` (which may be negative) to the counter column of the item's row.
  override def update(itemId: String, delta: Int): Future[Done] = {
    session.executeWrite(
      s"UPDATE $Keyspace.$PopularityTable SET count = count + ? WHERE item_id = ?",
      java.lang.Long.valueOf(delta),
      itemId)
  }

  // Returns the current count, or None if the item has no row yet.
  override def getItem(itemId: String): Future[Option[Long]] = {
    session
      .selectOne(s"SELECT item_id, count FROM $Keyspace.$PopularityTable WHERE item_id = ?", itemId)
      .map(opt => opt.map(row => row.getLong("count").longValue()))
  }
}
import java.util.Optional;
import java.util.concurrent.CompletionStage;

import org.apache.pekko.Done;
import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSession;

interface ItemPopularityProjectionRepository {
  CompletionStage<Done> update(String itemId, int delta);

  CompletionStage<Optional<Long>> getItem(String itemId);
}

class ItemPopularityProjectionRepositoryImpl implements ItemPopularityProjectionRepository {

  public static final String Keyspace = "pekko_projection";
  public static final String PopularityTable = "item_popularity";

  CassandraSession session;

  public ItemPopularityProjectionRepositoryImpl(CassandraSession session) {
    this.session = session;
  }

  // Adds `delta` (which may be negative) to the counter column of the item's row.
  @Override
  public CompletionStage<Done> update(String itemId, int delta) {
    return session.executeWrite(
        String.format(
            "UPDATE %s.%s SET count = count + ? WHERE item_id = ?", Keyspace, PopularityTable),
        (long) delta,
        itemId);
  }

  // Returns the current count, or an empty Optional if the item has no row yet.
  @Override
  public CompletionStage<Optional<Long>> getItem(String itemId) {
    return session
        .selectOne(
            String.format(
                "SELECT item_id, count FROM %s.%s WHERE item_id = ?", Keyspace, PopularityTable),
            itemId)
        .thenApply(opt -> opt.map(row -> row.getLong("count")));
  }
}
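Because the handler only depends on the repository's two methods, it can be exercised without a Cassandra cluster. The following is a sketch of an in-memory stand-in with the same method shapes. It is an assumption for illustration, not part of the guide: the class name `InMemoryItemPopularity` is hypothetical, and a local `Done` enum replaces `org.apache.pekko.Done` so the sketch compiles without Pekko on the classpath.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory stand-in mirroring the update/getItem methods of the repository. */
final class InMemoryItemPopularity {

  /** Local placeholder for org.apache.pekko.Done (assumption, to stay dependency-free). */
  enum Done {
    INSTANCE
  }

  private final ConcurrentMap<String, Long> counts = new ConcurrentHashMap<>();

  /** Adds delta (possibly negative) to the item's count, like a counter column update. */
  CompletionStage<Done> update(String itemId, int delta) {
    counts.merge(itemId, (long) delta, Long::sum);
    return CompletableFuture.completedFuture(Done.INSTANCE);
  }

  /** Returns the current count, or an empty Optional if the item was never updated. */
  CompletionStage<Optional<Long>> getItem(String itemId) {
    return CompletableFuture.completedFuture(Optional.ofNullable(counts.get(itemId)));
  }

  public static void main(String[] args) {
    InMemoryItemPopularity repo = new InMemoryItemPopularity();
    repo.update("apple", 2).toCompletableFuture().join();
    repo.update("apple", -1).toCompletableFuture().join();
    System.out.println(repo.getItem("apple").toCompletableFuture().join()); // Optional[1]
    System.out.println(repo.getItem("pear").toCompletableFuture().join());  // Optional.empty
  }
}
```

In a real project the stand-in would implement ItemPopularityProjectionRepository directly and return Pekko's `Done`, so it could be passed to the handler in tests.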
Now it’s time to write the Projection handler itself. This example uses a Handler that will process ShoppingCartEvents.Event events from the SourceProvider that we implemented earlier. Specifically, it will only process ItemEvents, which add, adjust, or remove items in a shopping cart. It will ignore all shopping cart CheckedOut events by skipping them. The event envelopes are processed in the process method.
This example will also log the popularity count of every 10th item event that is processed. The logging counter is stored as a mutable variable within the handler. Since this is a simple log operation, managing the state in this manner is fine, but to handle more advanced stateful operations you should evaluate using the StatefulHandler.
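The quantity arithmetic and the every-10th logging described above can be sketched in isolation. This is a plain-Java illustration only: `PopularityDeltaSketch` and its method names are hypothetical and none of them come from Pekko. The three delta rules mirror how each item event changes the popularity count.

```java
/** Plain-Java sketch of the handler's arithmetic; none of these names come from Pekko. */
final class PopularityDeltaSketch {
  static final int LOG_INTERVAL = 10;

  /** Mutable handler state: item events seen since the last log line. */
  private int logCounter = 0;

  /** An added item increases popularity by its quantity. */
  static int deltaForAdded(int quantity) {
    return quantity;
  }

  /** An adjusted quantity changes popularity by the difference. */
  static int deltaForAdjusted(int newQuantity, int oldQuantity) {
    return newQuantity - oldQuantity;
  }

  /** A removed item decreases popularity by the quantity it had. */
  static int deltaForRemoved(int oldQuantity) {
    return 0 - oldQuantity;
  }

  /** Returns true on every LOG_INTERVAL-th call and resets the counter. */
  boolean shouldLog() {
    logCounter += 1;
    if (logCounter == LOG_INTERVAL) {
      logCounter = 0;
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    // Add 5, adjust 5 -> 2, then remove the remaining 2: the deltas cancel out.
    int net = deltaForAdded(5) + deltaForAdjusted(2, 5) + deltaForRemoved(2);
    System.out.println("net popularity change: " + net); // prints 0

    PopularityDeltaSketch sketch = new PopularityDeltaSketch();
    int logLines = 0;
    for (int i = 0; i < 25; i++) {
      if (sketch.shouldLog()) {
        logLines++;
      }
    }
    System.out.println("log lines for 25 item events: " + logLines); // prints 2
  }
}
```

The full handler follows the same rules but applies each delta through the repository and reads the count back before logging.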
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Success

import org.apache.pekko
import pekko.Done
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.LoggerOps
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.scaladsl.Handler
import org.slf4j.LoggerFactory

object ItemPopularityProjectionHandler {
  val LogInterval = 10
}

class ItemPopularityProjectionHandler(tag: String, system: ActorSystem[_], repo: ItemPopularityProjectionRepository)
    extends Handler[EventEnvelope[ShoppingCartEvents.Event]]() {
  import ShoppingCartEvents._

  private var logCounter: Int = 0
  private val log = LoggerFactory.getLogger(getClass)
  private implicit val ec: ExecutionContext = system.executionContext

  /**
   * The Envelope handler to process events.
   */
  override def process(envelope: EventEnvelope[Event]): Future[Done] = {
    val processed = envelope.event match {
      case ItemAdded(_, itemId, quantity)                            => repo.update(itemId, quantity)
      case ItemQuantityAdjusted(_, itemId, newQuantity, oldQuantity) => repo.update(itemId, newQuantity - oldQuantity)
      case ItemRemoved(_, itemId, oldQuantity)                       => repo.update(itemId, 0 - oldQuantity)
      case _: CheckedOut                                             => Future.successful(Done) // skip
    }
    processed.onComplete {
      case Success(_) => logItemCount(envelope.event)
      case _          => ()
    }
    processed
  }

  /**
   * Log the popularity of the item in every `ItemEvent` every `LogInterval`.
   */
  private def logItemCount(event: Event): Unit = event match {
    case itemEvent: ItemEvent =>
      logCounter += 1
      if (logCounter == ItemPopularityProjectionHandler.LogInterval) {
        logCounter = 0
        val itemId = itemEvent.itemId
        repo.getItem(itemId).foreach {
          case Some(count) =>
            log.infoN("ItemPopularityProjectionHandler({}) item popularity for '{}': [{}]", tag, itemId, count)
          case None =>
            log.info2("ItemPopularityProjectionHandler({}) item popularity for '{}': [0]", tag, itemId)
        }
      }
    case _ => ()
  }
}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.apache.pekko.Done;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.projection.eventsourced.EventEnvelope;
import org.apache.pekko.projection.javadsl.Handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ItemPopularityProjectionHandler
    extends Handler<EventEnvelope<ShoppingCartEvents.Event>> {
  public final int LogInterval = 10;

  private Logger log = LoggerFactory.getLogger(this.getClass());

  private int logCounter = 0;
  private String tag;
  private ActorSystem<?> system;
  private ItemPopularityProjectionRepository repo;

  public ItemPopularityProjectionHandler(
      String tag, ActorSystem<?> system, ItemPopularityProjectionRepository repo) {
    this.tag = tag;
    this.system = system;
    this.repo = repo;
  }

  /** The Envelope handler to process events. */
  @Override
  public CompletionStage<Done> process(EventEnvelope<ShoppingCartEvents.Event> envelope)
      throws Exception {
    ShoppingCartEvents.Event event = envelope.event();

    CompletionStage<Done> dbEffect = null;
    if (event instanceof ShoppingCartEvents.ItemAdded) {
      ShoppingCartEvents.ItemAdded added = (ShoppingCartEvents.ItemAdded) event;
      dbEffect = this.repo.update(added.itemId, added.quantity);
    } else if (event instanceof ShoppingCartEvents.ItemQuantityAdjusted) {
      ShoppingCartEvents.ItemQuantityAdjusted adjusted =
          (ShoppingCartEvents.ItemQuantityAdjusted) event;
      dbEffect = this.repo.update(adjusted.itemId, adjusted.newQuantity - adjusted.oldQuantity);
    } else if (event instanceof ShoppingCartEvents.ItemRemoved) {
      ShoppingCartEvents.ItemRemoved removed = (ShoppingCartEvents.ItemRemoved) event;
      dbEffect = this.repo.update(removed.itemId, 0 - removed.oldQuantity);
    } else {
      // skip all other events, such as `CheckedOut`
      dbEffect = CompletableFuture.completedFuture(Done.getInstance());
    }

    dbEffect.thenAccept(done -> logItemCount(event));

    return dbEffect;
  }

  /** Log the popularity of the item in every `ItemEvent` every `LogInterval`. */
  private void logItemCount(ShoppingCartEvents.Event event) {
    if (event instanceof ShoppingCartEvents.ItemEvent) {
      ShoppingCartEvents.ItemEvent itemEvent = (ShoppingCartEvents.ItemEvent) event;
      logCounter += 1;
      if (logCounter == LogInterval) {
        logCounter = 0;
        String itemId = itemEvent.getItemId();
        repo.getItem(itemId)
            .thenAccept(
                opt -> {
                  long count = opt.orElse(0L);
                  this.log.info(
                      "ItemPopularityProjectionHandler({}) item popularity for '{}': [{}]",
                      this.tag,
                      itemId,
                      count);
                });
      }
    }
  }
}
The projection is run by wrapping it in a ProjectionBehavior and spawning it as an Actor in the ActorSystem.
Add the following imports to ShoppingCartApp:
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.cassandra.scaladsl.CassandraProjection
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.cassandra.javadsl.CassandraProjection;
import org.apache.pekko.projection.javadsl.AtLeastOnceProjection;
Set up the Projection in the Guardian Behavior defined in ShoppingCartApp:
implicit val ec = system.executionContext
val session = CassandraSessionRegistry(system).sessionFor("pekko.projection.cassandra.session-config")
val repo = new ItemPopularityProjectionRepositoryImpl(session)
val projection = CassandraProjection.atLeastOnce(
  projectionId = ProjectionId("shopping-carts", ShoppingCartTags.Single),
  sourceProvider, // the SourceProvider implemented earlier in this guide
  handler = () => new ItemPopularityProjectionHandler(ShoppingCartTags.Single, system, repo))

context.spawn(ProjectionBehavior(projection), projection.projectionId.id)
CassandraSession session =
    CassandraSessionRegistry.get(system)
        .sessionFor("pekko.projection.cassandra.session-config");
ItemPopularityProjectionRepositoryImpl repo =
    new ItemPopularityProjectionRepositoryImpl(session);
AtLeastOnceProjection<Offset, EventEnvelope<ShoppingCartEvents.Event>> projection =
    CassandraProjection.atLeastOnce(
        ProjectionId.of("shopping-carts", ShoppingCartTags.SINGLE),
        sourceProvider, // the SourceProvider implemented earlier in this guide
        () ->
            new ItemPopularityProjectionHandler(
                ShoppingCartTags.SINGLE, system, repo));

context.spawn(ProjectionBehavior.create(projection), projection.projectionId().id());