Build a Projection handler

When building a projection, there are several non-functional requirements to consider. What technology should you project into? What message delivery semantics are acceptable for the system? Is the target technology compatible with the chosen Source Provider to enable exactly-once message delivery? Does runtime state need to be maintained in the projection while it’s running? These choices are up to you, but you must verify that the answers to these questions are compatible with each other.

In this guide we will create a Projection that represents shopping cart item popularity. We will persist our Projection to Cassandra with at-least-once semantics. The Projection itself will be represented as a Cassandra table.

To proceed we must add the Cassandra Projection library to our project:

sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-cassandra" % "1.0.0"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-projection-cassandra_${scala.binary.version}</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-projection-cassandra_${versions.ScalaBinary}:1.0.0"
}

It’s the user’s responsibility to implement the means to project into the target system. The Projection itself only manages the persistence of the offset (though it is possible to enlist your projection in transactions when using projection implementations that support exactly-once, such as JDBC). This guide encapsulates its data access layer in a repository called ItemPopularityProjectionRepository. The repository manages a Cassandra table called item_popularity. Each row in item_popularity contains a shopping cart item id and a count that tracks the net quantity of that item added to, and removed from, all shopping carts.

Note

The example persists the item popularity count with a Cassandra counter data type. Counter updates are not idempotent, and with at-least-once semantics an event may be processed more than once, so the stored count can drift from the true value. However, since the count is only a rough metric of how popular an item is, it’s not critical to have a totally accurate figure.
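
To make the trade-off in this note concrete, here is a minimal sketch in plain Java with no Pekko or Cassandra dependency. The class `CounterRedeliveryDemo`, its methods, and the in-memory map are hypothetical stand-ins for the counter table. The sketch assumes a restart happens before the offset of the last event was stored, so that event is delivered a second time.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a counter table; not part of Pekko Projections. */
class CounterRedeliveryDemo {

  /** Adds a delta to the item's count, similar in effect to `SET count = count + ?`. */
  static void applyDelta(Map<String, Long> table, String itemId, long delta) {
    table.merge(itemId, delta, Long::sum);
  }

  /**
   * Applies every delta once, then applies the deltas from index `replayFrom`
   * again to simulate redelivery after a restart (at-least-once delivery).
   */
  static long projectWithReplay(List<Integer> deltas, int replayFrom) {
    Map<String, Long> table = new HashMap<>();
    for (int delta : deltas) {
      applyDelta(table, "item", delta);
    }
    for (int i = replayFrom; i < deltas.size(); i++) {
      applyDelta(table, "item", deltas.get(i));
    }
    return table.getOrDefault("item", 0L);
  }

  public static void main(String[] args) {
    List<Integer> deltas = List.of(2, 3, 1);
    System.out.println(projectWithReplay(deltas, 3)); // no redelivery: prints 6
    System.out.println(projectWithReplay(deltas, 2)); // last event redelivered: prints 7
  }
}
```

The second result is off by the replayed delta, which is the kind of inaccuracy this guide accepts for a rough popularity metric.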

Add the ItemPopularityProjectionRepository to your project:

Scala
package docs.guide

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import org.apache.pekko
import pekko.Done
import pekko.stream.connectors.cassandra.scaladsl.CassandraSession

trait ItemPopularityProjectionRepository {

  def update(itemId: String, delta: Int): Future[Done]
  def getItem(itemId: String): Future[Option[Long]]
}

object ItemPopularityProjectionRepositoryImpl {
  val Keyspace = "pekko_projection"
  val PopularityTable = "item_popularity"
}

class ItemPopularityProjectionRepositoryImpl(session: CassandraSession)(implicit val ec: ExecutionContext)
    extends ItemPopularityProjectionRepository {
  import ItemPopularityProjectionRepositoryImpl._

  override def update(itemId: String, delta: Int): Future[Done] = {
    session.executeWrite(
      s"UPDATE $Keyspace.$PopularityTable SET count = count + ? WHERE item_id = ?",
      java.lang.Long.valueOf(delta),
      itemId)
  }

  override def getItem(itemId: String): Future[Option[Long]] = {
    session
      .selectOne(s"SELECT item_id, count FROM $Keyspace.$PopularityTable WHERE item_id = ?", itemId)
      .map(opt => opt.map(row => row.getLong("count").longValue()))
  }
}
Java
package jdocs.guide;

import org.apache.pekko.Done;
import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSession;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

interface ItemPopularityProjectionRepository {
  CompletionStage<Done> update(String itemId, int delta);

  CompletionStage<Optional<Long>> getItem(String itemId);
}

class ItemPopularityProjectionRepositoryImpl implements ItemPopularityProjectionRepository {

  public static final String Keyspace = "pekko_projection";
  public static final String PopularityTable = "item_popularity";

  CassandraSession session;

  public ItemPopularityProjectionRepositoryImpl(CassandraSession session) {
    this.session = session;
  }

  @Override
  public CompletionStage<Done> update(String itemId, int delta) {
    return session.executeWrite(
        String.format(
            "UPDATE %s.%s SET count = count + ? WHERE item_id = ?", Keyspace, PopularityTable),
        (long) delta,
        itemId);
  }

  @Override
  public CompletionStage<Optional<Long>> getItem(String itemId) {
    return session
        .selectOne(
            String.format(
                "SELECT item_id, count FROM %s.%s WHERE item_id = ?", Keyspace, PopularityTable),
            itemId)
        .thenApply(opt -> opt.map(row -> row.getLong("count")));
  }
}
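
When trying out a handler without a running Cassandra instance, an in-memory stand-in with the same update and lookup behavior can be handy. The following is only a sketch under stated assumptions: `InMemoryItemPopularity` is a hypothetical class that is not part of the guide, and it returns `CompletionStage<Void>` instead of `CompletionStage<Done>` so that it compiles with the JDK alone.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in that mirrors the repository's update/getItem behavior. */
class InMemoryItemPopularity {
  private final Map<String, Long> counts = new ConcurrentHashMap<>();

  /** Adds delta to the item's count, creating the entry on first use. */
  CompletionStage<Void> update(String itemId, int delta) {
    counts.merge(itemId, (long) delta, Long::sum);
    return CompletableFuture.completedFuture(null);
  }

  /** Returns the current count, or an empty Optional if the item was never updated. */
  CompletionStage<Optional<Long>> getItem(String itemId) {
    return CompletableFuture.completedFuture(Optional.ofNullable(counts.get(itemId)));
  }
}
```

To plug such a class into the handler shown later, it would need to implement ItemPopularityProjectionRepository and return `Done.getInstance()` from `update`.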

Now it’s time to write the Projection handler itself. This example uses a Handler that will process ShoppingCartEvents.Event events from the SourceProvider that we implemented earlier. Specifically, it will only process ItemEvents, which add, adjust, or remove items in a shopping cart. It skips all shopping cart CheckedOut events. The event envelopes are processed in the process method.

This example will also log the popularity count of every 10th item event that is processed. The logging counter is stored as a mutable variable within the handler. Since this is a simple log operation, managing the state in this manner is fine, but to handle more advanced stateful operations you should evaluate using the StatefulHandler.

Scala
package docs.guide

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Success

import org.apache.pekko
import pekko.Done
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.LoggerOps
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.scaladsl.Handler
import org.slf4j.LoggerFactory

object ItemPopularityProjectionHandler {
  val LogInterval = 10
}

class ItemPopularityProjectionHandler(tag: String, system: ActorSystem[_], repo: ItemPopularityProjectionRepository)
    extends Handler[EventEnvelope[ShoppingCartEvents.Event]]() {
  import ShoppingCartEvents._

  private var logCounter: Int = 0
  private val log = LoggerFactory.getLogger(getClass)
  private implicit val ec: ExecutionContext = system.executionContext

  /**
   * The Envelope handler to process events.
   */
  override def process(envelope: EventEnvelope[Event]): Future[Done] = {
    val processed = envelope.event match {
      case ItemAdded(_, itemId, quantity)                            => repo.update(itemId, quantity)
      case ItemQuantityAdjusted(_, itemId, newQuantity, oldQuantity) => repo.update(itemId, newQuantity - oldQuantity)
      case ItemRemoved(_, itemId, oldQuantity)                       => repo.update(itemId, 0 - oldQuantity)
      case _: CheckedOut                                             => Future.successful(Done) // skip
    }
    processed.onComplete {
      case Success(_) => logItemCount(envelope.event)
      case _          => ()
    }
    processed
  }

  /**
   * Log the popularity of the item in every `ItemEvent` every `LogInterval`.
   */
  private def logItemCount(event: Event): Unit = event match {
    case itemEvent: ItemEvent =>
      logCounter += 1
      if (logCounter == ItemPopularityProjectionHandler.LogInterval) {
        logCounter = 0
        val itemId = itemEvent.itemId
        repo.getItem(itemId).foreach {
          case Some(count) =>
            log.infoN("ItemPopularityProjectionHandler({}) item popularity for '{}': [{}]", tag, itemId, count)
          case None =>
            log.info2("ItemPopularityProjectionHandler({}) item popularity for '{}': [0]", tag, itemId)
        }
      }
    case _ => ()
  }

}
Java
package jdocs.guide;

import org.apache.pekko.Done;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.projection.eventsourced.EventEnvelope;
import org.apache.pekko.projection.javadsl.Handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ItemPopularityProjectionHandler
    extends Handler<EventEnvelope<ShoppingCartEvents.Event>> {
  public static final int LogInterval = 10;

  private Logger log = LoggerFactory.getLogger(this.getClass());

  private int logCounter = 0;
  private String tag;
  private ActorSystem<?> system;
  private ItemPopularityProjectionRepository repo;

  public ItemPopularityProjectionHandler(
      String tag, ActorSystem<?> system, ItemPopularityProjectionRepository repo) {
    this.tag = tag;
    this.system = system;
    this.repo = repo;
  }

  /** The Envelope handler to process events. */
  @Override
  public CompletionStage<Done> process(EventEnvelope<ShoppingCartEvents.Event> envelope)
      throws Exception {
    ShoppingCartEvents.Event event = envelope.event();

    CompletionStage<Done> dbEffect = null;
    if (event instanceof ShoppingCartEvents.ItemAdded) {
      ShoppingCartEvents.ItemAdded added = (ShoppingCartEvents.ItemAdded) event;
      dbEffect = this.repo.update(added.itemId, added.quantity);
    } else if (event instanceof ShoppingCartEvents.ItemQuantityAdjusted) {
      ShoppingCartEvents.ItemQuantityAdjusted adjusted =
          (ShoppingCartEvents.ItemQuantityAdjusted) event;
      dbEffect = this.repo.update(adjusted.itemId, adjusted.newQuantity - adjusted.oldQuantity);
    } else if (event instanceof ShoppingCartEvents.ItemRemoved) {
      ShoppingCartEvents.ItemRemoved removed = (ShoppingCartEvents.ItemRemoved) event;
      dbEffect = this.repo.update(removed.itemId, 0 - removed.oldQuantity);
    } else {
      // skip all other events, such as `CheckedOut`
      dbEffect = CompletableFuture.completedFuture(Done.getInstance());
    }

    dbEffect.thenAccept(done -> logItemCount(event));

    return dbEffect;
  }

  /** Log the popularity of the item in every `ItemEvent` every `LogInterval`. */
  private void logItemCount(ShoppingCartEvents.Event event) {
    if (event instanceof ShoppingCartEvents.ItemEvent) {
      ShoppingCartEvents.ItemEvent itemEvent = (ShoppingCartEvents.ItemEvent) event;
      logCounter += 1;
      if (logCounter == LogInterval) {
        logCounter = 0;
        String itemId = itemEvent.getItemId();
        repo.getItem(itemId)
            .thenAccept(
                opt -> {
                  long count = opt.orElse(0L);
                  this.log.info(
                      "ItemPopularityProjectionHandler({}) item popularity for '{}': [{}]",
                      this.tag,
                      itemId,
                      count);
                });
      }
    }
  }
}
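
The delta arithmetic and the log interval used by the handler can be checked in isolation. The sketch below is plain Java; `PopularityMath` and its method names are hypothetical and only mirror the expressions in the `process` and `logItemCount` methods above.

```java
/** Hypothetical helper, not part of the guide: mirrors the handler's delta arithmetic and log interval. */
class PopularityMath {

  /** Adding an item increases its popularity by the added quantity. */
  static int itemAdded(int quantity) {
    return quantity;
  }

  /** Adjusting a quantity contributes only the difference between new and old. */
  static int itemQuantityAdjusted(int newQuantity, int oldQuantity) {
    return newQuantity - oldQuantity;
  }

  /** Removing an item subtracts the quantity that was in the cart. */
  static int itemRemoved(int oldQuantity) {
    return 0 - oldQuantity;
  }

  /** Counts how many log lines are emitted after processing the given number of item events. */
  static int logLines(int itemEvents, int logInterval) {
    int logCounter = 0;
    int lines = 0;
    for (int i = 0; i < itemEvents; i++) {
      logCounter += 1;
      if (logCounter == logInterval) {
        logCounter = 0;
        lines += 1;
      }
    }
    return lines;
  }

  public static void main(String[] args) {
    // Add 3, adjust from 3 to 5, then remove all 5: the net popularity change is zero.
    int net = itemAdded(3) + itemQuantityAdjusted(5, 3) + itemRemoved(5);
    System.out.println(net); // prints 0
    // With an interval of 10, 25 item events trigger a log line at the 10th and 20th event.
    System.out.println(logLines(25, 10)); // prints 2
  }
}
```

Note that the counter in the handler counts item events across all items, so the item that gets logged is simply whichever one arrived as every 10th item event.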

The projection is run by wrapping it in a ProjectionBehavior and spawning it as an Actor in the ActorSystem.

Add the following imports to ShoppingCartApp:

Scala
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.cassandra.scaladsl.CassandraProjection
import pekko.stream.connectors.cassandra.scaladsl.CassandraSessionRegistry
Java
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.cassandra.javadsl.CassandraProjection;
import org.apache.pekko.projection.javadsl.AtLeastOnceProjection;
import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSession;
import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSessionRegistry;

Set up the Projection in the Guardian Behavior defined in ShoppingCartApp:

Scala
implicit val ec = system.executionContext
val session = CassandraSessionRegistry(system).sessionFor("pekko.projection.cassandra.session-config")
val repo = new ItemPopularityProjectionRepositoryImpl(session)
val projection = CassandraProjection.atLeastOnce(
  projectionId = ProjectionId("shopping-carts", ShoppingCartTags.Single),
  sourceProvider,
  handler = () => new ItemPopularityProjectionHandler(ShoppingCartTags.Single, system, repo))

context.spawn(ProjectionBehavior(projection), projection.projectionId.id)
Java
CassandraSession session =
    CassandraSessionRegistry.get(system)
        .sessionFor("pekko.projection.cassandra.session-config");
ItemPopularityProjectionRepositoryImpl repo =
    new ItemPopularityProjectionRepositoryImpl(session);
AtLeastOnceProjection<Offset, EventEnvelope<ShoppingCartEvents.Event>> projection =
    CassandraProjection.atLeastOnce(
        ProjectionId.of("shopping-carts", ShoppingCartTags.SINGLE),
        sourceProvider,
        () ->
            new ItemPopularityProjectionHandler(
                ShoppingCartTags.SINGLE, system, repo));

context.spawn(ProjectionBehavior.create(projection), projection.projectionId().id());