Running the Projection
This example requires a Cassandra database. If you do not have one, you can run one locally as a Docker container by using docker-compose with the docker-compose.yml file found in the Projections project root. The docker-compose.yml file references the latest Cassandra Docker image.
Change to the directory that contains docker-compose.yml and manage the Cassandra container with the following commands.
| Action | Docker Command |
|---|---|
| Run | docker-compose --project-name getting-started up -d cassandra |
| Stop | docker-compose --project-name getting-started stop |
| Delete container state | docker-compose --project-name getting-started rm -f |
| CQL shell (when running) | docker run -it --network getting-started_default --rm cassandra cqlsh cassandra |
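The commands in the table can be wrapped in a small helper so that the project name is written only once. This is a minimal sketch, not part of the example project: the function name cassandra_ctl, the action names, and the DRY_RUN variable are all hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: maps each action in the table to its Docker command.
# PROJECT matches the --project-name used throughout this guide.
PROJECT="getting-started"

cassandra_ctl() {
  case "$1" in
    run)    set -- docker-compose --project-name "$PROJECT" up -d cassandra ;;
    stop)   set -- docker-compose --project-name "$PROJECT" stop ;;
    delete) set -- docker-compose --project-name "$PROJECT" rm -f ;;
    cqlsh)  set -- docker run -it --network "${PROJECT}_default" --rm cassandra cqlsh cassandra ;;
    *)      echo "usage: cassandra_ctl run|stop|delete|cqlsh" >&2; return 2 ;;
  esac
  # With DRY_RUN set, print the command instead of executing it.
  if [ -n "${DRY_RUN:-}" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Usage:
#   cassandra_ctl run              # start the Cassandra container
#   DRY_RUN=1 cassandra_ctl cqlsh  # only show the command that would run
```

Setting DRY_RUN is a convenient way to check the exact command before touching any containers.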
To use a different Cassandra database, update the Cassandra driver’s contact-points configuration found in ./examples/src/resources/guide-shopping-cart-app.conf.
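As a rough illustration, the override might resemble the fragment printed by the sketch below. The keys basic.contact-points and basic.load-balancing-policy.local-datacenter are standard DataStax Java driver settings (the driver requires a local datacenter when contact points are given explicitly). The function name contact_points_conf, the host cassandra.example.com, and the datacenter name are placeholders, and the actual layout of guide-shopping-cart-app.conf is not reproduced here.

```shell
#!/bin/sh
# Hypothetical helper: prints a HOCON fragment that points the driver at
# another Cassandra node.
#   $1 = host:port of the node (placeholder value in the call below)
#   $2 = name of that node's datacenter (placeholder value in the call below)
contact_points_conf() {
  cat <<EOF
datastax-java-driver {
  basic.contact-points = ["$1"]
  basic.load-balancing-policy.local-datacenter = "$2"
}
EOF
}

# Print a fragment to compare against, or merge into, the configuration file.
contact_points_conf "cassandra.example.com:9042" "datacenter1"
```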
To run the Projection we must set up our Cassandra database to support the Cassandra Projection offset store as well as the new table we are projecting into with the ItemPopularityProjectionHandler.
Create a Cassandra keyspace.
CREATE KEYSPACE IF NOT EXISTS pekko_projection WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor':1 };
Create the Cassandra Projection offset store table. The DDL can be found in the Schema section of the Cassandra Projection documentation.
Create the ItemPopularityProjectionHandler projection table with the DDL below:
CREATE TABLE IF NOT EXISTS pekko_projection.item_popularity (
item_id text,
count counter,
PRIMARY KEY (item_id));
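The keyspace and table statements above can also be applied without an interactive session. The sketch below writes them to a temporary file and defines a function that redirects a file into cqlsh, reusing the Docker network and image names from the command table. The names SCHEMA_FILE and apply_cql are hypothetical, and the offset store DDL is not included because it lives in the Schema section.

```shell
#!/bin/sh
# Write the two statements from this page to a temporary file.
SCHEMA_FILE="$(mktemp)"
cat > "$SCHEMA_FILE" <<'EOF'
CREATE KEYSPACE IF NOT EXISTS pekko_projection WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor':1 };

CREATE TABLE IF NOT EXISTS pekko_projection.item_popularity (
item_id text,
count counter,
PRIMARY KEY (item_id));
EOF

# Hypothetical helper: feeds a CQL file to cqlsh in a throwaway container.
# -i without -t keeps stdin open so the file can be redirected into cqlsh.
apply_cql() {
  docker run -i --network getting-started_default --rm cassandra cqlsh cassandra < "$1"
}

# Usage, once the Cassandra container is running:
#   apply_cql "$SCHEMA_FILE"
```

Because both statements use IF NOT EXISTS, applying the file more than once should be harmless.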
Source events are generated with the EventGeneratorApp. This app is configured to use Apache Pekko Persistence Cassandra and Apache Pekko Cluster Sharding to persist random ShoppingCartApp.Events to a journal. It checks out a shopping cart with random items and quantities every second. The app automatically creates all the Apache Pekko Persistence infrastructure tables in the pekko keyspace. We won’t go into any further detail about how this app functions because it falls outside the scope of Apache Pekko Projections. To learn more about writing events with Apache Pekko Persistence, see the Apache Pekko documentation.
Add the Apache Pekko Cluster Sharding library to your project:
libraryDependencies += "org.apache.pekko" %% "pekko-cluster-sharding-typed" % "1.1.3"
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-cluster-sharding-typed_${versions.ScalaBinary}:1.1.3"
}
Add the EventGeneratorApp to your project.
Run EventGeneratorApp:
sbt "runMain"
mvn compile exec:java -Dexec.mainClass=""
If no connection exceptions appear, you should eventually see log lines indicating that events are being written to the journal.
[2020-08-13 15:20:05,583] [INFO] [$] [] [] - id [cb52b] tag [shopping-cart] event: ItemQuantityAdjusted(cb52b,skis,1,1) MDC: {persistencePhase=persist-evt, pekkoAddress=pekko://EventGenerator@, pekkoSource=pekko://EventGenerator/system/sharding/shopping-cart-event/678/cb52b, sourceActorSystem=EventGenerator, persistenceId=cb52b}
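To confirm that events keep flowing, the generator output can be summarized by event type. The sketch below assumes the output was saved to a file and that each line follows the format shown above, with the event class name after "event: ". The function name event_counts and the file name event-generator.log are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: reads log lines on stdin and tallies them per event type.
event_counts() {
  # Keep only the class name that follows "event: ", then count duplicates.
  sed -n 's/.* event: \([A-Za-z]*\)(.*/\1/p' | sort | uniq -c
}

# Usage, assuming the EventGeneratorApp output was saved to a file:
#   event_counts < event-generator.log
```

Lines that do not contain an event are ignored, so the full application log can be passed in unfiltered.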
Finally, we can run ShoppingCartApp in a new terminal:
sbt "runMain"
mvn compile exec:java -Dexec.mainClass=""
After a few seconds you should see the ItemPopularityProjectionHandler log the current checkout counts for the day:
[2020-08-12 12:16:34,216] [INFO] [] [] [] - ItemPopularityProjectionHandler(shopping-cart) item popularity for 'bowling shoes': [58]
Use the CQL shell to observe the full information in the item_popularity table:
cqlsh> SELECT item_id, count FROM pekko_projection.item_popularity;
 item_id       | count
---------------+-------
 pekko t-shirt |    37
   cat t-shirt |    34
          skis |    33
 bowling shoes |    65

(4 rows)
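CQL can only ORDER BY clustering columns, so the rows cannot be sorted by the counter value in the query itself. One option is to sort on the client side. The sketch below parses cqlsh table output and lists the items by count, highest first. The function name top_items is hypothetical, and the usage line reuses the Docker command from the table together with cqlsh's -e option.

```shell
#!/bin/sh
# Hypothetical helper: reads cqlsh table output on stdin and prints
# "count<TAB>item_id" for each data row, sorted by count in descending order.
top_items() {
  awk -F'|' '
    # Data rows have exactly two columns and a purely numeric second column,
    # which skips the header, the separator line, and the "(N rows)" footer.
    NF == 2 && $2 ~ /^ *[0-9]+ *$/ {
      gsub(/^ +| +$/, "", $1)
      gsub(/^ +| +$/, "", $2)
      print $2 "\t" $1
    }' | sort -rn
}

# Usage, while the Cassandra container is running:
#   docker run -i --network getting-started_default --rm cassandra cqlsh cassandra \
#     -e "SELECT item_id, count FROM pekko_projection.item_popularity;" | top_items
```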