Cluster Usage
This document describes how to use Apache Pekko Cluster and the Cluster APIs.
For specific documentation topics see:
- When and where to use Pekko Cluster
- Cluster Specification
- Cluster Membership Service
- Higher level Cluster tools
- Rolling Updates
- Operating, Managing, Observability
You are viewing the documentation for the new actor APIs. To view the Pekko Classic documentation, see Classic Cluster.
You have to enable serialization to send messages between ActorSystems (nodes) in the Cluster. Serialization with Jackson is a good choice in many cases, and our recommendation if you don’t have other preferences or constraints.
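As a minimal sketch of what enabling Jackson serialization can look like, the fragment below binds a marker interface to the Jackson CBOR serializer. It assumes the separate pekko-serialization-jackson module is on the classpath; the name com.example.CborSerializable is hypothetical and stands for whatever marker trait or interface your own messages implement.

```hocon
pekko.actor {
  serialization-bindings {
    # Hypothetical marker interface: every message class that implements it
    # is serialized with the Jackson CBOR serializer.
    "com.example.CborSerializable" = jackson-cbor
  }
}
```

Binding a marker interface rather than individual classes keeps the configuration short as the number of message types grows.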
Module info
To use Pekko Cluster add the following dependency in your project:
- sbt
  val PekkoVersion = "1.0.1"
  libraryDependencies += "org.apache.pekko" %% "pekko-cluster-typed" % PekkoVersion
- Maven
  <properties>
    <scala.binary.version>2.13</scala.binary.version>
  </properties>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-bom_${scala.binary.version}</artifactId>
        <version>1.0.1</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-cluster-typed_${scala.binary.version}</artifactId>
    </dependency>
  </dependencies>
- Gradle
  def versions = [
    ScalaBinary: "2.13"
  ]
  dependencies {
    implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.0.1")
    implementation "org.apache.pekko:pekko-cluster-typed_${versions.ScalaBinary}"
  }
Project Info: Pekko Cluster (typed) | |
---|---|
Artifact | org.apache.pekko : pekko-cluster-typed : 1.0.1 |
JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17 |
Scala versions | 2.13.11, 2.12.18, 3.3.1 |
JPMS module name | pekko.cluster.typed |
License | |
Home page | https://pekko.apache.org/ |
API documentation | |
Forums | |
Release notes | Release Notes |
Issues | Github issues |
Sources | https://github.com/apache/incubator-pekko |
Cluster API Extension
The Cluster extension gives you access to management tasks such as Joining, Leaving and Downing and subscription of cluster membership events such as MemberUp, MemberRemoved and UnreachableMember, which are exposed as event APIs.
It does this through these references on the Cluster extension:
- manager: An ActorRef[cluster.typed.ClusterCommand] (Scala) or ActorRef<cluster.typed.ClusterCommand> (Java), where a ClusterCommand is a command such as Join, Leave and Down
- subscriptions: An ActorRef[cluster.typed.ClusterStateSubscription] (Scala) or ActorRef<cluster.typed.ClusterStateSubscription> (Java), where a ClusterStateSubscription is one of GetCurrentState or Subscribe and Unsubscribe to cluster events like MemberRemoved
- state: The current CurrentClusterState
All of the examples below assume the following imports:
- Scala
  import org.apache.pekko
  import pekko.actor.typed._
  import pekko.actor.typed.scaladsl._
  import pekko.cluster.ClusterEvent._
  import pekko.cluster.MemberStatus
  import pekko.cluster.typed._
- Java
  import org.apache.pekko.actor.typed.*;
  import org.apache.pekko.actor.typed.javadsl.*;
  import org.apache.pekko.cluster.ClusterEvent;
  import org.apache.pekko.cluster.typed.*;
The minimum configuration required is to set a host/port for remoting and pekko.actor.provider = "cluster".
pekko {
  actor {
    provider = "cluster"
  }
  remote.artery {
    canonical {
      hostname = "127.0.0.1"
      port = 7354
    }
  }
  cluster {
    seed-nodes = [
      "pekko://ClusterSystem@127.0.0.1:7354",
      "pekko://ClusterSystem@127.0.0.1:7355"]
    downing-provider-class = "org.apache.pekko.cluster.sbr.SplitBrainResolverProvider"
  }
}
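Access the Cluster extension on each node with Cluster(system) (Scala) or Cluster.get(system) (Java). The examples below assume the result is bound to a value named cluster.
The name of the cluster’s ActorSystem must be the same for all members. It is passed in when you start the ActorSystem.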
Joining and Leaving a Cluster
If not using configuration to specify seed nodes to join, joining the cluster can be done programmatically via the manager (manager() in Java).
- Scala
  cluster.manager ! Join(cluster.selfMember.address)
- Java
  cluster.manager().tell(Join.create(cluster.selfMember().address()));
Leaving the cluster and downing a node are similar:
- Scala
  cluster2.manager ! Leave(cluster2.selfMember.address)
- Java
  cluster2.manager().tell(Leave.create(cluster2.selfMember().address()));
Cluster Subscriptions
Cluster subscriptions (subscriptions() in Java) can be used to receive messages when cluster state changes. For example, registering for all MemberEvents, then using the manager to have a node leave the cluster will result in events for the node going through the Membership Lifecycle.
This example subscribes a subscriber of type ActorRef[MemberEvent] (Scala) or ActorRef<MemberEvent> (Java):
- Scala
  cluster.subscriptions ! Subscribe(subscriber, classOf[MemberEvent])
- Java
  cluster.subscriptions().tell(Subscribe.create(subscriber, ClusterEvent.MemberEvent.class));
Then asking a node to leave:
- Scala
  cluster.manager ! Leave(anotherMemberAddress)
  // subscriber will receive events MemberLeft, MemberExited and MemberRemoved
- Java
  cluster.manager().tell(Leave.create(anotherMemberAddress));
  // subscriber will receive events MemberLeft, MemberExited and MemberRemoved
Cluster State
Instead of subscribing to cluster events it can sometimes be convenient to only get the full membership state with Cluster(system).state (Scala) or Cluster.get(system).state() (Java). Note that this state is not necessarily in sync with the events published to a cluster subscription.
See Cluster Membership for more information on member events specifically. There are more types of change events; consult the API documentation of classes that extend cluster.ClusterEvent.ClusterDomainEvent for details about the events.
Cluster Membership API
Joining
The seed nodes are initial contact points for joining a cluster, which can be done in different ways:
- automatically with Cluster Bootstrap
- with configuration of seed-nodes
- programmatically
After the joining process the seed nodes are not special and they participate in the cluster in exactly the same way as other nodes.
Joining automatically to seed nodes with Cluster Bootstrap
Automatic discovery of nodes for the joining process is available using the open source Pekko Management project’s module, Cluster Bootstrap. Please refer to its documentation for more details.
Joining configured seed nodes
When a new node is started it sends a message to all seed nodes and then sends a join command to the one that answers first. If none of the seed nodes replies (might not be started yet) it retries this procedure until success or shutdown.
You can define the seed nodes in the configuration file (application.conf):
pekko.cluster.seed-nodes = [
"pekko://ClusterSystem@host1:7355",
"pekko://ClusterSystem@host2:7355"]
This can also be defined as Java system properties when starting the JVM using the following syntax:
-Dpekko.cluster.seed-nodes.0=pekko://ClusterSystem@host1:7355
-Dpekko.cluster.seed-nodes.1=pekko://ClusterSystem@host2:7355
The seed nodes can be started in any order. It is not necessary to have all seed nodes running, but the node configured as the first element in the seed-nodes list must be started when initially starting a cluster. If it is not, the other seed nodes will not become initialized, and no other node can join the cluster. The reason for the special first seed node is to avoid forming separated islands when starting from an empty cluster. It is quickest to start all configured seed nodes at the same time (order doesn’t matter); otherwise it can take up to the configured seed-node-timeout until the nodes can join.
As soon as more than two seed nodes have been started, it is no problem to shut down the first seed node. If the first seed node is restarted, it will first try to join the other seed nodes in the existing cluster. Note that if you stop all seed nodes at the same time and restart them with the same seed-nodes configuration they will join themselves and form a new cluster, instead of joining remaining nodes of the existing cluster. That is likely not desired and can be avoided by listing several nodes as seed nodes for redundancy and not stopping all of them at the same time.
If you are going to start the nodes on different machines you need to specify the IP addresses or host names of the machines in application.conf instead of 127.0.0.1.
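For a multi-machine setup, the configuration could look roughly like the fragment below. It reuses the host1 and host2 names and port 7355 from the seed-node example above; these are placeholders for your own host names or IP addresses, and the fragment is a sketch for the node running on host1 only.

```hocon
pekko {
  remote.artery.canonical {
    # Address under which other nodes reach this node (placeholder host name)
    hostname = "host1"
    port = 7355
  }
  cluster.seed-nodes = [
    "pekko://ClusterSystem@host1:7355",
    "pekko://ClusterSystem@host2:7355"]
}
```

Each node would use its own hostname value while sharing the same seed-nodes list.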
Joining programmatically to seed nodes
Joining programmatically is useful when dynamically discovering other nodes at startup through an external tool or API.
- Scala
  import pekko.actor.Address
  import pekko.actor.AddressFromURIString
  import pekko.cluster.typed.JoinSeedNodes

  val seedNodes: List[Address] =
    List("pekko://ClusterSystem@127.0.0.1:7354", "pekko://ClusterSystem@127.0.0.1:7355")
      .map(AddressFromURIString.parse)
  Cluster(system).manager ! JoinSeedNodes(seedNodes)
- Java
  import java.util.ArrayList;
  import java.util.List;
  import org.apache.pekko.actor.Address;
  import org.apache.pekko.actor.AddressFromURIString;
  import org.apache.pekko.cluster.typed.JoinSeedNodes;

  List<Address> seedNodes = new ArrayList<>();
  seedNodes.add(AddressFromURIString.parse("pekko://ClusterSystem@127.0.0.1:7354"));
  seedNodes.add(AddressFromURIString.parse("pekko://ClusterSystem@127.0.0.1:7355"));
  Cluster.get(system).manager().tell(new JoinSeedNodes(seedNodes));
The seed node address list has the same semantics as the configured seed-nodes, and the underlying implementation of the process is the same, see Joining configured seed nodes.
When joining to seed nodes you should not include the node itself, except for the node that is supposed to be the first seed node bootstrapping the cluster. The desired initial seed node address should be placed first in the parameter to the programmatic join.
Tuning joins
Unsuccessful attempts to contact seed nodes are automatically retried after the time period defined in configuration property seed-node-timeout. Unsuccessful attempts to join a specific seed node are automatically retried after the configured retry-unsuccessful-join-after. Retrying means that it tries to contact all seed nodes, then joins the node that answers first. The first node in the list of seed nodes will join itself if it cannot contact any of the other seed nodes within the configured seed-node-timeout.
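The two retry settings mentioned above live under pekko.cluster. The fragment below is only a sketch; the durations are illustrative values, so check the reference configuration for the actual defaults before changing them.

```hocon
pekko.cluster {
  # How long to wait for a seed node to answer before contacting all seed nodes again.
  # The first seed node joins itself if no other seed node answers within this time.
  seed-node-timeout = 5s

  # How long to wait before retrying a join to a specific seed node that did not succeed.
  retry-unsuccessful-join-after = 10s
}
```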
The joining of given seed nodes will, by default, be retried indefinitely until a successful join. That process can be aborted if unsuccessful by configuring a timeout. When aborted it will run Coordinated Shutdown, which will terminate the ActorSystem by default. CoordinatedShutdown can also be configured to exit the JVM. If the seed-nodes are assembled dynamically, it is useful to define this timeout, and a restart with new seed-nodes should be tried after unsuccessful attempts.
pekko.cluster.shutdown-after-unsuccessful-join-seed-nodes = 20s
pekko.coordinated-shutdown.exit-jvm = on
If you don’t configure seed nodes or use one of the join seed node functions, you need to join the cluster manually by using JMX or HTTP.
You can join to any node in the cluster. It does not have to be configured as a seed node. Note that you can only join to an existing cluster member, which for bootstrapping means a node must join itself, and subsequent nodes can then join it to make up a cluster.
An actor system can only join a cluster once; additional attempts will be ignored. Once an actor system has successfully joined a cluster, it would have to be restarted to join the same cluster again. It can use the same host name and port after the restart. When it comes up as a new incarnation of an existing member in the cluster and attempts to join, the existing member will be removed and its new incarnation allowed to join.
Leaving
There are a few ways to remove a member from the cluster.
- The recommended way to leave a cluster is a graceful exit, informing the cluster that a node shall leave. This is performed by Coordinated Shutdown when the ActorSystem is terminated and also when a SIGTERM is sent from the environment to stop the JVM process.
- Graceful exit can also be performed using HTTP or JMX.
- When a graceful exit is not possible, for example in case of abrupt termination of the JVM process, the node will be detected as unreachable by other nodes and removed after Downing.
Graceful leaving offers faster hand off to peer nodes during node shutdown than abrupt termination and downing.
The Coordinated Shutdown will also run when the cluster node sees itself as Exiting, i.e. leaving from another node will trigger the shutdown process on the leaving node. Tasks for graceful leaving of cluster, including graceful shutdown of Cluster Singletons and Cluster Sharding, are added automatically when Pekko Cluster is used. For example, running the shutdown process will also trigger the graceful leaving if not already in progress.
Normally this is handled automatically, but in case of network failures during this process it may still be necessary to set the node’s status to Down in order to complete the removal, see Downing.
Downing
In many cases a member can gracefully exit from the cluster, as described in Leaving, but there are scenarios when an explicit downing decision is needed before it can be removed. For example, in case of abrupt termination of the JVM process, system overload that doesn’t recover, or network partitions that don’t heal. In such cases, the node(s) will be detected as unreachable by other nodes, but they must also be marked as Down before they are removed.
When a member is considered by the failure detector to be unreachable the leader is not allowed to perform its duties, such as changing status of new joining members to ‘Up’. The node must first become reachable again, or the status of the unreachable member must be changed to Down. Changing status to Down can be performed automatically or manually.
We recommend that you enable the Split Brain Resolver that is part of the Pekko Cluster module. You enable it with configuration:
pekko.cluster.downing-provider-class = "org.apache.pekko.cluster.sbr.SplitBrainResolverProvider"
You should also consider the different available downing strategies.
If a downing provider is not configured, downing must be performed manually using HTTP or JMX.
Note that Cluster Singleton or Cluster Sharding entities that are running on a crashed (unreachable) node will not be started on another node until the previous node has been removed from the Cluster. Removal of crashed (unreachable) nodes is performed after a downing decision.
Downing can also be performed programmatically with Cluster(system).manager ! Down(address) (Scala) or Cluster.get(system).manager().tell(new Down(address)) (Java), but that is mostly useful from tests and when implementing a DowningProvider.
If a crashed node is restarted and joins the cluster again with the same hostname and port, the previous incarnation of that member will first be downed and removed. The new join attempt with the same hostname and port is used as evidence that the previous incarnation is no longer alive.
If a node is still running and sees itself as Down it will shut down. Coordinated Shutdown will automatically run if run-coordinated-shutdown-when-down is set to on (the default); however, the node will not try to leave the cluster gracefully.
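Written out in full, the setting referred to above sits under pekko.cluster. This is a sketch only: the off value is assumed to be the plain boolean opposite of the default described in the text.

```hocon
pekko.cluster {
  # on (the default, per the text above): run Coordinated Shutdown when this node sees itself as Down.
  # off: assumed to skip Coordinated Shutdown in that situation.
  run-coordinated-shutdown-when-down = on
}
```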
Node Roles
Not all nodes of a cluster need to perform the same function. For example, there might be one sub-set which runs the web front-end, one which runs the data access layer and one for the number-crunching. Choosing which actors to start on each node, for example cluster-aware routers, can take node roles into account to achieve this distribution of responsibilities.
The node roles are defined in the configuration property named pekko.cluster.roles and typically defined in the start script as a system property or environment variable.
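As an illustration, a node that should carry the backend role used in the example on this page could be configured as sketched below. The system property form follows the same indexed syntax shown earlier for seed-nodes; the role name is only an example.

```hocon
# application.conf: this node joins the cluster with the "backend" role
pekko.cluster.roles = ["backend"]

# Equivalent JVM system property in a start script:
#   -Dpekko.cluster.roles.0=backend
```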
The roles are part of the membership information in MemberEvent that you can subscribe to. The roles of the own node are available from the selfMember (selfMember() in Java) and that can be used for conditionally starting certain actors:
- Scala
  val selfMember = Cluster(context.system).selfMember
  if (selfMember.hasRole("backend")) {
    context.spawn(Backend(), "back")
  } else if (selfMember.hasRole("frontend")) {
    context.spawn(Frontend(), "front")
  }
- Java
  Member selfMember = Cluster.get(context.getSystem()).selfMember();
  if (selfMember.hasRole("backend")) {
    context.spawn(Backend.create(), "back");
  } else if (selfMember.hasRole("frontend")) {
    context.spawn(Frontend.create(), "front");
  }
Failure Detector
The nodes in the cluster monitor each other by sending heartbeats to detect if a node is unreachable from the rest of the cluster. Please see:
- Failure Detector specification
- Phi Accrual Failure Detector implementation
- Using the Failure Detector
Using the Failure Detector
Cluster uses the remote.PhiAccrualFailureDetector failure detector by default, or you can provide your own by implementing remote.FailureDetector and configuring it:
pekko.cluster.failure-detector.implementation-class = "com.example.CustomFailureDetector"
In the Cluster Configuration you may want to adjust these depending on your environment:
- When a phi value is considered to be a failure: pekko.cluster.failure-detector.threshold
- Margin of error for sudden abnormalities: pekko.cluster.failure-detector.acceptable-heartbeat-pause
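The two settings above could be tuned as in the following sketch. The numbers are illustrative assumptions for a network with occasional pauses, not recommendations; consult the reference configuration for the defaults.

```hocon
pekko.cluster.failure-detector {
  # Phi value at which a node is considered unreachable.
  # A higher value gives fewer false positives but slower detection of real failures.
  threshold = 12.0

  # How long missing heartbeats are tolerated before they count as an anomaly,
  # for example to ride out garbage collection pauses or short network hiccups.
  acceptable-heartbeat-pause = 5s
}
```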
How to test
Pekko comes with and uses several types of testing strategies.
Configuration
There are several configuration properties for the cluster. Refer to the reference configuration for full configuration descriptions, default values and options.
How To Startup when a Cluster size is reached
A common use case is to start actors after the cluster has been initialized, members have joined, and the cluster has reached a certain size.
With a configuration option you can define the required number of members before the leader changes member status of ‘Joining’ members to ‘Up’:
pekko.cluster.min-nr-of-members = 3
In a similar way you can define the required number of members of a certain role before the leader changes member status of ‘Joining’ members to ‘Up’:
pekko.cluster.role {
frontend.min-nr-of-members = 1
backend.min-nr-of-members = 2
}
Cluster Info Logging
You can silence the logging of cluster events at info level with configuration property:
pekko.cluster.log-info = off
You can enable verbose logging of cluster events at info level, e.g. for temporary troubleshooting, with configuration property:
pekko.cluster.log-info-verbose = on
Cluster Dispatcher
The Cluster extension is implemented with actors. To protect them against disturbance from user actors they are by default run on the internal dispatcher configured under pekko.actor.internal-dispatcher. The cluster actors can potentially be isolated even further, onto their own dispatcher using the setting pekko.cluster.use-dispatcher, or made to run on the same dispatcher to keep the number of threads down.
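As a rough sketch of isolating the cluster actors further, the fragment below points pekko.cluster.use-dispatcher at a dedicated dispatcher. The name cluster-dispatcher and the parallelism values are assumptions chosen for illustration.

```hocon
# Run the cluster actors on their own dispatcher (hypothetical name)
pekko.cluster.use-dispatcher = cluster-dispatcher

# Definition of that dispatcher at the top level of the configuration
cluster-dispatcher {
  type = "Dispatcher"
  executor = "fork-join-executor"
  fork-join-executor {
    # Small, fixed-size range: the cluster actors need few threads
    parallelism-min = 2
    parallelism-max = 4
  }
}
```

A small dedicated pool keeps heartbeats and gossip responsive when user actors saturate their own dispatcher, at the cost of a few extra threads.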
Configuration Compatibility Check
Creating a cluster is about deploying two or more nodes and having them behave as if they were a single application. Therefore it’s extremely important that all nodes in a cluster are configured with compatible settings.
The Configuration Compatibility Check feature ensures that all nodes in a cluster have a compatible configuration. Whenever a new node is joining an existing cluster, a subset of its configuration settings (only those that are required to be checked) is sent to the nodes in the cluster for verification. Once the configuration is checked on the cluster side, the cluster sends back its own set of required configuration settings. The joining node will then verify if it’s compliant with the cluster configuration. The joining node will only proceed if all checks pass, on both sides.
New custom checkers can be added by extending cluster.JoinConfigCompatChecker and including them in the configuration. Each checker must be associated with a unique key:
pekko.cluster.configuration-compatibility-check.checkers {
my-custom-config = "com.company.MyCustomJoinConfigCompatChecker"
}
Configuration Compatibility Check is enabled by default, but can be disabled by setting pekko.cluster.configuration-compatibility-check.enforce-on-join = off. This is especially useful when performing rolling updates. Obviously this should only be done if a complete cluster shutdown isn’t an option. A cluster with nodes with different configuration settings may lead to data loss or data corruption.
This setting should only be disabled on the joining nodes. The checks are always performed on both sides, and warnings are logged. In case of incompatibilities, it is the responsibility of the joining node to decide if the process should be interrupted or not.
Higher level Cluster tools
Cluster Singleton
For some use cases it is convenient or necessary to ensure only one actor of a certain type is running somewhere in the cluster. This can be implemented by subscribing to member events, but there are several corner cases to consider. Therefore, this specific use case is covered by the Cluster Singleton.
See Cluster Singleton.
Cluster Sharding
Distributes actors across several nodes in the cluster and supports interaction with the actors using their logical identifier, but without having to care about their physical location in the cluster.
See Cluster Sharding.
Distributed Data
Distributed Data is useful when you need to share data between nodes in a Pekko Cluster. The data is accessed with an actor providing a key-value-store-like API.
See Distributed Data.
Distributed Publish Subscribe
Publish-subscribe messaging between actors in the cluster based on a topic, i.e. the sender does not have to know on which node the destination actor is running.
See Distributed Publish Subscribe.
Cluster aware routers
Distribute messages to actors on different nodes in the cluster with routing strategies like round-robin and consistent hashing.
See Group Routers.
Cluster across multiple data centers
Pekko Cluster can be used across multiple data centers, availability zones or regions, so that one Cluster can span multiple data centers and still be tolerant to network partitions.
See Cluster Multi-DC.
Reliable Delivery
Reliable delivery and flow control of messages between actors in the Cluster.
Example project
The Cluster example project is an example project that can be downloaded, with instructions on how to run it.
This project contains samples illustrating different Cluster features, such as subscribing to cluster membership events, and sending messages to actors running on nodes in the cluster with Cluster aware routers.