Running a Projection

Once you have decided how you want to build your projection, the next step is to run it. Typically, you run it in a distributed fashion to spread the load over the nodes of an Apache Pekko Cluster. However, it’s also possible to run it as a single instance (when not clustered) or as a single instance in a Cluster Singleton.

Dependencies

To distribute the projection over the cluster, we recommend using ShardedDaemonProcess. Add the following dependency to your project if you are not already using Apache Pekko Cluster Sharding:

sbt
libraryDependencies += "org.apache.pekko" %% "pekko-cluster-sharding-typed" % "1.0.2"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-cluster-sharding-typed_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-cluster-sharding-typed_${versions.ScalaBinary}:1.0.2"
}

Apache Pekko Projections requires Pekko 1.0.2 or later (see Pekko version).

For more information on using Apache Pekko Cluster consult Pekko’s reference documentation on Apache Pekko Cluster and Apache Pekko Cluster Sharding.

Running with Sharded Daemon Process

The Sharded Daemon Process can be used to distribute n instances of a given Projection across the cluster. For this to work, each Projection instance must consume a distinct subset of the stream of envelopes, so that together the instances cover the whole stream without overlapping.

How the subset is created depends on the kind of source we consume. If it’s an Apache Pekko Connectors Kafka source, this is done by Kafka consumer groups. When consuming from Apache Pekko Persistence Journal, the events must be sliced by tagging them as demonstrated in the example below.

Tagging Events in EventSourcedBehavior

Before we can consume the events, the EventSourcedBehavior must tag the events with a slice number.

Scala
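import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.Behavior
import pekko.actor.typed.SupervisorStrategy
import pekko.cluster.sharding.typed.scaladsl.ClusterSharding
import pekko.cluster.sharding.typed.scaladsl.Entity
import pekko.cluster.sharding.typed.scaladsl.EntityTypeKey
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.scaladsl.EventSourcedBehavior
import pekko.persistence.typed.scaladsl.RetentionCriteria
import scala.concurrent.duration._

// Members of the ShoppingCart object; Command, Event, State, handleEvent,
// openShoppingCart and checkedOutShoppingCart are defined elsewhere in it.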
val tags = Vector.tabulate(5)(i => s"carts-$i")
val EntityKey: EntityTypeKey[Command] = EntityTypeKey[Command]("ShoppingCart")

def init(system: ActorSystem[_]): Unit = {
  ClusterSharding(system).init(Entity(EntityKey) { entityContext =>
    val i = math.abs(entityContext.entityId.hashCode % tags.size)
    val selectedTag = tags(i)
    ShoppingCart(entityContext.entityId, selectedTag)
  }.withRole("write-model"))
}

def apply(cartId: String, projectionTag: String): Behavior[Command] = {
  EventSourcedBehavior
    .withEnforcedReplies[Command, Event, State](
      PersistenceId(EntityKey.name, cartId),
      State.empty,
      (state, command) =>
        // The shopping cart behavior changes if it's checked out or not.
        // The commands are handled differently for each case.
        if (state.isCheckedOut) checkedOutShoppingCart(cartId, state, command)
        else openShoppingCart(cartId, state, command),
      (state, event) => handleEvent(state, event))
    .withTagger(_ => Set(projectionTag))
    .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3))
    .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
}
Java
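import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Members of the ShoppingCart EventSourcedBehavior subclass; cartId is one of its fields.
public static final List<String> tags =
    Collections.unmodifiableList(
        Arrays.asList("carts-0", "carts-1", "carts-2", "carts-3", "carts-4"));

// Every event of this entity gets the same tag, chosen from its cartId.
@Override
public Set<String> tagsFor(Event event) {
  int n = Math.abs(cartId.hashCode() % tags.size());
  String selectedTag = tags.get(n);
  return Collections.singleton(selectedTag);
}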

In the above example, we created a Vector[String] (Scala) or List<String> (Java) of tags from carts-0 to carts-4. Each entity instance tags its events with one of those tags. The tag is selected from the hash code of the entity id (a stable identifier) modulo the total number of tags, so all events of a given entity always carry the same tag. As a result, the journal is sliced into five tags (carts-0 to carts-4). Note the .withTagger in the initialization of the EventSourcedBehavior (the tagsFor override in Java).
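To see why this slicing is stable, here is a minimal plain-Java sketch of the same tag-selection rule, independent of Pekko. The class and method names (TagSelection, selectTag, distribution) are hypothetical and only illustrate the arithmetic; in the real code the selection happens inside withTagger or tagsFor as shown above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the example's rule: tag index = |entityId.hashCode() % numberOfTags|.
final class TagSelection {
  private TagSelection() {}

  // Builds "prefix-0" .. "prefix-(n-1)", e.g. carts-0 .. carts-4 for n = 5.
  static List<String> tags(String prefix, int numberOfTags) {
    List<String> result = new ArrayList<>();
    for (int i = 0; i < numberOfTags; i++) {
      result.add(prefix + "-" + i);
    }
    return Collections.unmodifiableList(result);
  }

  // String.hashCode() is specified by the JDK, so the same id always maps to the same tag.
  // Taking the remainder before Math.abs keeps the index in [0, size) for any hash value.
  static String selectTag(String entityId, List<String> tags) {
    int n = Math.abs(entityId.hashCode() % tags.size());
    return tags.get(n);
  }

  // Counts how many of the given entity ids land on each tag.
  static Map<String, Integer> distribution(List<String> entityIds, List<String> tags) {
    Map<String, Integer> counts = new TreeMap<>();
    for (String tag : tags) {
      counts.put(tag, 0);
    }
    for (String id : entityIds) {
      counts.merge(selectTag(id, tags), 1, Integer::sum);
    }
    return counts;
  }
}
```

Because String.hashCode() is defined by the JDK specification, an entity keeps its tag across restarts and nodes. The order of operations matters: computing Math.abs first and the remainder second would break for a hash of Integer.MIN_VALUE, since Math.abs(Integer.MIN_VALUE) is still negative.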

The number of tags should be greater than the number of nodes you want to distribute the load over. It’s not easy to change this afterwards without system downtime. With fewer tags than nodes, some nodes will not host any Projection instance. With more tags than nodes, some or all nodes host more than one Projection instance, which is fine. Starting with more tags than nodes leaves room for scaling out to more nodes later if needed. As a rule of thumb, use about ten times as many tags as the planned maximum number of cluster nodes. It doesn’t have to be exact.
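The trade-off can be made concrete with a small plain-Java sketch. It assumes an idealized, perfectly even placement where instance i lands on node i % nodes; the actual placement of Sharded Daemon Process instances is decided by Cluster Sharding, so treat the numbers as a best case. TagCapacity and its methods are hypothetical names.

```java
// Hypothetical helper illustrating tags-versus-nodes sizing under an idealized even placement.
final class TagCapacity {
  private TagCapacity() {}

  // Rule of thumb from the text: about ten times the planned maximum number of nodes.
  static int suggestedTagCount(int plannedMaxNodes) {
    if (plannedMaxNodes <= 0) throw new IllegalArgumentException("plannedMaxNodes must be positive");
    return 10 * plannedMaxNodes;
  }

  // One projection instance per tag; instance i is assumed to run on node i % nodes.
  static int[] instancesPerNode(int numberOfTags, int nodes) {
    if (nodes <= 0) throw new IllegalArgumentException("nodes must be positive");
    int[] perNode = new int[nodes];
    for (int i = 0; i < numberOfTags; i++) {
      perNode[i % nodes]++;
    }
    return perNode;
  }

  // Nodes that would host no projection instance at all.
  static int idleNodes(int numberOfTags, int nodes) {
    int idle = 0;
    for (int count : instancesPerNode(numberOfTags, nodes)) {
      if (count == 0) idle++;
    }
    return idle;
  }
}
```

With the example's five tags and eight nodes, three nodes would stay idle even in this best case; with 50 tags, all eight nodes have work.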

We will use those tags to query the journal, create one Projection instance per tag, and distribute those instances across the cluster.

Warning

When using the Apache Pekko Persistence Cassandra plugin, you should not use too many tags for each event. Each tag results in a copy of the event in a separate table, which can impact write performance. Typically, you would use one tag per event, as illustrated here. If the Projection doesn’t have to act on certain events, it can filter them out in its handler. The JDBC plugin doesn’t have this constraint.
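Filtering in the handler simply means the Projection receives every event carrying its tag and ignores the ones it doesn’t need. The plain-Java sketch below shows the idea with hypothetical event classes (CartEvent, ItemAdded, CheckedOut) and a hypothetical CheckoutReportHandler; it is not the Pekko Projections Handler API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event hierarchy standing in for ShoppingCart.Event.
interface CartEvent {}

final class ItemAdded implements CartEvent {
  final String cartId;
  final String itemId;

  ItemAdded(String cartId, String itemId) {
    this.cartId = cartId;
    this.itemId = itemId;
  }
}

final class CheckedOut implements CartEvent {
  final String cartId;

  CheckedOut(String cartId) {
    this.cartId = cartId;
  }
}

// Hypothetical handler: every event of the tag is delivered, and only CheckedOut is acted on.
final class CheckoutReportHandler {
  final List<String> checkedOutCarts = new ArrayList<>();
  int ignored = 0;

  void process(CartEvent event) {
    if (event instanceof CheckedOut) {
      checkedOutCarts.add(((CheckedOut) event).cartId);
    } else {
      ignored++; // events this projection does not need are skipped, not re-tagged
    }
  }
}
```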

See also the Apache Pekko reference documentation for tagging.

Event Sourced Provider per tag

We can use the EventSourcedProvider to consume the ShoppingCart events.

Scala
import org.apache.pekko
import pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal
import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
import docs.eventsourced.ShoppingCart

def sourceProvider(tag: String) =
  EventSourcedProvider
    .eventsByTag[ShoppingCart.Event](
      system = system,
      readJournalPluginId = CassandraReadJournal.Identifier,
      tag = tag)
Java
import org.apache.pekko.persistence.cassandra.query.javadsl.CassandraReadJournal;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.projection.javadsl.SourceProvider;
import org.apache.pekko.projection.eventsourced.javadsl.EventSourcedProvider;
import org.apache.pekko.projection.eventsourced.EventEnvelope;

SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider(String tag) {
  return EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), tag);
}

Note that we define a method that builds a new SourceProvider for each tag passed to it.

Building the Projection instances

Next, we create a method that returns Projection instances. Again, we pass a tag that is used to initialize the SourceProvider and as the key in the ProjectionId.

Scala
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.cassandra.scaladsl.CassandraProjection
import scala.concurrent.duration._

def projection(tag: String) =
  CassandraProjection
    .atLeastOnce(
      projectionId = ProjectionId("shopping-carts", tag),
      sourceProvider(tag),
      handler = () => new ShoppingCartHandler)
    .withSaveOffset(100, 500.millis) // store the offset after 100 envelopes or 500 ms, whichever comes first
Java
import org.apache.pekko.projection.cassandra.javadsl.CassandraProjection;
import org.apache.pekko.projection.Projection;
import org.apache.pekko.projection.ProjectionId;
import java.time.Duration;

// Store the offset after 100 envelopes or 500 ms, whichever comes first.
int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);

Projection<EventEnvelope<ShoppingCart.Event>> projection(String tag) {
  return CassandraProjection.atLeastOnce(
          ProjectionId.of("shopping-carts", tag), sourceProvider(tag), ShoppingCartHandler::new)
      .withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
}
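Both variants store the offset after 100 envelopes or after 500 milliseconds, whichever comes first. The following plain-Java sketch models that rule with a hypothetical SaveOffsetPolicy class. It is a simplification, not Pekko’s implementation: it only checks the clock when an envelope arrives, whereas a real implementation may also flush on a timer.

```java
import java.time.Duration;

// Hypothetical model of "save after N envelopes or after D time, whichever comes first".
final class SaveOffsetPolicy {
  private final int afterEnvelopes;
  private final long afterNanos;
  private int pending = 0;
  private long lastSaveNanos;

  SaveOffsetPolicy(int afterEnvelopes, Duration afterDuration, long nowNanos) {
    this.afterEnvelopes = afterEnvelopes;
    this.afterNanos = afterDuration.toNanos();
    this.lastSaveNanos = nowNanos;
  }

  // Records one processed envelope; returns true if the offset should be stored now.
  boolean onEnvelopeProcessed(long nowNanos) {
    pending++;
    boolean save = pending >= afterEnvelopes || nowNanos - lastSaveNanos >= afterNanos;
    if (save) {
      pending = 0;
      lastSaveNanos = nowNanos;
    }
    return save;
  }

  // Envelopes processed since the last save.
  int unsavedEnvelopes() {
    return pending;
  }
}
```

With at-least-once processing, the envelopes counted by unsavedEnvelopes() are the ones that may be processed again after a restart; that is the cost of batching offset writes.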

Initializing the Sharded Daemon

Once we have the tags, the SourceProvider and the Projection of our choice, we can glue all the pieces together using the Sharded Daemon Process and let it distribute the Projection instances across the cluster.

Scala
import org.apache.pekko
import pekko.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import pekko.projection.ProjectionBehavior

ShardedDaemonProcess(system).init[ProjectionBehavior.Command](
  name = "shopping-carts",
  numberOfInstances = ShoppingCart.tags.size,
  behaviorFactory = (i: Int) => ProjectionBehavior(projection(ShoppingCart.tags(i))),
  stopMessage = ProjectionBehavior.Stop)
Java
import org.apache.pekko.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
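import org.apache.pekko.projection.ProjectionBehavior;
import org.apache.pekko.cluster.sharding.typed.ShardedDaemonProcessSettings;
import java.util.Optional;

ShardedDaemonProcess.get(system)
    .init(
        ProjectionBehavior.Command.class,
        "shopping-carts",
        ShoppingCart.tags.size(),
        // daemon instance id i runs the Projection for ShoppingCart.tags.get(i)
        id -> ProjectionBehavior.create(projection(ShoppingCart.tags.get(id))),
        ShardedDaemonProcessSettings.create(system),
        Optional.of(ProjectionBehavior.stopMessage()));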

For this example, we configure the ShardedDaemonProcess with as many instances as there are tags, and we define the behavior factory to return a ProjectionBehavior that wraps a different Projection instance for each instance index. Finally, the Sharded Daemon Process is configured to use ProjectionBehavior.Stop (ProjectionBehavior.stopMessage() in Java) as its stop message.

For a graceful stop, it is recommended to use the ProjectionBehavior.Stop message (ProjectionBehavior.stopMessage() in Java).
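The wiring between daemon instances and tags is plain index arithmetic: instance i, in the range 0 until numberOfInstances, runs the Projection for tag i. The plain-Java sketch below models the behavior factory as an IntFunction that returns the projection key for an index. DaemonWiring and PROJECTION_KEY_FOR_INSTANCE are hypothetical names, and no Pekko types are involved.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.IntFunction;

// Hypothetical model of how the Sharded Daemon Process example maps instance indices to tags.
final class DaemonWiring {
  static final List<String> TAGS =
      Collections.unmodifiableList(
          Arrays.asList("carts-0", "carts-1", "carts-2", "carts-3", "carts-4"));

  // Stand-in for the behavior factory: instance index -> projection key (the tag).
  static final IntFunction<String> PROJECTION_KEY_FOR_INSTANCE = TAGS::get;

  private DaemonWiring() {}

  // Must equal the number of tags so that every tag gets exactly one instance.
  static int numberOfInstances() {
    return TAGS.size();
  }
}
```

This is why numberOfInstances is set to the size of the tag list: a larger value would make the factory fail with an IndexOutOfBoundsException for the extra indices, and a smaller value would leave some tags without a Projection.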

Projection Behavior

The ProjectionBehavior is an Actor Behavior that knows how to manage the Projection lifecycle. The Projection starts consuming events as soon as the actor is spawned, and the source is restarted in case of failures (see Projection Settings).
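Restarts after failures are typically spaced out with exponential backoff. The plain-Java sketch below shows the usual formula, min(maxBackoff, minBackoff * 2^restartCount) scaled by a random jitter factor, using the same 200 ms / 5 s / 0.1 values as the restartWithBackoff call in the tagging example. Whether the projection restart uses exactly this formula is an assumption; check Projection Settings for the actual configuration. Backoff.delay is a hypothetical helper, and the random draw is passed in so the result is deterministic.

```java
import java.time.Duration;

// Hypothetical exponential-backoff calculation; not taken from the Pekko source.
final class Backoff {
  private Backoff() {}

  // random is expected in [0, 1), e.g. ThreadLocalRandom.current().nextDouble().
  static Duration delay(Duration minBackoff, Duration maxBackoff, double randomFactor,
                        int restartCount, double random) {
    double base = minBackoff.toMillis() * Math.pow(2, restartCount);
    double capped = Math.min(maxBackoff.toMillis(), base); // never grow past maxBackoff
    return Duration.ofMillis(Math.round(capped * (1.0 + random * randomFactor)));
  }
}
```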


Running with local Actor

You can spawn the ProjectionBehavior like any other Behavior. This can be useful for testing or when running a local ActorSystem without Apache Pekko Cluster.

Scala
def sourceProvider(tag: String) =
  EventSourcedProvider
    .eventsByTag[ShoppingCart.Event](
      system = system,
      readJournalPluginId = CassandraReadJournal.Identifier,
      tag = tag)

def projection(tag: String) =
  CassandraProjection
    .atLeastOnce(
      projectionId = ProjectionId("shopping-carts", tag),
      sourceProvider(tag),
      handler = () => new ShoppingCartHandler)

val projection1 = projection("carts-1")

context.spawn(ProjectionBehavior(projection1), projection1.projectionId.id)
Java
SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider(String tag) {
  return EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), tag);
}

Projection<EventEnvelope<ShoppingCart.Event>> projection(String tag) {
  return CassandraProjection.atLeastOnce(
      ProjectionId.of("shopping-carts", tag), sourceProvider(tag), ShoppingCartHandler::new);
}

Projection<EventEnvelope<ShoppingCart.Event>> projection1 = projection("carts-1");

ActorRef<ProjectionBehavior.Command> projection1Ref =
    context.spawn(ProjectionBehavior.create(projection1), projection1.projectionId().id());

Be aware that the projection and its offset storage are keyed by the ProjectionId. If more than one instance with the same ProjectionId runs concurrently, they will overwrite each other’s stored offsets, with undefined and unpredictable results.
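The hazard is easy to reproduce with a hypothetical, last-write-wins offset store keyed by projection name and key (mirroring ProjectionId). ProjectionKey and InMemoryOffsetStore below are plain-Java stand-ins, not Pekko classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for ProjectionId: offsets are stored per (name, key).
final class ProjectionKey {
  final String name;
  final String key;

  ProjectionKey(String name, String key) {
    this.name = name;
    this.key = key;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof ProjectionKey)) return false;
    ProjectionKey other = (ProjectionKey) o;
    return name.equals(other.name) && key.equals(other.key);
  }

  @Override
  public int hashCode() {
    return Objects.hash(name, key);
  }
}

// Hypothetical offset store: the last write wins and concurrent owners are not detected.
final class InMemoryOffsetStore {
  private final Map<ProjectionKey, Long> offsets = new HashMap<>();

  void save(ProjectionKey id, long offset) {
    offsets.put(id, offset);
  }

  Long read(ProjectionKey id) {
    return offsets.get(id);
  }
}
```

If instance A has processed up to offset 10 and a lagging instance B with the same id then saves offset 3, the stored offset moves backwards and a restart would process events after offset 3 again. Distinct keys such as carts-1 and carts-2 never interfere with each other.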

Running in Cluster Singleton

If you know that you only need one or a few projection instances, an alternative to Sharded Daemon Process is to use Apache Pekko Cluster Singleton.

Scala
import org.apache.pekko
import pekko.cluster.typed.ClusterSingleton
import pekko.cluster.typed.SingletonActor

def sourceProvider(tag: String) =
  EventSourcedProvider
    .eventsByTag[ShoppingCart.Event](
      system = system,
      readJournalPluginId = CassandraReadJournal.Identifier,
      tag = tag)

def projection(tag: String) =
  CassandraProjection
    .atLeastOnce(
      projectionId = ProjectionId("shopping-carts", tag),
      sourceProvider(tag),
      handler = () => new ShoppingCartHandler)

val projection1 = projection("carts-1")

ClusterSingleton(system).init(
  SingletonActor(ProjectionBehavior(projection1), projection1.projectionId.id)
    .withStopMessage(ProjectionBehavior.Stop))
Java
import org.apache.pekko.cluster.typed.ClusterSingleton;
import org.apache.pekko.cluster.typed.SingletonActor;

SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider(String tag) {
  return EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), tag);
}

Projection<EventEnvelope<ShoppingCart.Event>> projection(String tag) {
  return CassandraProjection.atLeastOnce(
      ProjectionId.of("shopping-carts", tag), sourceProvider(tag), ShoppingCartHandler::new);
}

Projection<EventEnvelope<ShoppingCart.Event>> projection1 = projection("carts-1");

ActorRef<ProjectionBehavior.Command> projection1Ref =
    ClusterSingleton.get(system)
        .init(
            SingletonActor.of(
                ProjectionBehavior.create(projection1), projection1.projectionId().id())
                .withStopMessage(ProjectionBehavior.stopMessage()));

Be aware that all projection instances running as Cluster Singletons will run on the same node in the Cluster: the oldest node (or the oldest node with the configured role).
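Cluster Singleton places every singleton on the oldest node (the oldest one with the configured role, if a role is set), which is why all singleton Projections end up on the same node. The plain-Java sketch below models that choice with a hypothetical NodeInfo class whose upOrder field stands in for member age; it is an illustration, not the Pekko membership API.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical view of a cluster member; a lower upOrder means it joined earlier (is older).
final class NodeInfo {
  final String address;
  final int upOrder;
  final boolean hasSingletonRole;

  NodeInfo(String address, int upOrder, boolean hasSingletonRole) {
    this.address = address;
    this.upOrder = upOrder;
    this.hasSingletonRole = hasSingletonRole;
  }
}

final class SingletonPlacement {
  private SingletonPlacement() {}

  // The host depends only on membership, never on which singleton is asked about,
  // so every singleton resolves to the same node.
  static Optional<NodeInfo> hostOfAllSingletons(List<NodeInfo> members) {
    return members.stream()
        .filter(m -> m.hasSingletonRole)
        .min(Comparator.comparingInt((NodeInfo m) -> m.upOrder));
  }
}
```

Since the choice ignores the singleton itself, each additional singleton Projection adds load to that one node, which is why Sharded Daemon Process is the better fit when you need more than a few instances.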