Running a Projection
Once you have decided how you want to build your projection, the next step is to run it. Typically, you run it in a distributed fashion to spread the load over the different nodes in an Apache Pekko Cluster. However, it’s also possible to run it as a single instance (when not clustered) or as a single instance in a Cluster Singleton.
Dependencies
To distribute the projection over the cluster, we recommend using ShardedDaemonProcess. Add the following dependency to your project if you are not already using Apache Pekko Cluster Sharding:
- sbt
libraryDependencies += "org.apache.pekko" %% "pekko-cluster-sharding-typed" % "1.0.2"
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-cluster-sharding-typed_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-cluster-sharding-typed_${versions.ScalaBinary}:1.0.2"
}
Apache Pekko Projections require Pekko 1.0.2 or later, see Pekko version.
For more information on using Apache Pekko Cluster, consult Pekko’s reference documentation on Apache Pekko Cluster and Apache Pekko Cluster Sharding.
Running with Sharded Daemon Process
The Sharded Daemon Process can be used to distribute n instances of a given Projection across the cluster. Since these instances run independently of each other, it’s important that each Projection instance consumes a different subset of the stream of envelopes.
How the subset is created depends on the kind of source we consume. For an Apache Pekko Connectors Kafka source, this is done by Kafka consumer groups. When consuming from the Apache Pekko Persistence journal, the events must be sliced by tagging them, as demonstrated in the example below.
Tagging Events in EventSourcedBehavior
Before we can consume the events, the EventSourcedBehavior must tag the events with a slice number.
- Scala
import scala.concurrent.duration._

import org.apache.pekko
import pekko.actor.typed.{ ActorSystem, Behavior, SupervisorStrategy }
import pekko.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity, EntityTypeKey }
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.scaladsl.{ EventSourcedBehavior, RetentionCriteria }

// Inside object ShoppingCart; Command, Event, State and the handlers are defined elsewhere in it.
// Five tags, "carts-0" to "carts-4"; each entity picks exactly one of them.
val tags = Vector.tabulate(5)(i => s"carts-$i")

val EntityKey: EntityTypeKey[Command] = EntityTypeKey[Command]("ShoppingCart")

def init(system: ActorSystem[_]): Unit = {
  ClusterSharding(system).init(Entity(EntityKey) { entityContext =>
    // The entity id is stable, so the same cart always gets the same tag.
    val i = math.abs(entityContext.entityId.hashCode % tags.size)
    val selectedTag = tags(i)
    ShoppingCart(entityContext.entityId, selectedTag)
  }.withRole("write-model"))
}

def apply(cartId: String, projectionTag: String): Behavior[Command] = {
  EventSourcedBehavior
    .withEnforcedReplies[Command, Event, State](
      PersistenceId(EntityKey.name, cartId),
      State.empty,
      (state, command) =>
        // The shopping cart behavior changes if it's checked out or not.
        // The commands are handled differently for each case.
        if (state.isCheckedOut) checkedOutShoppingCart(cartId, state, command)
        else openShoppingCart(cartId, state, command),
      (state, event) => handleEvent(state, event))
    // Every event persisted by this cart carries the cart's single tag.
    .withTagger(_ => Set(projectionTag))
    .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3))
    .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
}
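- Java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Inside the ShoppingCart EventSourcedBehavior class; cartId is a field of that class.
// Five tags, "carts-0" to "carts-4"; each cart picks exactly one of them.
public static final List<String> tags =
    Collections.unmodifiableList(
        Arrays.asList("carts-0", "carts-1", "carts-2", "carts-3", "carts-4"));

// Every event persisted by this cart carries the same single tag.
@Override
public Set<String> tagsFor(Event event) {
  int n = Math.abs(cartId.hashCode() % tags.size());
  String selectedTag = tags.get(n);
  return Collections.singleton(selectedTag);
}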
In the above example, we created a Vector[String] (Scala) or List<String> (Java) of tags from carts-0 to carts-4. Each entity instance tags its events with one of those tags. The tag is chosen by taking the entity id’s hash code modulo the total number of tags; because the entity id is a stable identifier, a given entity always uses the same tag. As a result, the journal is sliced into five tags, carts-0 to carts-4. Note the .withTagger in the initialization of the EventSourcedBehavior (the tagsFor override in Java).
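To see why this slicing is stable, here is a small standalone sketch of the same selection rule in plain Java, with no Pekko types involved: the tag index is the absolute value of the entity id’s hash code modulo the number of tags. The class name TagSelection and its histogram helper are hypothetical, added only for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper reproducing the tag-selection rule of the ShoppingCart example:
// tag index = abs(entityId.hashCode() % numberOfTags).
final class TagSelection {
    // Same five tags as the example: carts-0 .. carts-4.
    static final List<String> TAGS =
        List.of("carts-0", "carts-1", "carts-2", "carts-3", "carts-4");

    // The remainder is taken before Math.abs, so it always lies strictly between
    // -size and size; this stays safe even for a hash code of Integer.MIN_VALUE,
    // where Math.abs(hashCode()) would still be negative.
    static String selectTag(String entityId, List<String> tags) {
        int index = Math.abs(entityId.hashCode() % tags.size());
        return tags.get(index);
    }

    // Counts how many of the given entity ids are assigned to each tag.
    static Map<String, Integer> histogram(List<String> entityIds, List<String> tags) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String tag : tags) {
            counts.put(tag, 0);
        }
        for (String id : entityIds) {
            counts.merge(selectTag(id, tags), 1, Integer::sum);
        }
        return counts;
    }
}
```

Because String.hashCode() is specified in the String Javadoc rather than left to the JVM, a given cart id maps to the same tag across restarts, so all of that cart’s events stay in one slice.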
The number of tags should be greater than the number of nodes that you want to distribute the load over, and it’s not easy to change afterwards without system downtime. With fewer tags than nodes, some nodes will not host any Projection instance. With more tags than nodes, each node hosts more than one Projection instance, which is fine. It’s good to start with more tags than nodes, so that there is room to scale out to more nodes later if needed. As a rule of thumb, use about ten times as many tags as the planned maximum number of cluster nodes. It doesn’t have to be exact.
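The counting argument above can be checked with a deliberately simplified placement model. The sketch assumes a round-robin assignment (instance i runs on node i % nodes); real placement is decided by Cluster Sharding’s shard allocation strategy, so the numbers are illustrative only. TagPlacement and its methods are hypothetical names.

```java
// Hypothetical, simplified placement model: instance i is placed on node (i % nodes).
// Real Sharded Daemon Process placement is decided by Cluster Sharding's allocation
// strategy; this only illustrates how the tag count relates to idle nodes.
final class TagPlacement {
    // For each node, how many projection instances it would host.
    static int[] instancesPerNode(int numberOfTags, int numberOfNodes) {
        int[] perNode = new int[numberOfNodes];
        for (int i = 0; i < numberOfTags; i++) {
            perNode[i % numberOfNodes]++;
        }
        return perNode;
    }

    // Number of nodes that would host no projection instance at all.
    static int idleNodes(int numberOfTags, int numberOfNodes) {
        int idle = 0;
        for (int count : instancesPerNode(numberOfTags, numberOfNodes)) {
            if (count == 0) {
                idle++;
            }
        }
        return idle;
    }

    // Rule of thumb from the text: roughly ten tags per planned cluster node.
    static int suggestedTagCount(int plannedMaxNodes) {
        return 10 * plannedMaxNodes;
    }
}
```

With 5 tags on 8 nodes, idleNodes(5, 8) returns 3, while 80 tags (ten per node) leave no node idle in this model.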
We will use those tags to query the journal, create one Projection instance per tag, and distribute those instances across the cluster.
When using the Apache Pekko Persistence Cassandra plugin, you should not use too many tags per event. Each tag results in a copy of the event in a separate table, which can impact write performance. Typically, you would use one tag per event, as illustrated here. If the Projection doesn’t have to act on certain events, they can be filtered out in the Projection handler instead. The JDBC plugin doesn’t have this constraint.
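Filtering in the handler can look like the following plain-Java sketch. The event types (CartEvent, ItemAdded, CheckedOut) and CheckoutOnlyHandler are hypothetical stand-ins, not the Pekko Handler API; the point is only that a single tag delivers every event of the slice and the handler skips the event types it does not need. It uses records and pattern matching for instanceof, so it assumes Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event types standing in for ShoppingCart.Event.
interface CartEvent {
    String cartId();
}

record ItemAdded(String cartId, String itemId) implements CartEvent {}

record CheckedOut(String cartId) implements CartEvent {}

// Hypothetical stand-in for a projection handler: every event carrying the tag is
// delivered to process(), and the handler ignores the event types it does not need
// instead of relying on an additional tag.
final class CheckoutOnlyHandler {
    private final List<String> checkedOutCarts = new ArrayList<>();

    void process(CartEvent event) {
        if (event instanceof CheckedOut checkedOut) {
            checkedOutCarts.add(checkedOut.cartId());
        }
        // Other event types fall through without any side effect.
    }

    List<String> checkedOutCarts() {
        return checkedOutCarts;
    }

    // Convenience driver: feeds a sequence of events through a fresh handler.
    static List<String> run(List<? extends CartEvent> events) {
        CheckoutOnlyHandler handler = new CheckoutOnlyHandler();
        events.forEach(handler::process);
        return handler.checkedOutCarts();
    }
}
```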
See also the Apache Pekko reference documentation for tagging.
Event Sourced Provider per tag
We can use the EventSourcedProvider to consume the ShoppingCart events.
- Scala
import org.apache.pekko
import pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal
import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
import docs.eventsourced.ShoppingCart

// `system` is the ActorSystem in scope; one SourceProvider is built per tag.
def sourceProvider(tag: String) =
  EventSourcedProvider
    .eventsByTag[ShoppingCart.Event](
      system = system,
      readJournalPluginId = CassandraReadJournal.Identifier,
      tag = tag)
- Java
import org.apache.pekko.persistence.cassandra.query.javadsl.CassandraReadJournal;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.projection.javadsl.SourceProvider;
import org.apache.pekko.projection.eventsourced.javadsl.EventSourcedProvider;
import org.apache.pekko.projection.eventsourced.EventEnvelope;

SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider(String tag) {
  return EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), tag);
}
Note that we define a method that builds a new SourceProvider for each tag passed to it.
Building the Projection instances
Next, we create a method that returns Projection instances. Again, we pass a tag, which is used to initialize the SourceProvider and as the key in the ProjectionId.
- Scala
import scala.concurrent.duration._

import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.cassandra.scaladsl.CassandraProjection

def projection(tag: String) =
  CassandraProjection
    .atLeastOnce(
      projectionId = ProjectionId("shopping-carts", tag), // the tag is the ProjectionId key
      sourceProvider(tag),
      handler = () => new ShoppingCartHandler)
    // Store the offset after 100 envelopes or 500 ms, whichever comes first.
    .withSaveOffset(100, 500.millis)
- Java
import java.time.Duration;

import org.apache.pekko.projection.cassandra.javadsl.CassandraProjection;
import org.apache.pekko.projection.Projection;
import org.apache.pekko.projection.ProjectionId;

// Store the offset after 100 envelopes or 500 ms, whichever comes first.
int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);

Projection<EventEnvelope<ShoppingCart.Event>> projection(String tag) {
  return CassandraProjection.atLeastOnce(
          ProjectionId.of("shopping-carts", tag), sourceProvider(tag), ShoppingCartHandler::new)
      .withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
}
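The withSaveOffset(100, 500 ms) setting trades write load on the offset store against the amount of work that may be repeated after a restart. The sketch below is a simplified, deterministic model of the "after N envelopes or after a duration, whichever comes first" rule. SaveOffsetPolicy is a hypothetical class; it is not how Pekko implements the setting, and it only checks the clock when an envelope arrives.

```java
// Hypothetical, simplified model of "store the offset after N envelopes or after a
// duration, whichever comes first". Not Pekko's implementation: time is passed in
// explicitly and is only checked when an envelope has been handled.
final class SaveOffsetPolicy {
    private final int afterEnvelopes;
    private final long afterMillis;
    private int pending = 0;        // envelopes handled since the last save
    private long lastSaveMillis;    // time of the last save (or of the start)
    private long savedOffset = -1;  // last stored offset; -1 means none yet
    private int saves = 0;          // how many times the offset was stored

    SaveOffsetPolicy(int afterEnvelopes, long afterMillis, long startMillis) {
        this.afterEnvelopes = afterEnvelopes;
        this.afterMillis = afterMillis;
        this.lastSaveMillis = startMillis;
    }

    // Called after an envelope has been handled; returns true if the offset was stored.
    boolean onEnvelope(long offset, long nowMillis) {
        pending++;
        boolean countReached = pending >= afterEnvelopes;
        boolean timeReached = nowMillis - lastSaveMillis >= afterMillis;
        if (countReached || timeReached) {
            savedOffset = offset;
            pending = 0;
            lastSaveMillis = nowMillis;
            saves++;
            return true;
        }
        return false;
    }

    long savedOffset() {
        return savedOffset;
    }

    int saves() {
        return saves;
    }

    // Feeds envelopes with offsets 1..n, spaced millisBetween apart, through a
    // policy configured like withSaveOffset(100, 500 ms).
    static SaveOffsetPolicy simulate(int envelopes, long millisBetween) {
        SaveOffsetPolicy policy = new SaveOffsetPolicy(100, 500, 0);
        for (int k = 1; k <= envelopes; k++) {
            policy.onEnvelope(k, k * millisBetween);
        }
        return policy;
    }
}
```

In this model, at most afterEnvelopes - 1 handled envelopes (99 here) are not yet covered by a stored offset; with at-least-once processing, those are the envelopes that may be handled again after a restart. Lower values reduce replays at the cost of more offset writes.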
Initializing the Sharded Daemon
Once we have the tags, the SourceProvider, and the Projection of our choice, we can glue all the pieces together using the Sharded Daemon Process and let it be distributed across the cluster.
- Scala
import org.apache.pekko
import pekko.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import pekko.projection.ProjectionBehavior

ShardedDaemonProcess(system).init[ProjectionBehavior.Command](
  name = "shopping-carts",
  numberOfInstances = ShoppingCart.tags.size,
  behaviorFactory = (i: Int) => ProjectionBehavior(projection(ShoppingCart.tags(i))),
  stopMessage = ProjectionBehavior.Stop)
- Java
import org.apache.pekko.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
import org.apache.pekko.projection.ProjectionBehavior;

ShardedDaemonProcess.get(system)
    .init(
        ProjectionBehavior.Command.class,
        "shopping-carts",
        ShoppingCart.tags.size(),
        id -> ProjectionBehavior.create(projection(ShoppingCart.tags.get((Integer) id))),
        ProjectionBehavior.stopMessage());
In this example, we configure the ShardedDaemonProcess with as many instances as there are tags, and we define the behavior factory to return a ProjectionBehavior that wraps a different Projection instance for each instance id. Finally, the ShardedDaemonProcess is configured to use ProjectionBehavior.Stop (ProjectionBehavior.stopMessage() in Java) as its stop message.
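The behaviorFactory relies on the instance ids running from 0 until numberOfInstances, which is why numberOfInstances is set to the number of tags. The plain-Java sketch below models that contract with strings in place of Projection behaviors (DaemonIndexMapping is a hypothetical name): with exactly five instances every tag is consumed once, fewer instances would leave tags unconsumed, and more would index past the end of the tag list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

// Hypothetical model of the behaviorFactory contract: each instance id in
// [0, numberOfInstances) is used as an index into the tag list. Plain strings
// stand in for the Projection behaviors.
final class DaemonIndexMapping {
    static final List<String> TAGS =
        List.of("carts-0", "carts-1", "carts-2", "carts-3", "carts-4");

    // Stand-in for behaviorFactory: instance id -> tag that instance consumes.
    static IntFunction<String> tagForInstance(List<String> tags) {
        return id -> tags.get(id); // throws IndexOutOfBoundsException if id >= tags.size()
    }

    // Tags consumed when the daemon is started with numberOfInstances instances.
    static List<String> consumedTags(int numberOfInstances, List<String> tags) {
        IntFunction<String> factory = tagForInstance(tags);
        List<String> consumed = new ArrayList<>();
        for (int id = 0; id < numberOfInstances; id++) {
            consumed.add(factory.apply(id));
        }
        return consumed;
    }
}
```

For example, consumedTags(4, TAGS) omits carts-4, so events tagged carts-4 would never be projected, and consumedTags(6, TAGS) throws an IndexOutOfBoundsException.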
Projection Behavior
The ProjectionBehavior is an Actor Behavior that knows how to manage the Projection lifecycle. The Projection starts to consume events as soon as the actor is spawned, and it restarts the source in case of failures (see Projection Settings).
For a graceful stop, it is recommended to send the ProjectionBehavior.Stop message (ProjectionBehavior.stopMessage() in Java).
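How quickly the source is restarted after a failure is governed by the Projection Settings. A common shape for such restarts is exponential backoff with jitter, sketched below in plain Java. This is a generic illustration, not Pekko’s exact formula; Backoff and its random01 parameter are hypothetical, and the sample values in the usage note (200 ms, 5 s, 0.1) are simply borrowed from the restartWithBackoff call in the ShoppingCart example.

```java
import java.time.Duration;

// Hypothetical, generic exponential backoff with jitter: the delay doubles with every
// restart, is capped at maxBackoff, and is then stretched by up to randomFactor.
// An illustration of the technique, not Pekko's exact formula.
final class Backoff {
    // random01 is a sample from [0, 1); passing it in keeps the function deterministic.
    static Duration delay(Duration minBackoff, Duration maxBackoff, double randomFactor,
                          int restartCount, double random01) {
        // Math.pow may return Infinity for very large restart counts; Math.min caps it.
        double exponential = minBackoff.toMillis() * Math.pow(2, restartCount);
        double capped = Math.min(maxBackoff.toMillis(), exponential);
        return Duration.ofMillis((long) (capped * (1.0 + random01 * randomFactor)));
    }
}
```

With minBackoff = 200 ms, maxBackoff = 5 s and no jitter, the delays for restarts 0, 1, 2, 3 are 200, 400, 800 and 1600 ms, and from restart 5 onwards the 5 s cap applies.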
Running with local Actor
You can spawn the ProjectionBehavior like any other Behavior. This can be useful for testing, or when running a local ActorSystem without Apache Pekko Cluster.
- Scala
// `system` and the actor `context` are assumed to be in scope (e.g. inside Behaviors.setup).
def sourceProvider(tag: String) =
  EventSourcedProvider
    .eventsByTag[ShoppingCart.Event](
      system = system,
      readJournalPluginId = CassandraReadJournal.Identifier,
      tag = tag)

def projection(tag: String) =
  CassandraProjection
    .atLeastOnce(
      projectionId = ProjectionId("shopping-carts", tag),
      sourceProvider(tag),
      handler = () => new ShoppingCartHandler)

val projection1 = projection("carts-1")
// Spawned as a regular child actor, named after the projection id.
context.spawn(ProjectionBehavior(projection1), projection1.projectionId.id)
- Java
SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider(String tag) {
  return EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), tag);
}

Projection<EventEnvelope<ShoppingCart.Event>> projection(String tag) {
  return CassandraProjection.atLeastOnce(
      ProjectionId.of("shopping-carts", tag), sourceProvider(tag), ShoppingCartHandler::new);
}

Projection<EventEnvelope<ShoppingCart.Event>> projection1 = projection("carts-1");

ActorRef<ProjectionBehavior.Command> projection1Ref =
    context.spawn(ProjectionBehavior.create(projection1), projection1.projectionId().id());
Be aware that the projection and its offset storage are keyed by the ProjectionId. If more than one instance with the same ProjectionId runs concurrently, they will overwrite each other’s offset storage, with undefined and unpredictable results.
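One way to guard against this is to check the planned set of projections for duplicate ids before starting them. The sketch below is a hypothetical plain-Java helper; ProjectionKey is a stand-in record for the (name, key) pair of a ProjectionId, not Pekko’s ProjectionId class, and such a local check cannot detect an instance that is already running elsewhere in the cluster.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a ProjectionId: a (name, key) pair. Records provide
// value-based equals/hashCode, so two keys with the same name and key are equal.
record ProjectionKey(String name, String key) {}

final class ProjectionIdGuard {
    // Returns every key that appears more than once in the planned list; a non-empty
    // result means two instances would share (and overwrite) one offset entry.
    static Set<ProjectionKey> duplicates(List<ProjectionKey> planned) {
        Set<ProjectionKey> seen = new HashSet<>();
        Set<ProjectionKey> dups = new HashSet<>();
        for (ProjectionKey k : planned) {
            if (!seen.add(k)) {
                dups.add(k);
            }
        }
        return dups;
    }
}
```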
Running in Cluster Singleton
If you know that you only need one or a few projection instances, an alternative to the Sharded Daemon Process is to use Apache Pekko Cluster Singleton.
- Scala
import org.apache.pekko
import pekko.cluster.typed.ClusterSingleton
import pekko.cluster.typed.SingletonActor

def sourceProvider(tag: String) =
  EventSourcedProvider
    .eventsByTag[ShoppingCart.Event](
      system = system,
      readJournalPluginId = CassandraReadJournal.Identifier,
      tag = tag)

def projection(tag: String) =
  CassandraProjection
    .atLeastOnce(
      projectionId = ProjectionId("shopping-carts", tag),
      sourceProvider(tag),
      handler = () => new ShoppingCartHandler)

val projection1 = projection("carts-1")

ClusterSingleton(system).init(
  SingletonActor(ProjectionBehavior(projection1), projection1.projectionId.id)
    .withStopMessage(ProjectionBehavior.Stop))
- Java
import org.apache.pekko.cluster.typed.ClusterSingleton;
import org.apache.pekko.cluster.typed.SingletonActor;

SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider(String tag) {
  return EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), tag);
}

Projection<EventEnvelope<ShoppingCart.Event>> projection(String tag) {
  return CassandraProjection.atLeastOnce(
      ProjectionId.of("shopping-carts", tag), sourceProvider(tag), ShoppingCartHandler::new);
}

Projection<EventEnvelope<ShoppingCart.Event>> projection1 = projection("carts-1");

ActorRef<ProjectionBehavior.Command> projection1Ref =
    ClusterSingleton.get(system)
        .init(
            SingletonActor.of(
                    ProjectionBehavior.create(projection1), projection1.projectionId().id())
                .withStopMessage(ProjectionBehavior.stopMessage()));
Be aware that all projection instances running with Cluster Singleton will run on the same node in the cluster.