Cluster Usage

This document describes how to use Apache Pekko Cluster and the Cluster APIs.

For specific documentation topics see:

You are viewing the documentation for the new actor APIs. To view the Pekko Classic documentation, see Classic Cluster.

You have to enable serialization to send messages between ActorSystems (nodes) in the Cluster. Serialization with Jackson is a good choice in many cases, and our recommendation if you don’t have other preferences or constraints.

Module info

To use Pekko Cluster add the following dependency in your project:

sbt
val PekkoVersion = "1.0.0"
libraryDependencies += "org.apache.pekko" %% "pekko-cluster-typed" % PekkoVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.0.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-cluster-typed_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.0.0")

  implementation "org.apache.pekko:pekko-cluster-typed_${versions.ScalaBinary}"
}
Project Info: Pekko Cluster (typed)
Artifact: org.apache.pekko / pekko-cluster-typed / 1.0.0
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17
Scala versions: 2.13.11, 2.12.18, 3.3.0
JPMS module name: pekko.cluster.typed
License
Home page: https://pekko.apache.org/
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/incubator-pekko

Cluster API Extension

The Cluster extension gives you access to management tasks such as Joining, Leaving and Downing, and to subscriptions to cluster membership events such as MemberUp, MemberRemoved and UnreachableMember, which are exposed as event APIs.

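It does this through these references on the Cluster extension:

  • manager: an ActorRef that accepts cluster commands such as Join, Leave and Down
  • subscriptions: an ActorRef for subscribing to and unsubscribing from cluster events such as MemberRemoved
  • state: the current cluster state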

All of the examples below assume the following imports:

Scala
import org.apache.pekko
import pekko.actor.typed._
import pekko.actor.typed.scaladsl._
import pekko.cluster.ClusterEvent._
import pekko.cluster.MemberStatus
import pekko.cluster.typed._
Java
import org.apache.pekko.actor.typed.*;
import org.apache.pekko.actor.typed.javadsl.*;
import org.apache.pekko.cluster.ClusterEvent;
import org.apache.pekko.cluster.typed.*;

The minimum configuration required is to set a host/port for remoting and to set pekko.actor.provider = "cluster".

pekko {
  actor {
    provider = "cluster"
  }
  remote.artery {
    canonical {
      hostname = "127.0.0.1"
      port = 7354
    }
  }

  cluster {
    seed-nodes = [
      "pekko://ClusterSystem@127.0.0.1:7354",
      "pekko://ClusterSystem@127.0.0.1:7355"]
    
    downing-provider-class = "org.apache.pekko.cluster.sbr.SplitBrainResolverProvider"
  }
}

Accessing the Cluster extension on each node:

Scala
val cluster = Cluster(system)
Java
Cluster cluster = Cluster.get(system);
Note

The name of the cluster’s ActorSystem, which is passed in when you start the ActorSystem, must be the same for all members.

Joining and Leaving a Cluster

If you do not use configuration to specify the seed nodes to join, you can join the cluster programmatically via the manager (manager() in Java).

Scala
cluster.manager ! Join(cluster.selfMember.address)
Java
cluster.manager().tell(Join.create(cluster.selfMember().address()));

Leaving the cluster and downing a node are similar:

Scala
cluster2.manager ! Leave(cluster2.selfMember.address)
Java
cluster2.manager().tell(Leave.create(cluster2.selfMember().address()));

Cluster Subscriptions

Cluster subscriptions (subscriptions() in Java) can be used to receive messages when the cluster state changes. For example, registering for all MemberEvents and then using the manager to have a node leave the cluster will result in events for that node as it goes through the Membership Lifecycle.

This example subscribes the actor subscriber, of type ActorRef[MemberEvent] (ActorRef<MemberEvent> in Java), to all member events:

Scala
cluster.subscriptions ! Subscribe(subscriber, classOf[MemberEvent])
Java
cluster.subscriptions().tell(Subscribe.create(subscriber, ClusterEvent.MemberEvent.class));

Then asking a node to leave:

Scala
cluster.manager ! Leave(anotherMemberAddress)
// subscriber will receive events MemberLeft, MemberExited and MemberRemoved
Java
cluster.manager().tell(Leave.create(anotherMemberAddress));
// subscriber will receive events MemberLeft, MemberExited and MemberRemoved
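The member lifecycle behind these events can be sketched as a small state machine. The following is a simplified, self-contained Java model written for illustration only: MembershipLifecycleSketch, its Status enum and its transition table are hypothetical and leave out states such as WeaklyUp; only the event names mirror the Pekko cluster events. It assumes that any live member may be marked Down and that Removed is terminal.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the member lifecycle (not Pekko's implementation).
final class MembershipLifecycleSketch {
    enum Status { JOINING, UP, LEAVING, EXITING, DOWN, REMOVED }

    // Allowed transitions in this model; any live status may be marked Down.
    private static final Map<Status, Set<Status>> NEXT = new EnumMap<>(Status.class);
    static {
        NEXT.put(Status.JOINING, EnumSet.of(Status.UP, Status.DOWN));
        NEXT.put(Status.UP, EnumSet.of(Status.LEAVING, Status.DOWN));
        NEXT.put(Status.LEAVING, EnumSet.of(Status.EXITING, Status.DOWN));
        NEXT.put(Status.EXITING, EnumSet.of(Status.REMOVED, Status.DOWN));
        NEXT.put(Status.DOWN, EnumSet.of(Status.REMOVED));
        NEXT.put(Status.REMOVED, EnumSet.noneOf(Status.class));
    }

    private MembershipLifecycleSketch() {}

    // Name of the cluster event a subscriber sees when a member enters the given status.
    static String eventName(Status status) {
        switch (status) {
            case JOINING: return "MemberJoined";
            case UP: return "MemberUp";
            case LEAVING: return "MemberLeft";
            case EXITING: return "MemberExited";
            case DOWN: return "MemberDowned";
            case REMOVED: return "MemberRemoved";
            default: throw new IllegalArgumentException("Unknown status " + status);
        }
    }

    // Walks a path of statuses, rejecting illegal transitions, and returns the emitted events.
    static List<String> eventsAlong(List<Status> path) {
        List<String> events = new ArrayList<>();
        for (int i = 1; i < path.size(); i++) {
            Status from = path.get(i - 1);
            Status to = path.get(i);
            if (!NEXT.get(from).contains(to)) {
                throw new IllegalStateException("Illegal transition " + from + " -> " + to);
            }
            events.add(eventName(to));
        }
        return events;
    }
}
```

With this model, eventsAlong(List.of(Status.UP, Status.LEAVING, Status.EXITING, Status.REMOVED)) returns [MemberLeft, MemberExited, MemberRemoved], the same sequence the subscriber above receives when a node leaves.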

Cluster State

Instead of subscribing to cluster events, it can sometimes be convenient to only get the full membership state with Cluster(system).state (Scala) or Cluster.get(system).state() (Java). Note that this state is not necessarily in sync with the events published to a cluster subscription.

See Cluster Membership for more information on member events specifically. There are more types of change events; consult the API documentation of the classes that extend cluster.ClusterEvent.ClusterDomainEvent for details about the events.

Cluster Membership API

Joining

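The seed nodes are initial contact points for joining a cluster, which can be done in different ways:

  • automatically, with Cluster Bootstrap
  • with configured seed nodes
  • programmatically
  • manually, using JMX or HTTP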

After the joining process the seed nodes are not special and they participate in the cluster in exactly the same way as other nodes.

Joining automatically to seed nodes with Cluster Bootstrap

Automatic discovery of nodes for the joining process is available using the open source Pekko Management project’s module, Cluster Bootstrap. Please refer to its documentation for more details.

Joining configured seed nodes

When a new node is started, it sends a message to all seed nodes and then sends a join command to the one that answers first. If none of the seed nodes replies (they might not be started yet), it retries this procedure until it succeeds or is shut down.

You can define the seed nodes in the configuration file (application.conf):

pekko.cluster.seed-nodes = [
  "pekko://ClusterSystem@host1:7355",
  "pekko://ClusterSystem@host2:7355"]

This can also be defined as Java system properties when starting the JVM using the following syntax:

-Dpekko.cluster.seed-nodes.0=pekko://ClusterSystem@host1:7355
-Dpekko.cluster.seed-nodes.1=pekko://ClusterSystem@host2:7355
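When the seed node list comes from a start script, the system properties above can be generated rather than written by hand. The sketch below is a hypothetical helper (SeedNodeProps is not part of Pekko) that uses only java.net.URI to check each pekko://System@host:port address and to verify that all seed nodes use the same ActorSystem name, since members of one cluster must share that name.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns seed node addresses into -D JVM arguments and checks
// that every address uses the same ActorSystem name.
final class SeedNodeProps {
    private SeedNodeProps() {}

    // The ActorSystem name is the user-info part of "pekko://Name@host:port".
    static String systemName(String address) {
        URI uri = URI.create(address);
        if (!"pekko".equals(uri.getScheme()) || uri.getUserInfo() == null
                || uri.getHost() == null || uri.getPort() < 0) {
            throw new IllegalArgumentException("Not a pekko://System@host:port address: " + address);
        }
        return uri.getUserInfo();
    }

    // Produces one indexed system property per seed node, in list order.
    static List<String> toJvmArgs(List<String> seedNodes) {
        List<String> args = new ArrayList<>();
        String expectedName = null;
        for (int i = 0; i < seedNodes.size(); i++) {
            String address = seedNodes.get(i);
            String name = systemName(address);
            if (expectedName == null) {
                expectedName = name;
            } else if (!expectedName.equals(name)) {
                throw new IllegalArgumentException("Seed node " + address
                        + " uses system name " + name + ", expected " + expectedName);
            }
            args.add("-Dpekko.cluster.seed-nodes." + i + "=" + address);
        }
        return args;
    }
}
```

For the two addresses above, toJvmArgs returns exactly the two -Dpekko.cluster.seed-nodes.N arguments shown, and a list mixing ClusterSystem with another system name is rejected with an IllegalArgumentException.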


The seed nodes can be started in any order. It is not necessary to have all seed nodes running, but the node configured as the first element in the seed-nodes list must be started when initially starting a cluster. If it is not, the other seed-nodes will not become initialized, and no other node can join the cluster. The reason for the special first seed node is to avoid forming separated islands when starting from an empty cluster. It is quickest to start all configured seed nodes at the same time (order doesn’t matter), otherwise it can take up to the configured seed-node-timeout until the nodes can join.

As soon as more than two seed nodes have been started, it is no problem to shut down the first seed node. If the first seed node is restarted, it will first try to join the other seed nodes in the existing cluster. Note that if you stop all seed nodes at the same time and restart them with the same seed-nodes configuration, they will join themselves and form a new cluster instead of joining the remaining nodes of the existing cluster. That is likely not desired and can be avoided by listing several nodes as seed nodes for redundancy and not stopping all of them at the same time.

If you are going to start the nodes on different machines, you need to specify the IP addresses or host names of the machines in application.conf instead of 127.0.0.1.

Joining programmatically to seed nodes

Joining programmatically is useful when dynamically discovering other nodes at startup through an external tool or API.

Scala
import pekko.actor.Address
import pekko.actor.AddressFromURIString
import pekko.cluster.typed.JoinSeedNodes

val seedNodes: List[Address] =
  List("pekko://ClusterSystem@127.0.0.1:7354", "pekko://ClusterSystem@127.0.0.1:7355").map(
    AddressFromURIString.parse)
Cluster(system).manager ! JoinSeedNodes(seedNodes)
Java
import java.util.ArrayList;
import java.util.List;
import org.apache.pekko.actor.Address;
import org.apache.pekko.actor.AddressFromURIString;
import org.apache.pekko.cluster.typed.JoinSeedNodes;

List<Address> seedNodes = new ArrayList<>();
seedNodes.add(AddressFromURIString.parse("pekko://ClusterSystem@127.0.0.1:7354"));
seedNodes.add(AddressFromURIString.parse("pekko://ClusterSystem@127.0.0.1:7355"));

Cluster.get(system).manager().tell(new JoinSeedNodes(seedNodes));

The seed node address list has the same semantics as the configured seed-nodes, and the underlying implementation of the process is the same, see Joining configured seed nodes.

When joining seed nodes, you should not include the node itself, except for the node that is supposed to be the first seed node bootstrapping the cluster. The desired initial seed node address should be placed first in the list passed to the programmatic join.

Tuning joins

Unsuccessful attempts to contact seed nodes are automatically retried after the time period defined in the configuration property seed-node-timeout. Unsuccessful attempts to join a specific seed node are automatically retried after the configured retry-unsuccessful-join-after. Retrying means that the node tries to contact all seed nodes again, then joins the node that answers first. The first node in the list of seed nodes will join itself if it cannot contact any of the other seed nodes within the configured seed-node-timeout.

The joining of given seed nodes will, by default, be retried indefinitely until a successful join. That process can be aborted if unsuccessful by configuring a timeout. When aborted it will run Coordinated Shutdown, which will terminate the ActorSystem by default. CoordinatedShutdown can also be configured to exit the JVM. If the seed-nodes are assembled dynamically, it is useful to define this timeout, and a restart with new seed-nodes should be tried after unsuccessful attempts.

pekko.cluster.shutdown-after-unsuccessful-join-seed-nodes = 20s
pekko.coordinated-shutdown.exit-jvm = on
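The joining rules described in this section can be summarized as a decision function. This is a hypothetical, simplified model (SeedJoinSketch and its Action values are invented for illustration, and retry-unsuccessful-join-after is not modeled), assuming that seedNodeTimeout and shutdownAfter stand for seed-node-timeout and shutdown-after-unsuccessful-join-seed-nodes, with null meaning the latter is off.

```java
import java.time.Duration;
import java.util.List;

// Hypothetical model of the seed-node join decision; not Pekko's implementation.
final class SeedJoinSketch {
    enum Action { JOIN_FIRST_RESPONDER, JOIN_SELF, RETRY, ABORT }

    private SeedJoinSketch() {}

    // self: this node's address; seedNodes: configured seed nodes in order;
    // responders: seed nodes that answered, in order of arrival;
    // shutdownAfter: null means "retry indefinitely".
    static Action decide(String self, List<String> seedNodes, List<String> responders,
                         Duration elapsed, Duration seedNodeTimeout, Duration shutdownAfter) {
        // Join whichever other seed node answered first.
        for (String responder : responders) {
            if (!responder.equals(self)) {
                return Action.JOIN_FIRST_RESPONDER;
            }
        }
        // Only the first seed node may bootstrap a new cluster by joining itself.
        boolean isFirstSeed = !seedNodes.isEmpty() && seedNodes.get(0).equals(self);
        if (isFirstSeed && elapsed.compareTo(seedNodeTimeout) >= 0) {
            return Action.JOIN_SELF;
        }
        // Give up once the configured limit is exceeded (Pekko would run Coordinated Shutdown).
        if (shutdownAfter != null && elapsed.compareTo(shutdownAfter) >= 0) {
            return Action.ABORT;
        }
        return Action.RETRY;
    }
}
```

In this model a seed node that is not first in the list never bootstraps a cluster on its own; it keeps retrying, or aborts once shutdownAfter has elapsed.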

If you don’t configure seed nodes or use one of the join seed node functions, you need to join the cluster manually by using JMX or HTTP.

You can join any node in the cluster; it does not have to be configured as a seed node. Note that you can only join an existing cluster member, which, when bootstrapping, means that one node must join itself and subsequent nodes can then join it to form a cluster.

An actor system can only join a cluster once; additional attempts will be ignored. Once an actor system has successfully joined a cluster, it would have to be restarted to join the same cluster again. It can use the same host name and port after the restart. When it comes up as a new incarnation of an existing member in the cluster and attempts to join, the existing member will be removed and its new incarnation allowed to join.

Leaving

There are a few ways to remove a member from the cluster.

  1. The recommended way to leave a cluster is a graceful exit, informing the cluster that a node shall leave. This is performed by Coordinated Shutdown when the ActorSystem is terminated and also when a SIGTERM is sent from the environment to stop the JVM process.
  2. Graceful exit can also be performed using HTTP or JMX.
  3. When a graceful exit is not possible, for example in case of abrupt termination of the JVM process, the node will be detected as unreachable by other nodes and removed after Downing.

Graceful leaving offers faster hand off to peer nodes during node shutdown than abrupt termination and downing.

Coordinated Shutdown will also run when the cluster node sees itself as Exiting, i.e. leaving from another node will trigger the shutdown process on the leaving node. Tasks for graceful leaving of the cluster, including graceful shutdown of Cluster Singletons and Cluster Sharding, are added automatically when Pekko Cluster is used. For example, running the shutdown process will also trigger the graceful leaving if it is not already in progress.

Normally this is handled automatically, but in case of network failures during this process it may still be necessary to set the node’s status to Down in order to complete the removal, see Downing.

Downing

In many cases a member can gracefully exit from the cluster, as described in Leaving, but there are scenarios when an explicit downing decision is needed before it can be removed, for example in case of abrupt termination of the JVM process, system overload that doesn’t recover, or network partitions that don’t heal. In such cases, the node(s) will be detected as unreachable by other nodes, but they must also be marked as Down before they are removed.

When a member is considered by the failure detector to be unreachable the leader is not allowed to perform its duties, such as changing status of new joining members to ‘Up’. The node must first become reachable again, or the status of the unreachable member must be changed to Down. Changing status to Down can be performed automatically or manually.

We recommend that you enable the Split Brain Resolver that is part of the Pekko Cluster module. You enable it with configuration:

pekko.cluster.downing-provider-class = "org.apache.pekko.cluster.sbr.SplitBrainResolverProvider"

You should also consider the different available downing strategies.

If a downing provider is not configured, downing must be performed manually using HTTP or JMX.

Note that Cluster Singleton or Cluster Sharding entities that are running on a crashed (unreachable) node will not be started on another node until the previous node has been removed from the Cluster. Removal of crashed (unreachable) nodes is performed after a downing decision.

Downing can also be performed programmatically with Cluster(system).manager ! Down(address) in Scala or Cluster.get(system).manager().tell(new Down(address)) in Java, but that is mostly useful from tests and when implementing a DowningProvider.

If a crashed node is restarted and joining the cluster again with the same hostname and port, the previous incarnation of that member will first be downed and removed. The new join attempt with same hostname and port is used as evidence that the previous is no longer alive.
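The incarnation rule, together with the rule that an actor system can only join once, can be illustrated with a tiny membership table keyed by host and port. This is a hypothetical sketch, not Pekko code: IncarnationSketch and the incarnationUid parameter are assumptions standing in for whatever uniquely identifies a restarted ActorSystem, and the real cluster downs and removes the previous incarnation before admitting the new one rather than swapping entries in one step.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one entry per host:port, holding the id of the current incarnation.
final class IncarnationSketch {
    enum Outcome { JOINED, IGNORED_ALREADY_MEMBER, DOWNED_PREVIOUS_AND_JOINED }

    private final Map<String, Long> uidByHostPort = new HashMap<>();

    Outcome onJoin(String hostPort, long incarnationUid) {
        Long existing = uidByHostPort.get(hostPort);
        if (existing == null) {
            uidByHostPort.put(hostPort, incarnationUid);
            return Outcome.JOINED;
        }
        if (existing == incarnationUid) {
            // Same incarnation asking again: an actor system can only join once.
            return Outcome.IGNORED_ALREADY_MEMBER;
        }
        // Same host and port, new incarnation: the previous one is treated as dead.
        uidByHostPort.put(hostPort, incarnationUid);
        return Outcome.DOWNED_PREVIOUS_AND_JOINED;
    }
}
```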

If a node is still running and sees itself as Down, it will shut down. Coordinated Shutdown will automatically run if run-coordinated-shutdown-when-down is set to on (the default); however, the node will not try to leave the cluster gracefully.

Node Roles

Not all nodes of a cluster need to perform the same function. For example, there might be one sub-set which runs the web front-end, one which runs the data access layer and one for the number-crunching. Choosing which actors to start on each node, for example cluster-aware routers, can take node roles into account to achieve this distribution of responsibilities.

The node roles are defined in the configuration property named pekko.cluster.roles and typically defined in the start script as a system property or environment variable.

The roles are part of the membership information in MemberEvent that you can subscribe to. The roles of the node itself are available from selfMember (selfMember() in Java), which can be used for conditionally starting certain actors:

Scala
val selfMember = Cluster(context.system).selfMember
if (selfMember.hasRole("backend")) {
  context.spawn(Backend(), "back")
} else if (selfMember.hasRole("frontend")) {
  context.spawn(Frontend(), "front")
}
Java
Member selfMember = Cluster.get(context.getSystem()).selfMember();
if (selfMember.hasRole("backend")) {
  context.spawn(Backend.create(), "back");
} else if (selfMember.hasRole("frontend")) {
  context.spawn(Frontend.create(), "front");
}

Failure Detector

The nodes in the cluster monitor each other by sending heartbeats to detect if a node is unreachable from the rest of the cluster. Please see:

Using the Failure Detector

Cluster uses the remote.PhiAccrualFailureDetector failure detector by default, or you can provide your own by implementing remote.FailureDetector and configuring it:

pekko.cluster.failure-detector.implementation-class = "com.example.CustomFailureDetector"

In the Cluster Configuration you may want to adjust these settings depending on your environment:

  • The phi value above which a node is considered to have failed: pekko.cluster.failure-detector.threshold
  • The margin of error for sudden abnormalities: pekko.cluster.failure-detector.acceptable-heartbeat-pause
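To get a feel for how threshold and acceptable-heartbeat-pause interact, the sketch below computes a phi value from the time since the last heartbeat and the observed mean and standard deviation of heartbeat intervals. It follows the general phi accrual idea, phi = -log10(probability that the heartbeat arrives even later), with a logistic approximation of the normal distribution. PhiSketch is a hypothetical class and the numbers used with it are illustrative; it should not be read as the exact code of remote.PhiAccrualFailureDetector.

```java
// Hypothetical sketch of a phi accrual calculation; not Pekko's PhiAccrualFailureDetector.
final class PhiSketch {
    private PhiSketch() {}

    // phi = -log10(probability that the next heartbeat arrives even later than now),
    // assuming heartbeat intervals are roughly normally distributed.
    static double phi(double millisSinceLastHeartbeat, double meanIntervalMillis,
                      double stdDeviationMillis, double acceptablePauseMillis) {
        if (stdDeviationMillis <= 0) {
            throw new IllegalArgumentException("standard deviation must be positive");
        }
        // The acceptable pause shifts the expected arrival time later, lowering phi.
        double mean = meanIntervalMillis + acceptablePauseMillis;
        double y = (millisSinceLastHeartbeat - mean) / stdDeviationMillis;
        // Logistic approximation of the normal CDF: P(later) is approximately e / (1 + e).
        double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
        if (millisSinceLastHeartbeat > mean) {
            return -Math.log10(e / (1.0 + e));
        }
        // Equivalent form that stays finite when e overflows to infinity.
        return -Math.log10(1.0 - 1.0 / (1.0 + e));
    }

    // A node is considered available while phi stays below the threshold.
    static boolean isAvailable(double millisSinceLastHeartbeat, double meanIntervalMillis,
                               double stdDeviationMillis, double acceptablePauseMillis,
                               double threshold) {
        return phi(millisSinceLastHeartbeat, meanIntervalMillis,
                stdDeviationMillis, acceptablePauseMillis) < threshold;
    }
}
```

With a mean interval of 1000 ms and a standard deviation of 100 ms, phi is about 0.3 when exactly one mean interval has passed and grows very quickly after that; adding an acceptable pause of 3000 ms shifts the curve so that 3 seconds of silence no longer exceeds a threshold of 8.0.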

How to test

Pekko comes with and uses several types of testing strategies:

Configuration

There are several configuration properties for the cluster. Refer to the reference configuration for full configuration descriptions, default values and options.

How To Start Up when a Cluster size is reached

A common use case is to start actors after the cluster has been initialized, members have joined, and the cluster has reached a certain size.

With a configuration option you can define the required number of members before the leader changes the member status of ‘Joining’ members to ‘Up’:

pekko.cluster.min-nr-of-members = 3

In a similar way, you can define the required number of members of a certain role before the leader changes the member status of ‘Joining’ members to ‘Up’:

pekko.cluster.role {
  frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}
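The gating rule can be expressed as a simple predicate. The following Java sketch is a hypothetical model (MinMembersSketch is not a Pekko class) that assumes the leader counts every member it currently knows about, one set of roles per member, against min-nr-of-members and the per-role minimums from the configuration above.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the min-nr-of-members gate; not Pekko's implementation.
final class MinMembersSketch {
    private MinMembersSketch() {}

    // memberRoles: roles of each known member; minNrOfMembers: pekko.cluster.min-nr-of-members;
    // minNrPerRole: the per-role min-nr-of-members values under pekko.cluster.role.
    static boolean mayMoveJoiningToUp(List<Set<String>> memberRoles, int minNrOfMembers,
                                      Map<String, Integer> minNrPerRole) {
        if (memberRoles.size() < minNrOfMembers) {
            return false;
        }
        for (Map.Entry<String, Integer> requirement : minNrPerRole.entrySet()) {
            long withRole = memberRoles.stream()
                    .filter(roles -> roles.contains(requirement.getKey()))
                    .count();
            if (withRole < requirement.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

With the role configuration above and a minimum of 3 members, one frontend and two backend nodes satisfy the predicate, while one backend and two frontend nodes do not.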

Cluster Info Logging

You can silence the logging of cluster events at info level with configuration property:

pekko.cluster.log-info = off

You can enable verbose logging of cluster events at info level, e.g. for temporary troubleshooting, with configuration property:

pekko.cluster.log-info-verbose = on

Cluster Dispatcher

The Cluster extension is implemented with actors. To protect them against disturbance from user actors, they are by default run on the internal dispatcher configured under pekko.actor.internal-dispatcher. The cluster actors can potentially be isolated even further, onto their own dispatcher, using the setting pekko.cluster.use-dispatcher, or made to run on the same dispatcher to keep the number of threads down.

Configuration Compatibility Check

Creating a cluster is about deploying two or more nodes and having them behave as if they were a single application. Therefore it’s extremely important that all nodes in a cluster are configured with compatible settings.

The Configuration Compatibility Check feature ensures that all nodes in a cluster have a compatible configuration. Whenever a new node is joining an existing cluster, a subset of its configuration settings (only those that are required to be checked) is sent to the nodes in the cluster for verification. Once the configuration is checked on the cluster side, the cluster sends back its own set of required configuration settings. The joining node will then verify if it’s compliant with the cluster configuration. The joining node will only proceed if all checks pass, on both sides.

New custom checkers can be added by extending cluster.JoinConfigCompatChecker and including them in the configuration. Each checker must be associated with a unique key:

pekko.cluster.configuration-compatibility-check.checkers {
  my-custom-config = "com.company.MyCustomJoinConfigCompatChecker"
}
Note

Configuration Compatibility Check is enabled by default, but can be disabled by setting pekko.cluster.configuration-compatibility-check.enforce-on-join = off. This is especially useful when performing rolling updates. Obviously this should only be done if a complete cluster shutdown isn’t an option. A cluster with nodes with different configuration settings may lead to data loss or data corruption.

This setting should only be disabled on the joining nodes. The checks are always performed on both sides, and warnings are logged. In case of incompatibilities, it is the responsibility of the joining node to decide if the process should be interrupted or not.
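The two-sided check and the joining node's final decision can be modeled with plain maps of setting values. This is an illustrative sketch under assumptions: ConfigCompatSketch is hypothetical and does not reflect the JoinConfigCompatChecker API, each side compares only the required keys, and a missing value counts as incompatible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the two-sided join compatibility check; not Pekko's API.
final class ConfigCompatSketch {
    private ConfigCompatSketch() {}

    // Compares only the required keys; a missing or different value is reported.
    static List<String> incompatibilities(Set<String> requiredKeys,
                                          Map<String, String> toCheck,
                                          Map<String, String> actual) {
        List<String> errors = new ArrayList<>();
        for (String key : new TreeSet<>(requiredKeys)) { // sorted for stable output
            String expected = actual.get(key);
            String received = toCheck.get(key);
            if (received == null || !Objects.equals(expected, received)) {
                errors.add(key + ": expected [" + expected + "] but was [" + received + "]");
            }
        }
        return errors;
    }

    // Both sides always check; the joining node decides whether incompatibilities stop the join.
    static boolean joiningNodeProceeds(List<String> clusterSideErrors,
                                       List<String> joiningSideErrors,
                                       boolean enforceOnJoin) {
        boolean compatible = clusterSideErrors.isEmpty() && joiningSideErrors.isEmpty();
        return compatible || !enforceOnJoin;
    }
}
```

Here joiningNodeProceeds returns false for an incompatible configuration when enforce-on-join is on, and true when it is off, matching the responsibility of the joining node described above.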

Higher level Cluster tools

Cluster Singleton

For some use cases it is convenient or necessary to ensure only one actor of a certain type is running somewhere in the cluster. This can be implemented by subscribing to member events, but there are several corner cases to consider. Therefore, this specific use case is covered by the Cluster Singleton.

See Cluster Singleton.

Cluster Sharding

Distributes actors across several nodes in the cluster and supports interaction with the actors using their logical identifier, but without having to care about their physical location in the cluster.

See Cluster Sharding.

Distributed Data

Distributed Data is useful when you need to share data between nodes in a Pekko Cluster. The data is accessed with an actor providing a key-value store like API.

See Distributed Data.

Distributed Publish Subscribe

Publish-subscribe messaging between actors in the cluster based on a topic, i.e. the sender does not have to know on which node the destination actor is running.

See Distributed Publish Subscribe.

Cluster aware routers

Distribute messages to actors on different nodes in the cluster with routing strategies like round-robin and consistent hashing.

See Group Routers.

Cluster across multiple data centers

Pekko Cluster can be used across multiple data centers, availability zones or regions, so that one Cluster can span multiple data centers and still be tolerant to network partitions.

See Cluster Multi-DC.

Reliable Delivery

Reliable delivery and flow control of messages between actors in the Cluster.

See Reliable Delivery.

Example project

The Cluster example project is an example project that can be downloaded, with instructions on how to run it.

This project contains samples illustrating different Cluster features, such as subscribing to cluster membership events, and sending messages to actors running on nodes in the cluster with Cluster aware routers.