Running the Projection in Apache Pekko Cluster

Running the Projection with Apache Pekko Cluster adds two important properties to our system: availability and scalability. A Projection running as a single Actor is a single point of failure (availability): when the app shuts down for any reason, the Projection stops running until it is started again. A Projection running as a single Actor is also a processing bottleneck (scalability): all messages from the SourceProvider are processed by a single Actor on a single machine. By using a Sharded Daemon Process with Apache Pekko Cluster and Apache Pekko Cluster Sharding, we can scale up the Projection and make it more available by running at least as many instances of the same Projection as we have cluster members. As Pekko cluster members join and leave the cluster, the Sharded Daemon Process automatically rebalances the Projection instances across the current members.
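
To make the availability argument concrete, here is a minimal plain-Java sketch of what happens to Projection instances when a member leaves. It is a simplified model, not Pekko's implementation: the member names (node-2551, node-2552), the `FailoverSketch` class and the least-loaded reassignment rule are all assumptions for illustration. In a real cluster the reassignment is done by Cluster Sharding once the departed member has been removed from the cluster (for example by the Split Brain Resolver).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified failover model; NOT Pekko's actual shard allocation logic.
final class FailoverSketch {

  /** Drops the leaving member and gives each of its instances to the least-loaded remaining member. */
  static Map<String, List<Integer>> memberLeft(Map<String, List<Integer>> assignment, String leaving) {
    // Copy every surviving member with a mutable list of the instances it already runs.
    Map<String, List<Integer>> remaining = new LinkedHashMap<>();
    assignment.forEach((member, instances) -> {
      if (!member.equals(leaving)) {
        remaining.put(member, new ArrayList<>(instances));
      }
    });
    if (remaining.isEmpty()) {
      throw new IllegalStateException("no cluster members left to run the projection instances");
    }
    // Each orphaned instance goes to whichever survivor currently runs the fewest instances.
    for (int instance : assignment.getOrDefault(leaving, List.of())) {
      String target = null;
      for (Map.Entry<String, List<Integer>> e : remaining.entrySet()) {
        if (target == null || e.getValue().size() < remaining.get(target).size()) {
          target = e.getKey();
        }
      }
      remaining.get(target).add(instance);
    }
    return remaining;
  }
}
```

For example, if node-2552 runs instance 0 and node-2551 runs instances 1 and 2, calling `memberLeft(assignment, "node-2552")` leaves node-2551 running all three instances, so every tag is still being projected while the cluster is smaller.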

Running the Projection as a Sharded Daemon Process requires no changes to our projection handler and repository; we only need to change how the actor that runs the Projection is initialized. The cluster version of this app uses a different configuration that enables Pekko Cluster. The main difference in the app itself is that we use ShardedDaemonProcess to initialize the Projection actors on our behalf. Instead of creating a single Projection, we define factory methods that build the source provider, handler and Projection for a given tag; ShardedDaemonProcess uses them to create the daemon actors (one per tag) assigned to this cluster member.
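
The factory shape can be sketched without Pekko at all. In this hypothetical model, `Repo` and `ProjectionSpec` stand in for ItemPopularityProjectionRepositoryImpl and the Projection, and the `IntFunction` plays the role of the behaviorFactory passed to ShardedDaemonProcess: it receives an instance index, maps it to a tag, and builds everything that depends on that tag, while the repository is created once per member and shared.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

// Plain-Java model of the per-tag factory pattern; all names here are hypothetical stand-ins.
final class DaemonFactorySketch {
  // Assumed to mirror the guide's tag list.
  static final String[] TAGS = {"carts-0", "carts-1", "carts-2"};

  /** Stand-in for the repository: created once per member and shared by all instances. */
  static final class Repo {}

  /** Stand-in for a Projection built for a single tag (name + key, like ProjectionId). */
  static final class ProjectionSpec {
    final String name;
    final String key;
    final Repo repo;

    ProjectionSpec(String name, String key, Repo repo) {
      this.name = name;
      this.key = key;
      this.repo = repo;
    }
  }

  /** Like behaviorFactory: maps an instance index in [0, TAGS.length) to a per-tag projection. */
  static IntFunction<ProjectionSpec> factory(Repo repo) {
    return i -> new ProjectionSpec("shopping-carts", TAGS[i], repo);
  }

  /** Simulates the factory being called for the instance indices assigned to this member. */
  static List<ProjectionSpec> startLocal(Repo repo, int... indices) {
    IntFunction<ProjectionSpec> f = factory(repo);
    List<ProjectionSpec> started = new ArrayList<>();
    for (int i : indices) {
      started.add(f.apply(i));
    }
    return started;
  }
}
```

Calling `startLocal(repo, 1, 2)` models a member that was assigned instances 1 and 2: it gets Projections for carts-1 and carts-2, both backed by the same repository. Keeping all per-tag construction inside the factory is what lets the Sharded Daemon Process start any instance on any member.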

Add a new configuration file guide-shopping-cart-cluster-app.conf to your src/main/resources/ directory. This configuration is largely the same as before, but includes extra configuration to enable cluster connectivity and sharding:

datastax-java-driver {
  # basic.contact-points = ["127.0.0.1:9042"]
  # basic.load-balancing-policy.local-datacenter = "datacenter1"
  advanced {
    # reconnect to c* if down when app is started
    reconnect-on-init = true
  }
}

pekko {
  loglevel = DEBUG
  actor {
    provider = "cluster"
    serialization-bindings {
      "docs.guide.CborSerializable" = jackson-cbor
      "jdocs.guide.CborSerializable" = jackson-cbor
    }
  }

  # For the sample, just bind to loopback and do not allow access from the network
  # the port is overridden by the logic in main class
  remote.artery {
    canonical.port = 0
    canonical.hostname = 127.0.0.1
  }

  cluster {
    seed-nodes = [
      "pekko://ShoppingCartClusterApp@127.0.0.1:2551",
      "pekko://ShoppingCartClusterApp@127.0.0.1:2552"
    ]

    downing-provider-class = "org.apache.pekko.cluster.sbr.SplitBrainResolverProvider"
  }

  persistence.journal {
    plugin = "pekko.persistence.cassandra.journal"
    auto-start-journals = ["pekko.persistence.cassandra.journal"]
  }

  persistence.snapshot-store.plugin = "pekko.persistence.cassandra.snapshot"

  persistence {
    cassandra {
      journal {
        # to create the schema
        keyspace-autocreate = true
        tables-autocreate = true
      }

      snapshot {
        # to create the schema
        keyspace-autocreate = true
        tables-autocreate = true
      }

      query {
        refresh-interval = 2s
      }

      events-by-tag {
        # for lower latency
        eventual-consistency-delay = 25ms
        flush-interval = 25ms
        pubsub-notification = on
      }
    }
  }
}

Add the ShoppingCartClusterApp to your project:

Scala
package docs.guide

import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.Behaviors
import pekko.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal
import pekko.persistence.query.Offset
import pekko.projection.ProjectionBehavior
import pekko.projection.ProjectionId
import pekko.projection.cassandra.scaladsl.CassandraProjection
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
import pekko.projection.scaladsl.SourceProvider
import pekko.stream.connectors.cassandra.scaladsl.CassandraSessionRegistry
import com.typesafe.config.ConfigFactory

object ShoppingCartClusterApp extends App {
  val port = args.headOption match {
    case Some(portString) if portString.matches("""\d+""") => portString.toInt
    case _                                                 => throw new IllegalArgumentException("A pekko cluster port argument is required")
  }

  val config = ConfigFactory
    .parseString(s"pekko.remote.artery.canonical.port = $port")
    .withFallback(ConfigFactory.load("guide-shopping-cart-cluster-app.conf"))

  ActorSystem(
    Behaviors.setup[String] { context =>
      val system = context.system
      implicit val ec = system.executionContext
      val session = CassandraSessionRegistry(system).sessionFor("pekko.projection.cassandra.session-config")
      val repo = new ItemPopularityProjectionRepositoryImpl(session)

      def sourceProvider(tag: String): SourceProvider[Offset, EventEnvelope[ShoppingCartEvents.Event]] =
        EventSourcedProvider
          .eventsByTag[ShoppingCartEvents.Event](
            system,
            readJournalPluginId = CassandraReadJournal.Identifier,
            tag = tag)

      def projection(tag: String) =
        CassandraProjection.atLeastOnce(
          projectionId = ProjectionId("shopping-carts", tag),
          sourceProvider(tag),
          handler = () => new ItemPopularityProjectionHandler(tag, system, repo))

      ShardedDaemonProcess(system).init[ProjectionBehavior.Command](
        name = "shopping-carts",
        numberOfInstances = ShoppingCartTags.Tags.size,
        behaviorFactory = (i: Int) => ProjectionBehavior(projection(ShoppingCartTags.Tags(i))),
        stopMessage = ProjectionBehavior.Stop)

      Behaviors.empty
    },
    "ShoppingCartClusterApp",
    config)
}
Java
package jdocs.guide;

import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
import org.apache.pekko.projection.ProjectionBehavior;
import org.apache.pekko.projection.eventsourced.EventEnvelope;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.pekko.persistence.cassandra.query.javadsl.CassandraReadJournal;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.projection.eventsourced.javadsl.EventSourcedProvider;
import org.apache.pekko.projection.javadsl.SourceProvider;
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.cassandra.javadsl.CassandraProjection;
import org.apache.pekko.projection.javadsl.AtLeastOnceProjection;
import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSession;
import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSessionRegistry;

public class ShoppingCartClusterApp {
  public static void main(String[] args) throws Exception {
    if (args.length == 0) {
      throw new IllegalArgumentException("A pekko cluster port argument is required");
    }

    String portString = args[0];
    int port = Integer.parseInt(portString);

    Config config =
        ConfigFactory.parseString("pekko.remote.artery.canonical.port = " + port)
            .withFallback(ConfigFactory.load("guide-shopping-cart-cluster-app.conf"));

    ActorSystem.create(
        Behaviors.setup(
            context -> {
              ActorSystem<Void> system = context.getSystem();

              CassandraSession session =
                  CassandraSessionRegistry.get(system)
                      .sessionFor("pekko.projection.cassandra.session-config");
              ItemPopularityProjectionRepositoryImpl repo =
                  new ItemPopularityProjectionRepositoryImpl(session);

              ShardedDaemonProcess.get(system)
                  .init(
                      ProjectionBehavior.Command.class,
                      "shopping-carts",
                      ShoppingCartTags.TAGS.length,
                      n ->
                          ProjectionBehavior.create(
                              projection(system, repo, ShoppingCartTags.TAGS[n])),
                      ProjectionBehavior.stopMessage());

              return Behaviors.empty();
            }),
        "ShoppingCartClusterApp",
        config);
  }

  static SourceProvider<Offset, EventEnvelope<ShoppingCartEvents.Event>> sourceProvider(
      ActorSystem<?> system, String tag) {
    return EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), tag);
  }

  static AtLeastOnceProjection<Offset, EventEnvelope<ShoppingCartEvents.Event>> projection(
      ActorSystem<?> system, ItemPopularityProjectionRepository repo, String tag) {
    return CassandraProjection.atLeastOnce(
        ProjectionId.of("shopping-carts", tag),
        sourceProvider(system, tag),
        () -> new ItemPopularityProjectionHandler(tag, system, repo));
  }
}

Before running the app, we must first run the EventGeneratorApp in cluster mode so that it generates new shopping cart events for multiple tags instead of just one. Shopping cart events are tagged in a similar way to how sharded entities are distributed to shards: given a sequence of n tags, a hash of the sharding entity key (the shopping cart id) is computed, and the hash modulo n selects a tag from the sequence. See the Tagging Events in EventSourcedBehavior section of the documentation for an example of how events can be tagged with Apache Pekko Persistence.
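
The tag selection just described can be sketched as a small helper. This is an illustration under assumptions: the `CartTagging` class and its `tagFor` method are hypothetical, and the three-element tag list only mirrors the carts-0, carts-1 and carts-2 tags that appear in this section's logs. It uses `Math.floorMod` rather than `%` so that a negative `hashCode()` cannot produce a negative index.

```java
// Hypothetical helper illustrating hash-based tagging; the guide's own tagging code may differ in detail.
final class CartTagging {
  // Assumed to mirror the guide's tag list.
  static final String[] TAGS = {"carts-0", "carts-1", "carts-2"};

  /** Hashes the cart id (the sharding entity key) and takes it modulo the number of tags. */
  static String tagFor(String cartId) {
    // floorMod keeps the index in [0, TAGS.length) even when hashCode() is negative,
    // which a plain `%` would not guarantee.
    return TAGS[Math.floorMod(cartId.hashCode(), TAGS.length)];
  }
}
```

Because the tag depends only on the cart id, every event for a given cart lands in the same tag, so a single Projection instance sees all of that cart's events in order. This also means the number of tags should stay fixed once events have been written, since changing it would map existing carts to different tags.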

The same EventGeneratorApp from the previous Running the Projection section can generate events for this app when given the additional argument cluster. Run the app:

sbt
sbt "runMain docs.guide.EventGeneratorApp cluster"
Maven
mvn compile exec:java -Dexec.mainClass="jdocs.guide.EventGeneratorApp" -Dexec.args="cluster"

When the app is running you will observe that the logs show events written to different tags (carts-0, carts-1, etc.), instead of just one (shopping-cart).

[2020-08-13 15:18:58,383] [INFO] [docs.guide.EventGeneratorApp$] [] [EventGenerator-pekko.actor.default-dispatcher-19] - id [6059e] tag [carts-1] event: ItemQuantityAdjusted(6059e,cat t-shirt,1,2) MDC: {persistencePhase=persist-evt, pekkoAddress=pekko://EventGenerator@127.0.1.1:25520, pekkoSource=pekko://EventGenerator/system/sharding/shopping-cart-event/903/6059e, sourceActorSystem=EventGenerator, persistenceId=6059e}

Run the first member of your new Pekko cluster:

sbt
sbt "runMain docs.guide.ShoppingCartClusterApp 2551"
Maven
mvn compile exec:java -Dexec.mainClass="jdocs.guide.ShoppingCartClusterApp" -Dexec.args="2551"

When the app is running, you will see that it processes all the shopping cart event tags, because it is the only member of the cluster.

[2020-08-13 15:03:39,809] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-43] - ItemPopularityProjectionHandler(carts-1) item popularity for 'pekko t-shirt': [1080] MDC: {}
[2020-08-13 15:03:39,810] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-40] - ItemPopularityProjectionHandler(carts-2) item popularity for 'bowling shoes': [1241] MDC: {}
[2020-08-13 15:03:39,812] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-43] - ItemPopularityProjectionHandler(carts-0) item popularity for 'pekko t-shirt': [1080] MDC: {}
...

Run a second member to expand the Pekko cluster member count to 2.

sbt
sbt "runMain docs.guide.ShoppingCartClusterApp 2552"
Maven
mvn compile exec:java -Dexec.mainClass="jdocs.guide.ShoppingCartClusterApp" -Dexec.args="2552"

When the second app is running, you will see a sharding rebalance complete, after which each cluster member processes a distinct set of tagged events.

A rebalance occurs and tag carts-0 is assigned to the new cluster member. Only tags carts-1 and carts-2 are processed by the first member.

[2020-08-13 15:03:59,019] [INFO] [org.apache.pekko.cluster.sharding.DDataShardCoordinator] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-41] - Starting rebalance for shards [0]. Current shards rebalancing: [] MDC: {pekkoAddress=pekko://ShoppingCartClusterApp@127.0.0.1:2551, sourceThread=ShoppingCartClusterApp-pekko.actor.default-dispatcher-44, pekkoSource=pekko://ShoppingCartClusterApp@127.0.0.1:2551/system/sharding/sharded-daemon-process-shopping-cartsCoordinator/singleton/coordinator, sourceActorSystem=ShoppingCartClusterApp, akkaTimestamp=19:03:59.019UTC}
[2020-08-13 15:04:35,261] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-43] - ItemPopularityProjectionHandler(carts-1) item popularity for 'skis': [1244] MDC: {}           
[2020-08-13 15:04:36,802] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-40] - ItemPopularityProjectionHandler(carts-2) item popularity for 'skis': [1246] MDC: {}           
[2020-08-13 15:04:36,805] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-40] - ItemPopularityProjectionHandler(carts-2) item popularity for 'pekko t-shirt': [1136] MDC: {}   
[2020-08-13 15:04:36,807] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-43] - ItemPopularityProjectionHandler(carts-2) item popularity for 'skis': [1249] MDC: {}           
[2020-08-13 15:04:39,262] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-41] - ItemPopularityProjectionHandler(carts-1) item popularity for 'cat t-shirt': [1239] MDC: {}                  
...
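
The rebalance described above can be modeled with a deliberately simplified rule: repeatedly move one shard from the member with the most shards to the member with the fewest, until their counts differ by at most one. This `RebalanceSketch` and its member names are assumptions for illustration only; it does not reproduce Pekko's actual shard allocation strategy or its limits on how many shards move per round. It hands over the lowest-numbered shard so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified rebalance model; NOT Pekko's real shard allocation strategy.
final class RebalanceSketch {

  /** Moves shards from the most-loaded to the least-loaded member until counts differ by at most one. */
  static Map<String, List<Integer>> rebalance(Map<String, List<Integer>> assignment) {
    Map<String, List<Integer>> result = new LinkedHashMap<>();
    assignment.forEach((member, shards) -> result.put(member, new ArrayList<>(shards)));
    while (true) {
      String most = null;
      String least = null;
      for (Map.Entry<String, List<Integer>> e : result.entrySet()) {
        int size = e.getValue().size();
        if (most == null || size > result.get(most).size()) {
          most = e.getKey();
        }
        if (least == null || size < result.get(least).size()) {
          least = e.getKey();
        }
      }
      if (most == null || result.get(most).size() - result.get(least).size() <= 1) {
        return result; // balanced: shard counts differ by at most one
      }
      List<Integer> donor = result.get(most);
      Collections.sort(donor);
      result.get(least).add(donor.remove(0)); // move the lowest-numbered shard
    }
  }
}
```

Starting from node-2551 holding shards 0, 1 and 2 and a freshly joined node-2552 holding none, `rebalance` moves shard 0 to node-2552 and leaves 1 and 2 on node-2551, the same split as in this walkthrough, where each Sharded Daemon Process instance (and therefore each tag) lives in its own shard.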

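When the second member joins the cluster, it is assigned tag carts-0 and resumes processing events from the last offset stored for that tag. Because the Projection uses at-least-once delivery, events that were handled after the last offset was saved may be processed again.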

[2020-08-13 15:04:02,692] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-5] - ItemPopularityProjectionHandler(carts-0) item popularity for 'bowling shoes': [1275] MDC: {}   
[2020-08-13 15:04:02,695] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-40] - ItemPopularityProjectionHandler(carts-0) item popularity for 'pekko t-shirt': [1110] MDC: {}   
[2020-08-13 15:04:02,699] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-40] - ItemPopularityProjectionHandler(carts-0) item popularity for 'cat t-shirt': [1203] MDC: {}
...
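
The resume behaviour can be illustrated with an in-memory model of at-least-once offset handling. Everything here is hypothetical: `OffsetResumeSketch` keeps offsets in a `HashMap` instead of Cassandra, events are plain strings indexed by their offset, and `saveEvery` stands in for the batching of offset saves (the real AtLeastOnceProjection exposes this through withSaveOffset). Offsets are saved only after the events have been handled, which is what makes delivery at least once rather than at most once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of at-least-once offset handling; the real projection stores offsets in Cassandra.
final class OffsetResumeSketch {
  /** Last stored offset per tag; stands in for the offset store. */
  final Map<String, Integer> offsetStore = new HashMap<>();
  /** Every event handed to the "handler", duplicates included. */
  final List<String> handled = new ArrayList<>();

  /**
   * Handles events after the stored offset, saving the offset every saveEvery events.
   * Stops early after stopAfter events to simulate the instance moving to another member.
   */
  void run(String tag, List<String> events, int saveEvery, int stopAfter) {
    int next = offsetStore.getOrDefault(tag, -1) + 1; // resume after the last stored offset
    int sinceSave = 0;
    int handledThisRun = 0;
    for (int offset = next; offset < events.size(); offset++) {
      if (handledThisRun == stopAfter) {
        return; // simulated stop: progress since the last save is lost
      }
      handled.add(events.get(offset)); // the "handler" processes the event
      handledThisRun++;
      if (++sinceSave == saveEvery) {
        offsetStore.put(tag, offset); // saved only after the events were handled
        sinceSave = 0;
      }
    }
    if (sinceSave > 0) {
      offsetStore.put(tag, events.size() - 1); // flush the final partial batch
    }
  }
}
```

Running carts-0 with `saveEvery = 2` and stopping after three events stores offset 1, so when the instance starts again (as it does on the new member) it resumes at offset 2 and handles e2 a second time. Handlers used with an at-least-once Projection therefore need to tolerate an occasional duplicate.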