Running the Projection in Apache Pekko Cluster
Running the Projection with Apache Pekko Cluster adds two important properties to our system: availability and scalability. A Projection running as a single Actor is a single point of failure (availability): when the app shuts down for any reason, the Projection stops running until it is started again. It is also a processing bottleneck (scalability): all messages from the SourceProvider are processed by a single Actor on a single machine. By using a Sharded Daemon Process with Apache Pekko Cluster and Apache Pekko Cluster Sharding, we can scale up the Projection and make it more available by running at least as many instances of the same Projection as we have cluster members. As members join and leave the Pekko cluster, the Sharded Daemon Process automatically scales and rebalances its instances (the Projection instances) accordingly.
Running the Projection as a Sharded Daemon Process requires no changes to our projection handler and repository. We only need to change how the actor that runs the Projection is initialized. The cluster version of this app uses a different configuration file, which configures Pekko cluster. The main difference in the app itself is that we use ShardedDaemonProcess to initialize the Projection actor on our behalf. Instead of creating single instances of our repository and projection handler, we create factory methods that encapsulate their instantiation along with the sharded daemon actors (1 per tag) assigned to this cluster member.
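The "one instance per tag" idea can be illustrated with a minimal sketch in plain Java that has no Pekko dependency. It is only a toy model: the class `InstanceFactorySketch`, the method `startAll`, and the tag names are hypothetical and stand in for the daemon process calling a factory once per instance index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

public class InstanceFactorySketch {
    // Hypothetical tag names; the guide's real tags live in ShoppingCartTags.
    static final String[] TAGS = {"carts-0", "carts-1", "carts-2"};

    // Toy stand-in for a daemon process: calls the factory once per instance index.
    static List<String> startAll(int numberOfInstances, IntFunction<String> factory) {
        List<String> started = new ArrayList<>();
        for (int i = 0; i < numberOfInstances; i++) {
            started.add(factory.apply(i));
        }
        return started;
    }

    public static void main(String[] args) {
        // Each index selects exactly one tag, so each instance handles one tag.
        List<String> instances = startAll(TAGS.length, i -> "projection(" + TAGS[i] + ")");
        System.out.println(instances);
    }
}
```

Because the factory receives only an index, any member of the cluster can create any instance, which is what allows instances to move between members.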
Add a new configuration file guide-shopping-cart-cluster-app.conf to your src/main/resources/ directory. This configuration is largely the same as before, but includes extra configuration to enable cluster connectivity and sharding:
source
datastax-java-driver {
  # basic.contact-points = ["127.0.0.1:9042"]
  # basic.load-balancing-policy.local-datacenter = "datacenter1"
  advanced {
    # reconnect to c* if down when app is started
    reconnect-on-init = true
  }
}

pekko {
  loglevel = DEBUG
  actor {
    provider = "cluster"
    serialization-bindings {
      "docs.guide.CborSerializable" = jackson-cbor
      "jdocs.guide.CborSerializable" = jackson-cbor
    }
  }

  # For the sample, just bind to loopback and do not allow access from the network
  # the port is overridden by the logic in main class
  remote.artery {
    canonical.port = 0
    canonical.hostname = 127.0.0.1
  }

  cluster {
    seed-nodes = [
      "pekko://ShoppingCartClusterApp@127.0.0.1:2551",
      "pekko://ShoppingCartClusterApp@127.0.0.1:2552"
    ]
    downing-provider-class = "org.apache.pekko.cluster.sbr.SplitBrainResolverProvider"
  }

  persistence.journal {
    plugin = "pekko.persistence.cassandra.journal"
    auto-start-journals = ["pekko.persistence.cassandra.journal"]
  }
  persistence.snapshot-store.plugin = "pekko.persistence.cassandra.snapshot"

  persistence {
    cassandra {
      journal {
        # to create the schema
        keyspace-autocreate = true
        tables-autocreate = true
      }
      snapshot {
        # to create the schema
        keyspace-autocreate = true
        tables-autocreate = true
      }
      query {
        refresh-interval = 2s
      }
      events-by-tag {
        # for lower latency
        eventual-consistency-delay = 25ms
        flush-interval = 25ms
        pubsub-notification = on
      }
    }
  }
}
Add the ShoppingCartClusterApp to your project:
- Scala
-
source
package docs.guide

import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.Behaviors
import pekko.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal
import pekko.persistence.query.Offset
import pekko.projection.ProjectionBehavior
import pekko.projection.ProjectionId
import pekko.projection.cassandra.scaladsl.CassandraProjection
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
import pekko.projection.scaladsl.SourceProvider
import pekko.stream.connectors.cassandra.scaladsl.CassandraSessionRegistry
import com.typesafe.config.ConfigFactory

object ShoppingCartClusterApp extends App {
  val port = args.headOption match {
    case Some(portString) if portString.matches("""\d+""") => portString.toInt
    case _ => throw new IllegalArgumentException("A pekko cluster port argument is required")
  }

  val config = ConfigFactory
    .parseString(s"pekko.remote.artery.canonical.port = $port")
    .withFallback(ConfigFactory.load("guide-shopping-cart-cluster-app.conf"))

  ActorSystem(
    Behaviors.setup[String] { context =>
      val system = context.system
      implicit val ec = system.executionContext
      val session = CassandraSessionRegistry(system).sessionFor("pekko.projection.cassandra.session-config")
      val repo = new ItemPopularityProjectionRepositoryImpl(session)

      def sourceProvider(tag: String): SourceProvider[Offset, EventEnvelope[ShoppingCartEvents.Event]] =
        EventSourcedProvider
          .eventsByTag[ShoppingCartEvents.Event](
            system,
            readJournalPluginId = CassandraReadJournal.Identifier,
            tag = tag)

      def projection(tag: String) =
        CassandraProjection.atLeastOnce(
          projectionId = ProjectionId("shopping-carts", tag),
          sourceProvider(tag),
          handler = () => new ItemPopularityProjectionHandler(tag, system, repo))

      ShardedDaemonProcess(system).init[ProjectionBehavior.Command](
        name = "shopping-carts",
        numberOfInstances = ShoppingCartTags.Tags.size,
        behaviorFactory = (i: Int) => ProjectionBehavior(projection(ShoppingCartTags.Tags(i))),
        stopMessage = ProjectionBehavior.Stop)

      Behaviors.empty
    },
    "ShoppingCartClusterApp",
    config)
}
- Java
-
source
package jdocs.guide;

import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
import org.apache.pekko.projection.ProjectionBehavior;
import org.apache.pekko.projection.eventsourced.EventEnvelope;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.pekko.persistence.cassandra.query.javadsl.CassandraReadJournal;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.projection.eventsourced.javadsl.EventSourcedProvider;
import org.apache.pekko.projection.javadsl.SourceProvider;
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.cassandra.javadsl.CassandraProjection;
import org.apache.pekko.projection.javadsl.AtLeastOnceProjection;
import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSession;
import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSessionRegistry;

public class ShoppingCartClusterApp {
  public static void main(String[] args) throws Exception {
    if (args.length == 0) {
      throw new IllegalArgumentException("A pekko cluster port argument is required");
    }

    String portString = args[0];
    int port = Integer.parseInt(portString);

    Config config =
        ConfigFactory.parseString("pekko.remote.artery.canonical.port = " + port)
            .withFallback(ConfigFactory.load("guide-shopping-cart-cluster-app.conf"));

    ActorSystem.create(
        Behaviors.setup(
            context -> {
              ActorSystem<Void> system = context.getSystem();

              CassandraSession session =
                  CassandraSessionRegistry.get(system)
                      .sessionFor("pekko.projection.cassandra.session-config");
              ItemPopularityProjectionRepositoryImpl repo =
                  new ItemPopularityProjectionRepositoryImpl(session);

              ShardedDaemonProcess.get(system)
                  .init(
                      ProjectionBehavior.Command.class,
                      "shopping-carts",
                      ShoppingCartTags.TAGS.length,
                      n ->
                          ProjectionBehavior.create(
                              projection(system, repo, ShoppingCartTags.TAGS[n])),
                      ProjectionBehavior.stopMessage());
              return Behaviors.empty();
            }),
        "ShoppingCartClusterApp",
        config);
  }

  static SourceProvider<Offset, EventEnvelope<ShoppingCartEvents.Event>> sourceProvider(
      ActorSystem<?> system, String tag) {
    return EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), tag);
  }

  static AtLeastOnceProjection<Offset, EventEnvelope<ShoppingCartEvents.Event>> projection(
      ActorSystem<?> system, ItemPopularityProjectionRepository repo, String tag) {
    return CassandraProjection.atLeastOnce(
        ProjectionId.of("shopping-carts", tag),
        sourceProvider(system, tag),
        () -> new ItemPopularityProjectionHandler(tag, system, repo));
  }
}
Before running the app we must first run the EventGeneratorApp in cluster mode to generate new shopping cart events for multiple tags, instead of just one. Shopping cart events are tagged in a similar way to the sharded entities themselves. Given a sequence of tags numbered 0..n, a hash is generated from the sharding entity key (the shopping cart id), and the hash modulo (%) the number of tags in the sequence selects a tag from the sequence. See the Tagging Events in EventSourcedBehavior section of the documentation for an example of how events can be tagged with Apache Pekko Persistence.
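The tag selection described above can be sketched in a few lines of plain Java. This is an illustration under assumptions, not the guide's actual tagging code: the class `TagSelectionSketch`, the method `tagFor`, and the use of `String.hashCode` as the hash are hypothetical choices, and the tag names mirror the carts-N tags that appear in the logs.

```java
import java.util.List;

public class TagSelectionSketch {
    // Hypothetical tag sequence, mirroring the carts-N tags seen in the logs.
    static final List<String> TAGS = List.of("carts-0", "carts-1", "carts-2");

    // Hash the entity id, then take the remainder modulo the number of tags.
    // Math.floorMod keeps the index non-negative even for negative hash codes.
    static String tagFor(String cartId) {
        int index = Math.floorMod(cartId.hashCode(), TAGS.size());
        return TAGS.get(index);
    }

    public static void main(String[] args) {
        for (String cartId : List.of("6059e", "a12f3", "0bc77")) {
            System.out.println(cartId + " -> " + tagFor(cartId));
        }
    }
}
```

Since the tag depends only on the cart id, all events of one shopping cart always end up under the same tag, so a single Projection instance sees them in order.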
The same EventGeneratorApp from the previous Running the Projection section can be used to generate events for this app by passing the additional argument cluster. Run the app:
- sbt
-
sbt "runMain docs.guide.EventGeneratorApp cluster"
- Maven
-
mvn compile exec:java -Dexec.mainClass="jdocs.guide.EventGeneratorApp" -Dexec.args="cluster"
When the app is running you will observe that the logs show events written to different tags (carts-0, carts-1, etc.), instead of just one (shopping-cart).
[2020-08-13 15:18:58,383] [INFO] [docs.guide.EventGeneratorApp$] [] [EventGenerator-org.apache.pekko.actor.default-dispatcher-19] - id [6059e] tag [carts-1] event: ItemQuantityAdjusted(6059e,cat t-shirt,1,2) MDC: {persistencePhase=persist-evt, pekkoAddress=pekko://EventGenerator@127.0.1.1:25520, pekkoSource=pekko://EventGenerator/system/sharding/shopping-cart-event/903/6059e, sourceActorSystem=EventGenerator, persistenceId=6059e}
Run the first member of your new Pekko cluster:
- sbt
-
sbt "runMain docs.guide.ShoppingCartClusterApp 2551"
- Maven
-
mvn compile exec:java -Dexec.mainClass="jdocs.guide.ShoppingCartClusterApp" -Dexec.args="2551"
When the app is running you will observe that it will process all the shopping cart event tags, because it’s the only member of the cluster.
[2020-08-13 15:03:39,809] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-org.apache.pekko.actor.default-dispatcher-43] - ItemPopularityProjectionHandler(carts-1) item popularity for 'pekko t-shirt': [1080] MDC: {}
[2020-08-13 15:03:39,810] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-org.apache.pekko.actor.default-dispatcher-40] - ItemPopularityProjectionHandler(carts-2) item popularity for 'bowling shoes': [1241] MDC: {}
[2020-08-13 15:03:39,812] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-org.apache.pekko.actor.default-dispatcher-43] - ItemPopularityProjectionHandler(carts-0) item popularity for 'pekko t-shirt': [1080] MDC: {}
...
Run a second member to expand the Pekko cluster member count to 2.
- sbt
-
sbt "runMain docs.guide.ShoppingCartClusterApp 2552"
- Maven
-
mvn compile exec:java -Dexec.mainClass="jdocs.guide.ShoppingCartClusterApp" -Dexec.args="2552"
When the second app is running you will observe a sharding rebalance complete and then see a distinct set of tagged events processed on each cluster member.
A rebalance occurs and tag carts-0 is assigned to the new cluster member. Only tags carts-1 and carts-2 are processed by the first member.
[2020-08-13 15:03:59,019] [INFO] [org.apache.pekko.cluster.sharding.DDataShardCoordinator] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-41] - Starting rebalance for shards [0]. Current shards rebalancing: [] MDC: {pekkoAddress=pekko://ShoppingCartClusterApp@127.0.0.1:2551, sourceThread=ShoppingCartClusterApp-pekko.actor.default-dispatcher-44, pekkoSource=pekko://ShoppingCartClusterApp@127.0.0.1:2551/system/sharding/sharded-daemon-process-shopping-cartsCoordinator/singleton/coordinator, sourceActorSystem=ShoppingCartClusterApp, akkaTimestamp=19:03:59.019UTC}
[2020-08-13 15:04:35,261] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-43] - ItemPopularityProjectionHandler(carts-1) item popularity for 'skis': [1244] MDC: {}
[2020-08-13 15:04:36,802] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-40] - ItemPopularityProjectionHandler(carts-2) item popularity for 'skis': [1246] MDC: {}
[2020-08-13 15:04:36,805] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-40] - ItemPopularityProjectionHandler(carts-2) item popularity for 'pekko t-shirt': [1136] MDC: {}
[2020-08-13 15:04:36,807] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-43] - ItemPopularityProjectionHandler(carts-2) item popularity for 'skis': [1249] MDC: {}
[2020-08-13 15:04:39,262] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-41] - ItemPopularityProjectionHandler(carts-1) item popularity for 'cat t-shirt': [1239] MDC: {}
...
When the second member joins the cluster it is assigned tag carts-0 and starts processing events from the last successfully processed offset.
[2020-08-13 15:04:02,692] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-5] - ItemPopularityProjectionHandler(carts-0) item popularity for 'bowling shoes': [1275] MDC: {}
[2020-08-13 15:04:02,695] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-40] - ItemPopularityProjectionHandler(carts-0) item popularity for 'pekko t-shirt': [1110] MDC: {}
[2020-08-13 15:04:02,699] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartClusterApp-pekko.actor.default-dispatcher-40] - ItemPopularityProjectionHandler(carts-0) item popularity for 'cat t-shirt': [1203] MDC: {}
...
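Resuming from the last successfully processed offset can be pictured with a small toy model in plain Java. It does not use the Pekko Projection offset store API: `OffsetResumeSketch`, `offsetStore`, and `run` are hypothetical names, offsets are simplified to integer sequence numbers, and an in-memory map stands in for the shared offset storage.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetResumeSketch {
    // Toy stand-in for the shared offset store, keyed by tag (hypothetical).
    static final Map<String, Integer> offsetStore = new HashMap<>();

    // Handles at most maxEvents events for the tag, starting after the stored
    // offset, and returns the sequence numbers that were handled.
    static List<Integer> run(String tag, int totalEvents, int maxEvents) {
        int lastOffset = offsetStore.getOrDefault(tag, 0);
        List<Integer> handled = new ArrayList<>();
        for (int seqNr = lastOffset + 1;
                seqNr <= totalEvents && handled.size() < maxEvents;
                seqNr++) {
            handled.add(seqNr);          // handle the event first
            offsetStore.put(tag, seqNr); // then store its offset (at-least-once order)
        }
        return handled;
    }

    public static void main(String[] args) {
        // The first member handles three events for carts-0, then the tag moves.
        System.out.println("member 1: " + run("carts-0", 6, 3)); // prints "member 1: [1, 2, 3]"
        // The second member picks up after the stored offset.
        System.out.println("member 2: " + run("carts-0", 6, 10)); // prints "member 2: [4, 5, 6]"
    }
}
```

Because the offset is stored after the event is handled, a crash between the two steps would cause that event to be handled again after a restart, which is the trade-off of at-least-once processing.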