Build a Projection handler
When building a projection there are several non-functional requirements to consider. What technology should it project into? What message delivery semantics are acceptable for the system? Is the target compatible with the chosen Source Provider to enable exactly-once message delivery? Does runtime state need to be maintained in the projection while it’s running? The answers are yours to choose, but you must verify that they are compatible with each other.
In this guide we will create a Projection that represents shopping cart item popularity. We will persist our Projection to Cassandra with at-least-once semantics. The Projection itself will be represented as a Cassandra table.
To proceed we must add the Cassandra Projection library to our project:
- sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-cassandra" % "1.0.0"
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-projection-cassandra_${scala.binary.version}</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-projection-cassandra_${versions.ScalaBinary}:1.0.0"
}
It’s the user’s responsibility to implement the means to project into the target system. The Projection itself only manages the persistence of the offset (though it is possible to enlist your projection in transactions when using projection implementations that support exactly-once, such as JDBC). This guide encapsulates its data access layer in a repository called ItemPopularityProjectionRepository. The repository manages a Cassandra table called item_popularity. Each row in item_popularity contains a shopping cart item id and a count of the net quantity of that item added to or removed from all shopping carts.
The example persists the item popularity count with a Cassandra counter data type. Counter increments are not idempotent, and with at-least-once semantics an event may be processed more than once, so the same update can be applied twice. However, since the count is only a rough metric of how popular an item is, it’s not critical to have a totally accurate figure.
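To make the trade-off concrete, here is a minimal JDK-only sketch of what a redelivered event does to an increment-based count. It does not use Cassandra or Pekko; the class `RedeliveryDemo`, the method `applyAll`, and the item id `socks` are hypothetical names chosen for illustration, and the in-memory map merely stands in for a counter column.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only: an in-memory map stands in for a counter column. */
final class RedeliveryDemo {

  /** Applies every delivery as an increment, similar in spirit to `count = count + ?`. */
  static Map<String, Long> applyAll(List<Map.Entry<String, Integer>> deliveries) {
    Map<String, Long> counts = new HashMap<>();
    for (Map.Entry<String, Integer> delivery : deliveries) {
      counts.merge(delivery.getKey(), (long) delivery.getValue(), Long::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    Map.Entry<String, Integer> added = Map.entry("socks", 3);

    // Delivered once: the count matches the event.
    System.out.println(applyAll(List.of(added))); // {socks=3}

    // Delivered twice, for example after a restart that happened
    // before the offset was stored: the increment is applied again.
    System.out.println(applyAll(List.of(added, added))); // {socks=6}
  }
}
```

An idempotent write (for example, setting an absolute value keyed by event sequence number) would not drift this way, but it would require a different table design than the counter used in this guide.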
Add the ItemPopularityProjectionRepository to your project:
- Scala
package docs.guide

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import org.apache.pekko
import pekko.Done
import pekko.stream.connectors.cassandra.scaladsl.CassandraSession

trait ItemPopularityProjectionRepository {

  def update(itemId: String, delta: Int): Future[Done]
  def getItem(itemId: String): Future[Option[Long]]
}

object ItemPopularityProjectionRepositoryImpl {
  val Keyspace = "pekko_projection"
  val PopularityTable = "item_popularity"
}

class ItemPopularityProjectionRepositoryImpl(session: CassandraSession)(implicit val ec: ExecutionContext)
    extends ItemPopularityProjectionRepository {
  import ItemPopularityProjectionRepositoryImpl._

  override def update(itemId: String, delta: Int): Future[Done] = {
    session.executeWrite(
      s"UPDATE $Keyspace.$PopularityTable SET count = count + ? WHERE item_id = ?",
      java.lang.Long.valueOf(delta),
      itemId)
  }

  override def getItem(itemId: String): Future[Option[Long]] = {
    session
      .selectOne(s"SELECT item_id, count FROM $Keyspace.$PopularityTable WHERE item_id = ?", itemId)
      .map(opt => opt.map(row => row.getLong("count").longValue()))
  }
}
- Java
package jdocs.guide;

import org.apache.pekko.Done;
import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSession;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

interface ItemPopularityProjectionRepository {

  CompletionStage<Done> update(String itemId, int delta);

  CompletionStage<Optional<Long>> getItem(String itemId);
}

class ItemPopularityProjectionRepositoryImpl implements ItemPopularityProjectionRepository {

  public static final String Keyspace = "pekko_projection";
  public static final String PopularityTable = "item_popularity";

  CassandraSession session;

  public ItemPopularityProjectionRepositoryImpl(CassandraSession session) {
    this.session = session;
  }

  @Override
  public CompletionStage<Done> update(String itemId, int delta) {
    return session.executeWrite(
        String.format(
            "UPDATE %s.%s SET count = count + ? WHERE item_id = ?", Keyspace, PopularityTable),
        (long) delta,
        itemId);
  }

  @Override
  public CompletionStage<Optional<Long>> getItem(String itemId) {
    return session
        .selectOne(
            String.format(
                "SELECT item_id, count FROM %s.%s WHERE item_id = ?", Keyspace, PopularityTable),
            itemId)
        .thenApply(opt -> opt.map(row -> row.getLong("count")));
  }
}
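Because the data access sits behind a repository, the handler logic can be exercised without a running Cassandra. The following is a rough JDK-only sketch of an in-memory stand-in. It is an assumption for illustration, not part of the guide: the class name `InMemoryItemPopularity` is hypothetical, and it returns `CompletionStage<Void>` where the guide's interface returns `CompletionStage<Done>` so that it compiles without Pekko on the classpath.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory stand-in; uses Void where the guide's interface uses Done. */
final class InMemoryItemPopularity {

  private final ConcurrentMap<String, Long> counts = new ConcurrentHashMap<>();

  /** Adds delta to the item's count, creating the entry on first use. */
  CompletionStage<Void> update(String itemId, int delta) {
    counts.merge(itemId, (long) delta, Long::sum);
    return CompletableFuture.completedFuture(null);
  }

  /** Returns the current count, or empty if the item was never updated. */
  CompletionStage<Optional<Long>> getItem(String itemId) {
    return CompletableFuture.completedFuture(Optional.ofNullable(counts.get(itemId)));
  }

  public static void main(String[] args) {
    InMemoryItemPopularity repo = new InMemoryItemPopularity();
    repo.update("socks", 3);
    repo.update("socks", -1);
    repo.getItem("socks").thenAccept(count -> System.out.println(count)); // Optional[2]
    repo.getItem("hat").thenAccept(count -> System.out.println(count)); // Optional.empty
  }
}
```

To use the same idea against the real interface, the stand-in would implement ItemPopularityProjectionRepository and complete `update` with `Done.getInstance()` instead of `null`.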
Now it’s time to write the Projection handler itself. This example uses a Handler that will process ShoppingCartEvents.Event events from the SourceProvider that we implemented earlier. Specifically, it will only process ItemEvents, which change the items in a shopping cart or their quantities. It ignores all shopping cart CheckedOut events by skipping them. The event envelopes are processed in the process method.
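The core of the handler is a mapping from each event to a change in the popularity count. The sketch below isolates that mapping using plain Java. The nested event classes are hypothetical, simplified stand-ins (the guide's ShoppingCartEvents carry additional fields such as the cart id), and `ItemDeltas.delta` is an illustrative helper rather than anything provided by the library.

```java
import java.util.OptionalInt;

/** Hypothetical, simplified event types used only to illustrate the delta mapping. */
final class ItemDeltas {

  interface Event {}

  static final class ItemAdded implements Event {
    final String itemId;
    final int quantity;

    ItemAdded(String itemId, int quantity) {
      this.itemId = itemId;
      this.quantity = quantity;
    }
  }

  static final class ItemQuantityAdjusted implements Event {
    final String itemId;
    final int newQuantity;
    final int oldQuantity;

    ItemQuantityAdjusted(String itemId, int newQuantity, int oldQuantity) {
      this.itemId = itemId;
      this.newQuantity = newQuantity;
      this.oldQuantity = oldQuantity;
    }
  }

  static final class ItemRemoved implements Event {
    final String itemId;
    final int oldQuantity;

    ItemRemoved(String itemId, int oldQuantity) {
      this.itemId = itemId;
      this.oldQuantity = oldQuantity;
    }
  }

  static final class CheckedOut implements Event {}

  /** Returns the popularity change for an item event, or empty for skipped events. */
  static OptionalInt delta(Event event) {
    if (event instanceof ItemAdded) {
      return OptionalInt.of(((ItemAdded) event).quantity);
    } else if (event instanceof ItemQuantityAdjusted) {
      ItemQuantityAdjusted adjusted = (ItemQuantityAdjusted) event;
      return OptionalInt.of(adjusted.newQuantity - adjusted.oldQuantity);
    } else if (event instanceof ItemRemoved) {
      return OptionalInt.of(-((ItemRemoved) event).oldQuantity);
    }
    return OptionalInt.empty(); // e.g. CheckedOut: nothing to write
  }

  public static void main(String[] args) {
    System.out.println(delta(new ItemAdded("socks", 3))); // OptionalInt[3]
    System.out.println(delta(new ItemQuantityAdjusted("socks", 2, 5))); // OptionalInt[-3]
    System.out.println(delta(new ItemRemoved("socks", 5))); // OptionalInt[-5]
    System.out.println(delta(new CheckedOut())); // OptionalInt.empty
  }
}
```

Keeping this mapping as a pure function makes it easy to test separately from the asynchronous repository call.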
This example also logs the item’s popularity count on every 10th item event that is processed. The logging counter is stored as a mutable variable within the handler. Since this is a simple log operation, managing the state in this manner is fine, but to handle more advanced stateful operations you should evaluate using the StatefulHandler.
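The "every 10th event" bookkeeping described above can be sketched in isolation as a small counter that resets when it reaches the interval. The class name `EveryNth` and its `tick` method are hypothetical; like the handler's mutable variable, this sketch uses a plain field and assumes it is not called concurrently from multiple threads.

```java
/** Hypothetical helper: signals once every `interval` calls, then starts counting again. */
final class EveryNth {

  private final int interval;
  private int counter = 0;

  EveryNth(int interval) {
    if (interval <= 0) {
      throw new IllegalArgumentException("interval must be positive");
    }
    this.interval = interval;
  }

  /** Counts one event; returns true on every interval-th call and resets the counter. */
  boolean tick() {
    counter += 1;
    if (counter == interval) {
      counter = 0;
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    EveryNth every10 = new EveryNth(10);
    int logged = 0;
    for (int i = 1; i <= 25; i++) {
      if (every10.tick()) {
        logged++;
      }
    }
    System.out.println(logged); // 2 (the 10th and the 20th event)
  }
}
```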
- Scala
package docs.guide

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Success

import org.apache.pekko
import pekko.Done
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.LoggerOps
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.scaladsl.Handler
import org.slf4j.LoggerFactory

object ItemPopularityProjectionHandler {
  val LogInterval = 10
}

class ItemPopularityProjectionHandler(tag: String, system: ActorSystem[_], repo: ItemPopularityProjectionRepository)
    extends Handler[EventEnvelope[ShoppingCartEvents.Event]]() {
  import ShoppingCartEvents._

  private var logCounter: Int = 0
  private val log = LoggerFactory.getLogger(getClass)
  private implicit val ec: ExecutionContext = system.executionContext

  /**
   * The Envelope handler to process events.
   */
  override def process(envelope: EventEnvelope[Event]): Future[Done] = {
    val processed = envelope.event match {
      case ItemAdded(_, itemId, quantity)                            => repo.update(itemId, quantity)
      case ItemQuantityAdjusted(_, itemId, newQuantity, oldQuantity) => repo.update(itemId, newQuantity - oldQuantity)
      case ItemRemoved(_, itemId, oldQuantity)                       => repo.update(itemId, 0 - oldQuantity)
      case _: CheckedOut                                             => Future.successful(Done) // skip
    }
    processed.onComplete {
      case Success(_) => logItemCount(envelope.event)
      case _          => ()
    }
    processed
  }

  /**
   * Log the popularity of the item in every `ItemEvent` every `LogInterval`.
   */
  private def logItemCount(event: Event): Unit = event match {
    case itemEvent: ItemEvent =>
      logCounter += 1
      if (logCounter == ItemPopularityProjectionHandler.LogInterval) {
        logCounter = 0
        val itemId = itemEvent.itemId
        repo.getItem(itemId).foreach {
          case Some(count) =>
            log.infoN("ItemPopularityProjectionHandler({}) item popularity for '{}': [{}]", tag, itemId, count)
          case None =>
            log.info2("ItemPopularityProjectionHandler({}) item popularity for '{}': [0]", tag, itemId)
        }
      }
    case _ => ()
  }
}
- Java
package jdocs.guide;

import org.apache.pekko.Done;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.projection.eventsourced.EventEnvelope;
import org.apache.pekko.projection.javadsl.Handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ItemPopularityProjectionHandler
    extends Handler<EventEnvelope<ShoppingCartEvents.Event>> {
  public final int LogInterval = 10;

  private Logger log = LoggerFactory.getLogger(this.getClass());

  private int logCounter = 0;
  private String tag;
  private ActorSystem<?> system;
  private ItemPopularityProjectionRepository repo;

  public ItemPopularityProjectionHandler(
      String tag, ActorSystem<?> system, ItemPopularityProjectionRepository repo) {
    this.tag = tag;
    this.system = system;
    this.repo = repo;
  }

  /** The Envelope handler to process events. */
  @Override
  public CompletionStage<Done> process(EventEnvelope<ShoppingCartEvents.Event> envelope)
      throws Exception {
    ShoppingCartEvents.Event event = envelope.event();

    CompletionStage<Done> dbEffect = null;
    if (event instanceof ShoppingCartEvents.ItemAdded) {
      ShoppingCartEvents.ItemAdded added = (ShoppingCartEvents.ItemAdded) event;
      dbEffect = this.repo.update(added.itemId, added.quantity);
    } else if (event instanceof ShoppingCartEvents.ItemQuantityAdjusted) {
      ShoppingCartEvents.ItemQuantityAdjusted adjusted =
          (ShoppingCartEvents.ItemQuantityAdjusted) event;
      dbEffect = this.repo.update(adjusted.itemId, adjusted.newQuantity - adjusted.oldQuantity);
    } else if (event instanceof ShoppingCartEvents.ItemRemoved) {
      ShoppingCartEvents.ItemRemoved removed = (ShoppingCartEvents.ItemRemoved) event;
      dbEffect = this.repo.update(removed.itemId, 0 - removed.oldQuantity);
    } else {
      // skip all other events, such as `CheckedOut`
      dbEffect = CompletableFuture.completedFuture(Done.getInstance());
    }

    dbEffect.thenAccept(done -> logItemCount(event));

    return dbEffect;
  }

  /** Log the popularity of the item in every `ItemEvent` every `LogInterval`. */
  private void logItemCount(ShoppingCartEvents.Event event) {
    if (event instanceof ShoppingCartEvents.ItemEvent) {
      ShoppingCartEvents.ItemEvent itemEvent = (ShoppingCartEvents.ItemEvent) event;
      logCounter += 1;
      if (logCounter == LogInterval) {
        logCounter = 0;
        String itemId = itemEvent.getItemId();
        repo.getItem(itemId)
            .thenAccept(
                opt -> {
                  long count = opt.orElse(0L);
                  this.log.info(
                      "ItemPopularityProjectionHandler({}) item popularity for '{}': [{}]",
                      this.tag,
                      itemId,
                      count);
                });
      }
    }
  }
}
The projection is run by wrapping it in a ProjectionBehavior and spawning it as an Actor in the ActorSystem.
Add the following imports to ShoppingCartApp:
- Scala
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.cassandra.scaladsl.CassandraProjection
import pekko.stream.connectors.cassandra.scaladsl.CassandraSessionRegistry
- Java
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.cassandra.javadsl.CassandraProjection;
import org.apache.pekko.projection.javadsl.AtLeastOnceProjection;
import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSession;
import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSessionRegistry;
Set up the Projection in the Guardian Behavior defined in ShoppingCartApp:
- Scala
implicit val ec = system.executionContext
val session = CassandraSessionRegistry(system).sessionFor("pekko.projection.cassandra.session-config")
val repo = new ItemPopularityProjectionRepositoryImpl(session)
val projection = CassandraProjection.atLeastOnce(
  projectionId = ProjectionId("shopping-carts", ShoppingCartTags.Single),
  sourceProvider,
  handler = () => new ItemPopularityProjectionHandler(ShoppingCartTags.Single, system, repo))

context.spawn(ProjectionBehavior(projection), projection.projectionId.id)
- Java
CassandraSession session =
    CassandraSessionRegistry.get(system)
        .sessionFor("pekko.projection.cassandra.session-config");
ItemPopularityProjectionRepositoryImpl repo =
    new ItemPopularityProjectionRepositoryImpl(session);
AtLeastOnceProjection<Offset, EventEnvelope<ShoppingCartEvents.Event>> projection =
    CassandraProjection.atLeastOnce(
        ProjectionId.of("shopping-carts", ShoppingCartTags.SINGLE),
        sourceProvider,
        () ->
            new ItemPopularityProjectionHandler(
                ShoppingCartTags.SINGLE, system, repo));

context.spawn(ProjectionBehavior.create(projection), projection.projectionId().id());