This tutorial contains 3 samples illustrating different Pekko cluster features.
- Subscribe to cluster membership events
- Sending messages to actors running on nodes in the cluster
- Cluster aware routers
To enable cluster capabilities in your Apache Pekko project you should, at a minimum, add the remote settings, and use
cluster as the
pekko.cluster.seed-nodes should normally also be added to your
The seed nodes are configured contact points which newly started nodes will try to connect with in order to join the cluster.
Note that if you are going to start the nodes on different machines you need to specify the ip-addresses or host names of the machines in
application.conf instead of
The small program together with its configuration starts an ActorSystem with the Cluster enabled. It joins the cluster and starts an actor that logs some membership events. Take a look at the SimpleClusterListener.scala actor.
You can read more about the cluster concepts in the documentation.
To run this sample, type
sbt "runMain sample.cluster.simple.App".
sample.cluster.simple.App starts three actor systems (cluster members) in the same JVM process. It can be more interesting to run them in separate processes. Stop the application and then open three terminal windows.
In the first terminal window, start the first seed node with the following command:
sbt "runMain sample.cluster.simple.App 17356"
17356 corresponds to the port of the first seed-nodes element in the configuration. In the log output you see that the cluster node has been started and changed status to ‘Up’.
In the second terminal window, start the second seed node with the following command:
sbt "runMain sample.cluster.simple.App 17357"
17357 corresponds to the port of the second seed-nodes element in the configuration. In the log output you see that the cluster node has been started and joins the other seed node and becomes a member of the cluster. Its status changed to ‘Up’.
Switch over to the first terminal window and see in the log output that the member joined.
Start another node in the third terminal window with the following command:
sbt "runMain sample.cluster.simple.App 0"
Now you don’t need to specify the port number, 0 means that it will use a random available port. It joins one of the configured seed nodes. Look at the log output in the different terminal windows.
Start even more nodes in the same way, if you like.
Shut down one of the nodes by pressing ‘ctrl-c’ in one of the terminal windows. It will cause the node to do a graceful leave from the cluster, telling the other nodes in the cluster that it is leaving. It will then be removed from the cluster, which you can see in the log output in the other terminals.
Look at the source code of the actor again. It registers itself as subscriber of certain cluster events. It gets notified a stream of events leading up to the current state. After that it receives events for changes that happen in the cluster.
Now we have seen how to subscribe to cluster membership events. You can read more about it in the documentation. The membership events show us the state of the cluster but it does not help with accessing actors on other nodes the cluster. To do that we need to use the Receptionist.
Receptionist is a service registry that will work both when in single JVM apps not using cluster, and in clustered apps.
ActorRefs are registered to the receptionist using a
ServiceKey. The service key is defined with a type of message that actors registered for it will accept and a string identifier.
Let’s take a look at an example that illustrates how workers, here only on nodes with the role backend, register themselves to the receptionist so that frontend nodes will know what workers are available to perform their work. Note that a node could potentially have both roles, since the node roles are a set. The
main provided only allows one role though.
The example application provides a service to transform text. At a periodic interval the frontend simulates an external request to process a text which it forwards to available workers if there are any.
Since the discovery of workers is dynamic both backend and frontend nodes can be added to the cluster dynamically.
The backend worker that performs the transformation job is defined in TransformationBackend.scala. When starting up a worker registers itself to the receptionist so that it can be discovered through its
ServiceKey on any node in the cluster.
The frontend that simulates user jobs as well as keeping track of available workers is defined in Frontend.scala. The actor subscribes to the
Receptionist with the
WorkerServiceKey to receive updates when the set of available workers in the cluster changes. If a worker dies or its node is removed from the cluster the receptionist will send out an updated listing so the frontend does not need to
watch the workers.
To run this sample, make sure you have shut down any previously started cluster sample, then type
sbt "runMain sample.cluster.transformation.App".
TransformationApp starts 5 actor systems (cluster members) in the same JVM process. It can be more interesting to run them in separate processes. Stop the application and run the following commands in separate terminal windows.
sbt "runMain sample.cluster.transformation.App backend 17356"
sbt "runMain sample.cluster.transformation.App backend 17357"
sbt "runMain sample.cluster.transformation.App backend 0"
sbt "runMain sample.cluster.transformation.App frontend 0"
sbt "runMain sample.cluster.transformation.App frontend 0"
There is a component built into Apache Pekko that performs the task of subscribing to the receptionist and keeping track of available actors significantly simplifying such interactions: the group router. Let’s look into how we can use those in the next section!
The group routers relies on the
Receptionist and will therefore route messages to services registered in any node of the cluster.
Let’s take a look at a few samples that make use of cluster aware routers.
Let’s take a look at two different ways to distribute work across a cluster using routers.
Note that the samples just shows off various parts of Apache Pekko Cluster and does not provide a complete structure to build a resilient distributed application with. The Distributed Workers With Pekko sample covers more of the problems you would have to solve to build a resilient distributed processing application.
The example application provides a service to calculate statistics for a text. When some text is sent to the service it splits it into words, and delegates the task to count number of characters in each word to a separate worker, a routee of a router. The character count for each word is sent back to an aggregator that calculates the average number of characters per word when all results have been collected.
The worker that counts number of characters in each word is defined in StatsWorker.scala.
The service that receives text from users and splits it up into words, delegates to a pool of workers and aggregates the result is defined in StatsService.scala.
Note, nothing cluster specific so far, just plain actors.
Nodes in the cluster can be marked with roles, to perform different tasks, in our case we use
compute as a role to designate cluster nodes that should do processing of word statistics.
In StatsSample.scala each
compute node starts a
StatsService that distributes work over N local
StatsWorkers. The client nodes then message the
StatsService instances through a
group router. The router finds services by subscribing to the cluster receptionist and a service key. Each worker is registered to the receptionist when started.
With this design a single
compute node crashing will only lose the ongoing work in that node and have the other nodes keep on with their work, but there is no single place to ask for a list of the current work in progress.
To run the sample, type
sbt "runMain sample.cluster.stats.App" if it is not already started.
StatsSample starts 4 actor systems (cluster members) in the same JVM process. It can be more interesting to run them in separate processes. Stop the application and run the following commands in separate terminal windows.
sbt "runMain sample.cluster.stats.App compute 17356"
sbt "runMain sample.cluster.stats.App compute 17357"
sbt "runMain sample.cluster.stats.App compute 0"
sbt "runMain sample.cluster.stats.App client 0"
compute node starts N workers, that register themselves with the receptionist. The
StatsService is run in a single instance in the cluster through the Apache Pekko Cluster Singleton. The actual work is performed by workers on all compute nodes though. The workers are reached through a group router used by the singleton.
With this design it would be possible to query the singleton for current work - it knows all current requests in flight and could potentially make decisions based on knowing exactly what work is currently in progress.
If the singleton node crashes however, all ongoing work is lost though since the state of the singleton is not persistent, when it is started on a new node the
StatsService will not know of any previous work. It also means that since all work has to go through the singleton it could be come a bottleneck. If one of the other nodes crash only the ongoing work sent to them is lost, however since each ongoing request could be handled by multiple different workers on different nodes a crash could cause problems to many requests.
To run this sample, type
sbt "runMain sample.cluster.stats.AppOneMaster" if it is not already started.
StatsSampleOneMaster starts 4 actor systems (cluster members) in the same JVM process. It can be more interesting to run them in separate processes. Stop the application and run the following commands in separate terminal windows.
sbt "runMain sample.cluster.stats.AppOneMaster compute 17356"
sbt "runMain sample.cluster.stats.AppOneMaster compute 17357"
sbt "runMain sample.cluster.stats.AppOneMaster compute 0"
sbt "runMain sample.cluster.stats.AppOneMaster client 0"
Tests can be found in src/multi-jvm. You can run them by typing