Classic Cluster Metrics Extension
Module info
To use the Cluster Metrics Extension, add the following dependency to your project:
- sbt

```scala
val PekkoVersion = "1.1.2+24-bcd44ee3-SNAPSHOT"
libraryDependencies += "org.apache.pekko" %% "pekko-cluster-metrics" % PekkoVersion
```
- Maven

```xml
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.1.2+24-bcd44ee3-SNAPSHOT</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-cluster-metrics_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
```
- Gradle

```gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2+24-bcd44ee3-SNAPSHOT")
  implementation "org.apache.pekko:pekko-cluster-metrics_${versions.ScalaBinary}"
}
```
and add the following configuration stanza to your `application.conf`:

```
pekko.extensions = [ "pekko.cluster.metrics.ClusterMetricsExtension" ]
```
| Project Info: Pekko Cluster Metrics (classic) | |
|---|---|
| Artifact | org.apache.pekko %% pekko-cluster-metrics % 1.1.2+24-bcd44ee3-SNAPSHOT |
| JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21 |
| Scala versions | 2.13.14, 2.12.20, 3.3.4 |
| JPMS module name | pekko.cluster.metrics |
| Home page | https://pekko.apache.org/ |
| Release notes | Release Notes |
| Issues | GitHub issues |
| Sources | https://github.com/apache/pekko |
Introduction
With the help of the Cluster Metrics Extension, the member nodes of the cluster can collect system health metrics and publish them to other cluster nodes and to registered subscribers on the system event bus.
Cluster metrics information is primarily used for load-balancing routers, but can also be used to implement advanced metrics-based node life cycles, such as "Node Let-it-crash" when CPU steal time becomes excessive.
Cluster members with status WeaklyUp, if that feature is enabled, will participate in Cluster Metrics collection and dissemination.
Metrics Collector
Metrics collection is delegated to an implementation of `org.apache.pekko.cluster.metrics.MetricsCollector`.
Different collector implementations provide different subsets of metrics published to the cluster. Certain message routing and let-it-crash functions may not work when Sigar is not provisioned.
The cluster metrics extension comes with two built-in collector implementations:

- `org.apache.pekko.cluster.metrics.SigarMetricsCollector`, which requires Sigar provisioning and is richer and more precise
- `org.apache.pekko.cluster.metrics.JmxMetricsCollector`, which is used as a fallback and is less rich and precise
You can also plug-in your own metrics collector implementation.
By default, the metrics extension uses collector provider fallback and tries to load collectors in this order:

- the configured user-provided collector
- the built-in `org.apache.pekko.cluster.metrics.SigarMetricsCollector`
- and finally `org.apache.pekko.cluster.metrics.JmxMetricsCollector`
Metrics Events
The metrics extension periodically publishes the current snapshot of the cluster metrics to the node system event bus.
The publication interval is controlled by the `pekko.cluster.metrics.collector.sample-interval` setting.
The payload of the `org.apache.pekko.cluster.metrics.ClusterMetricsChanged` event contains the latest metrics of the node, as well as the metrics gossip of the other cluster member nodes received during the collector sample interval.
You can subscribe your metrics listener actors to these events in order to implement a custom node lifecycle:
- Scala

```scala
ClusterMetricsExtension(system).subscribe(metricsListenerActor)
```

- Java

```java
ClusterMetricsExtension.get(system).subscribe(metricsListenerActor);
```
Hyperic Sigar Provisioning
Both user-provided and built-in metrics collectors can optionally use Hyperic Sigar for a wider and more accurate range of metrics compared to what can be retrieved from ordinary JMX MBeans.
Sigar uses a native OS library and requires library provisioning, i.e. deployment, extraction, and loading of the native OS library into the JVM at runtime.
You can provision the Sigar classes and native library in one of the following ways:

- Use Kamon sigar-loader as a project dependency. The metrics extension will extract and load the Sigar library on demand with the help of the Kamon sigar provisioner.
- Use Kamon sigar-loader as a java agent: `java -javaagent:/path/to/sigar-loader.jar`. The Kamon sigar-loader agent will extract and load the Sigar library during JVM start.
- Place `sigar.jar` on the `classpath` and the Sigar native library for the OS on the `java.library.path`. You are then required to manage both the project dependency and the library deployment manually.
When using Kamon sigar-loader and running multiple instances of the same application on the same host, you have to make sure that the Sigar library is extracted to a unique per-instance directory. You can control the extract directory with the `pekko.cluster.metrics.native-library-extract-folder` configuration setting.
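For example, if two instances share the same working directory, each instance could be pointed at its own extract folder via a JVM system property (the property name `instance.id` below is a hypothetical example; HOCON resolves substitutions against system properties, so each instance would be started with `-Dinstance.id=<unique-id>`):

```
# application.conf
pekko.cluster.metrics.native-library-extract-folder = ${user.dir}"/native/"${instance.id}
```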
To enable usage of Sigar, you can add the following dependency to your project:
- sbt

```scala
libraryDependencies += "io.kamon" % "sigar-loader" % "1.6.6-rev002"
```

- Maven

```xml
<dependencies>
  <dependency>
    <groupId>io.kamon</groupId>
    <artifactId>sigar-loader</artifactId>
    <version>1.6.6-rev002</version>
  </dependency>
</dependencies>
```

- Gradle

```gradle
dependencies {
  implementation "io.kamon:sigar-loader:1.6.6-rev002"
}
```
You can download Kamon sigar-loader from Maven Central.
Adaptive Load Balancing
The `AdaptiveLoadBalancingPool` / `AdaptiveLoadBalancingGroup` performs load balancing of messages to cluster nodes based on the cluster metrics data. It uses random selection of routees with probabilities derived from the remaining capacity of the corresponding node. It can be configured to use a specific `MetricsSelector` to produce the probabilities, a.k.a. weights:
- `heap` / `HeapMetricsSelector` - Used and max JVM heap memory. Weights based on remaining heap capacity; (max - used) / max
- `load` / `SystemLoadAverageMetricsSelector` - System load average for the past 1 minute; the corresponding value can be found in `top` on Linux systems. The system is possibly nearing a bottleneck if the system load average is nearing the number of cpus/cores. Weights based on remaining load capacity; 1 - (load / processors)
- `cpu` / `CpuMetricsSelector` - CPU utilization in percentage, sum of User + Sys + Nice + Wait. Weights based on remaining cpu capacity; 1 - utilization
- `mix` / `MixMetricsSelector` - Combines heap, cpu, and load. Weights based on the mean of the remaining capacity of the combined selectors.
- Any custom implementation of `org.apache.pekko.cluster.metrics.MetricsSelector`
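To build intuition for how the weights drive routing, here is a small self-contained sketch (not Pekko's actual implementation) of weighted random routee selection, where each weight represents a node's remaining capacity:

```java
import java.util.Arrays;
import java.util.Random;

public class WeightedRouteeChoice {

  // Pick an index with probability proportional to its weight,
  // mimicking how the adaptive routers favor nodes with more remaining capacity.
  static int pick(double[] weights, Random rnd) {
    double total = 0;
    for (double w : weights) total += w;
    double r = rnd.nextDouble() * total;
    double acc = 0;
    for (int i = 0; i < weights.length; i++) {
      acc += weights[i];
      if (r < acc) return i;
    }
    return weights.length - 1;
  }

  public static void main(String[] args) {
    // Hypothetical remaining heap capacities, (max - used) / max, for three nodes.
    double[] capacity = {0.8, 0.5, 0.1};
    int[] counts = new int[3];
    Random rnd = new Random(42);
    for (int i = 0; i < 10000; i++) counts[pick(capacity, rnd)]++;
    // The node with the most remaining capacity receives the most messages.
    System.out.println(Arrays.toString(counts));
  }
}
```

A node is still occasionally selected even when its remaining capacity is low, which avoids starving it of traffic entirely.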
The collected metrics values are smoothed with an exponentially weighted moving average. In the cluster configuration you can adjust how quickly past data is decayed compared to new data.
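The decay can be sketched numerically. Following the formula in the reference configuration, the smoothing factor is alpha = 1 - 0.5 ^ (sample-interval / half-life); with the default 3 s sample interval and 12 s half-life, each new sample contributes roughly 16% to the smoothed value. This is an illustrative sketch, not Pekko's internal EWMA class:

```java
public class EwmaExample {

  // Smoothing factor derived from the configured sample interval and half-life.
  static double alpha(double sampleIntervalMs, double halfLifeMs) {
    return 1.0 - Math.pow(0.5, sampleIntervalMs / halfLifeMs);
  }

  // One EWMA step: blend the new observation into the previous smoothed value.
  static double update(double previous, double observation, double alpha) {
    return alpha * observation + (1 - alpha) * previous;
  }

  public static void main(String[] args) {
    double a = alpha(3000, 12000); // defaults: sample-interval = 3s, half-life = 12s
    double smoothed = 0.5;         // e.g. previously smoothed CPU utilization
    smoothed = update(smoothed, 0.9, a); // a short utilization spike to 90%
    // alpha is roughly 0.159 with the defaults, so the spike moves the
    // smoothed value only modestly.
    System.out.printf("alpha=%.4f smoothed=%.4f%n", a, smoothed);
  }
}
```

A shorter half-life biases the router toward recent samples; a longer one makes the weights more stable but slower to react.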
Let’s take a look at this router in action. What can be more demanding than calculating factorials?
The backend worker that performs the factorial calculation:
- Scala

```scala
class FactorialBackend extends Actor with ActorLogging {
  import context.dispatcher

  def receive = {
    case (n: Int) =>
      Future(factorial(n))
        .map { result => (n, result) }
        .pipeTo(sender())
  }

  def factorial(n: Int): BigInt = {
    @tailrec def factorialAcc(acc: BigInt, n: Int): BigInt = {
      if (n <= 1) acc
      else factorialAcc(acc * n, n - 1)
    }
    factorialAcc(BigInt(1), n)
  }
}
```
- Java

```java
public class FactorialBackend extends AbstractActor {

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            Integer.class,
            n -> {
              CompletableFuture<FactorialResult> result =
                  CompletableFuture.supplyAsync(() -> factorial(n))
                      .thenApply((factorial) -> new FactorialResult(n, factorial));
              pipe(result, getContext().dispatcher()).to(getSender());
            })
        .build();
  }

  BigInteger factorial(int n) {
    BigInteger acc = BigInteger.ONE;
    for (int i = 1; i <= n; ++i) {
      acc = acc.multiply(BigInteger.valueOf(i));
    }
    return acc;
  }
}
```
The frontend that receives user jobs and delegates to the backends via the router:
- Scala

```scala
class FactorialFrontend(upToN: Int, repeat: Boolean) extends Actor with ActorLogging {

  val backend = context.actorOf(FromConfig.props(), name = "factorialBackendRouter")

  override def preStart(): Unit = {
    sendJobs()
    if (repeat) {
      context.setReceiveTimeout(10.seconds)
    }
  }

  def receive = {
    case (n: Int, factorial: BigInt) =>
      if (n == upToN) {
        log.debug("{}! = {}", n, factorial)
        if (repeat) sendJobs()
        else context.stop(self)
      }
    case ReceiveTimeout =>
      log.info("Timeout")
      sendJobs()
  }

  def sendJobs(): Unit = {
    log.info("Starting batch of factorials up to [{}]", upToN)
    (1 to upToN).foreach { backend ! _ }
  }
}
```
- Java

```java
public class FactorialFrontend extends AbstractActor {
  final int upToN;
  final boolean repeat;

  LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  ActorRef backend =
      getContext().actorOf(FromConfig.getInstance().props(), "factorialBackendRouter");

  public FactorialFrontend(int upToN, boolean repeat) {
    this.upToN = upToN;
    this.repeat = repeat;
  }

  @Override
  public void preStart() {
    sendJobs();
    getContext().setReceiveTimeout(Duration.ofSeconds(10));
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            FactorialResult.class,
            result -> {
              if (result.n == upToN) {
                log.debug("{}! = {}", result.n, result.factorial);
                if (repeat) sendJobs();
                else getContext().stop(getSelf());
              }
            })
        .match(
            ReceiveTimeout.class,
            x -> {
              log.info("Timeout");
              sendJobs();
            })
        .build();
  }

  void sendJobs() {
    log.info("Starting batch of factorials up to [{}]", upToN);
    for (int n = 1; n <= upToN; n++) {
      backend.tell(n, getSelf());
    }
  }
}
```
As you can see, the router is defined in the same way as other routers, and in this case it is configured as follows:
pekko.actor.deployment {
/factorialFrontend/factorialBackendRouter = {
# Router type provided by metrics extension.
router = cluster-metrics-adaptive-group
# Router parameter specific for metrics extension.
# metrics-selector = heap
# metrics-selector = load
# metrics-selector = cpu
metrics-selector = mix
#
routees.paths = ["/user/factorialBackend"]
cluster {
enabled = on
use-roles = ["backend"]
allow-local-routees = off
}
}
}
Only the `router` type and the `metrics-selector` parameter are specific to this router; everything else works in the same way as for other routers.
The same type of router could also have been defined in code:
- Scala

```scala
import org.apache.pekko
import pekko.cluster.routing.ClusterRouterGroup
import pekko.cluster.routing.ClusterRouterGroupSettings
import pekko.cluster.metrics.AdaptiveLoadBalancingGroup
import pekko.cluster.metrics.HeapMetricsSelector

val backend = context.actorOf(
  ClusterRouterGroup(
    AdaptiveLoadBalancingGroup(HeapMetricsSelector),
    ClusterRouterGroupSettings(
      totalInstances = 100,
      routeesPaths = List("/user/factorialBackend"),
      allowLocalRoutees = true,
      useRoles = Set("backend"))).props(),
  name = "factorialBackendRouter2")
```

```scala
import org.apache.pekko
import pekko.cluster.routing.ClusterRouterPool
import pekko.cluster.routing.ClusterRouterPoolSettings
import pekko.cluster.metrics.AdaptiveLoadBalancingPool
import pekko.cluster.metrics.SystemLoadAverageMetricsSelector

val backend = context.actorOf(
  ClusterRouterPool(
    AdaptiveLoadBalancingPool(SystemLoadAverageMetricsSelector),
    ClusterRouterPoolSettings(
      totalInstances = 100,
      maxInstancesPerNode = 3,
      allowLocalRoutees = false,
      useRoles = Set("backend"))).props(Props[FactorialBackend]()),
  name = "factorialBackendRouter3")
```
- Java

```java
int totalInstances = 100;
Iterable<String> routeesPaths = Arrays.asList("/user/factorialBackend");
boolean allowLocalRoutees = true;
Set<String> useRoles = new HashSet<>(Arrays.asList("backend"));
ActorRef backend =
    getContext()
        .actorOf(
            new ClusterRouterGroup(
                    new AdaptiveLoadBalancingGroup(
                        HeapMetricsSelector.getInstance(), Collections.<String>emptyList()),
                    new ClusterRouterGroupSettings(
                        totalInstances, routeesPaths, allowLocalRoutees, useRoles))
                .props(),
            "factorialBackendRouter2");
```

```java
int totalInstances = 100;
int maxInstancesPerNode = 3;
boolean allowLocalRoutees = false;
Set<String> useRoles = new HashSet<>(Arrays.asList("backend"));
ActorRef backend =
    getContext()
        .actorOf(
            new ClusterRouterPool(
                    new AdaptiveLoadBalancingPool(
                        SystemLoadAverageMetricsSelector.getInstance(), 0),
                    new ClusterRouterPoolSettings(
                        totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles))
                .props(Props.create(FactorialBackend.class)),
            "factorialBackendRouter3");
```
Subscribe to Metrics Events
It is possible to subscribe to the metrics events directly to implement other functionality.
- Scala

```scala
import org.apache.pekko
import pekko.actor.ActorLogging
import pekko.actor.Actor
import pekko.cluster.Cluster
import pekko.cluster.metrics.ClusterMetricsEvent
import pekko.cluster.metrics.ClusterMetricsChanged
import pekko.cluster.ClusterEvent.CurrentClusterState
import pekko.cluster.metrics.NodeMetrics
import pekko.cluster.metrics.StandardMetrics.HeapMemory
import pekko.cluster.metrics.StandardMetrics.Cpu
import pekko.cluster.metrics.ClusterMetricsExtension

class MetricsListener extends Actor with ActorLogging {
  val selfAddress = Cluster(context.system).selfAddress
  val extension = ClusterMetricsExtension(context.system)

  // Subscribe to ClusterMetricsEvent events.
  override def preStart(): Unit = extension.subscribe(self)

  // Unsubscribe from ClusterMetricsEvent events.
  override def postStop(): Unit = extension.unsubscribe(self)

  def receive = {
    case ClusterMetricsChanged(clusterMetrics) =>
      clusterMetrics.filter(_.address == selfAddress).foreach { nodeMetrics =>
        logHeap(nodeMetrics)
        logCpu(nodeMetrics)
      }
    case state: CurrentClusterState => // Ignore.
  }

  def logHeap(nodeMetrics: NodeMetrics): Unit = nodeMetrics match {
    case HeapMemory(address, timestamp, used, committed, max) =>
      log.info("Used heap: {} MB", used.doubleValue / 1024 / 1024)
    case _ => // No heap info.
  }

  def logCpu(nodeMetrics: NodeMetrics): Unit = nodeMetrics match {
    case Cpu(address, timestamp, Some(systemLoadAverage), cpuCombined, cpuStolen, processors) =>
      log.info("Load: {} ({} processors)", systemLoadAverage, processors)
    case _ => // No cpu info.
  }
}
```
- Java

```java
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.ClusterEvent.CurrentClusterState;
import org.apache.pekko.cluster.metrics.ClusterMetricsChanged;
import org.apache.pekko.cluster.metrics.NodeMetrics;
import org.apache.pekko.cluster.metrics.StandardMetrics;
import org.apache.pekko.cluster.metrics.StandardMetrics.HeapMemory;
import org.apache.pekko.cluster.metrics.StandardMetrics.Cpu;
import org.apache.pekko.cluster.metrics.ClusterMetricsExtension;
import org.apache.pekko.event.Logging;
import org.apache.pekko.event.LoggingAdapter;

public class MetricsListener extends AbstractActor {
  LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  Cluster cluster = Cluster.get(getContext().getSystem());

  ClusterMetricsExtension extension = ClusterMetricsExtension.get(getContext().getSystem());

  // Subscribe to ClusterMetricsEvent events.
  @Override
  public void preStart() {
    extension.subscribe(getSelf());
  }

  // Unsubscribe from ClusterMetricsEvent events.
  @Override
  public void postStop() {
    extension.unsubscribe(getSelf());
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            ClusterMetricsChanged.class,
            clusterMetrics -> {
              for (NodeMetrics nodeMetrics : clusterMetrics.getNodeMetrics()) {
                if (nodeMetrics.address().equals(cluster.selfAddress())) {
                  logHeap(nodeMetrics);
                  logCpu(nodeMetrics);
                }
              }
            })
        .match(
            CurrentClusterState.class,
            message -> {
              // Ignore.
            })
        .build();
  }

  void logHeap(NodeMetrics nodeMetrics) {
    HeapMemory heap = StandardMetrics.extractHeapMemory(nodeMetrics);
    if (heap != null) {
      log.info("Used heap: {} MB", ((double) heap.used()) / 1024 / 1024);
    }
  }

  void logCpu(NodeMetrics nodeMetrics) {
    Cpu cpu = StandardMetrics.extractCpu(nodeMetrics);
    if (cpu != null && cpu.systemLoadAverage().isDefined()) {
      log.info("Load: {} ({} processors)", cpu.systemLoadAverage().get(), cpu.processors());
    }
  }
}
```
Custom Metrics Collector
Metrics collection is delegated to the implementation of `org.apache.pekko.cluster.metrics.MetricsCollector`.
You can plug in your own metrics collector instead of the built-in `org.apache.pekko.cluster.metrics.SigarMetricsCollector` or `org.apache.pekko.cluster.metrics.JmxMetricsCollector`.
Look at those two implementations for inspiration.
A custom metrics collector implementation class must be specified in the `pekko.cluster.metrics.collector.provider` configuration property.
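A sketch of the wiring, assuming a hypothetical class `com.example.MyMetricsCollector` that implements `MetricsCollector` and has a public constructor taking an `ActorSystem` parameter:

```
pekko.cluster.metrics.collector {
  # Hypothetical custom collector; must implement
  # org.apache.pekko.cluster.metrics.MetricsCollector.
  provider = "com.example.MyMetricsCollector"
  # Fail if the custom collector cannot be loaded,
  # instead of falling back to the built-in collectors.
  fallback = false
}
```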
Configuration
The Cluster metrics extension can be configured with the following properties:
# SPDX-License-Identifier: Apache-2.0
###############################################
# Pekko Cluster Metrics Reference Config File #
###############################################
# This is the reference config file that contains all the default settings.
# Make your edits in your application.conf in order to override these settings.
# Sigar provisioning:
#
# User can provision sigar classes and native library in one of the following ways:
#
# 1) Use https://github.com/kamon-io/sigar-loader Kamon sigar-loader as a project dependency for the user project.
# Metrics extension will extract and load sigar library on demand with help of Kamon sigar provisioner.
#
# 2) Use https://github.com/kamon-io/sigar-loader Kamon sigar-loader as java agent: `java -javaagent:/path/to/sigar-loader.jar`
# Kamon sigar loader agent will extract and load sigar library during JVM start.
#
# 3) Place `sigar.jar` on the `classpath` and sigar native library for the o/s on the `java.library.path`
# User is required to manage both project dependency and library deployment manually.
# Cluster metrics extension.
# Provides periodic statistics collection and publication throughout the cluster.
pekko.cluster.metrics {
# Full path of dispatcher configuration key.
dispatcher = "pekko.actor.default-dispatcher"
# How long should any actor wait before starting the periodic tasks.
periodic-tasks-initial-delay = 1s
# Sigar native library extract location.
# Use per-application-instance scoped location, such as program working directory.
native-library-extract-folder = ${user.dir}"/native"
# Metrics supervisor actor.
supervisor {
# Actor name. Example name space: /system/cluster-metrics
name = "cluster-metrics"
# Supervision strategy.
strategy {
#
# FQCN of class providing `org.apache.pekko.actor.SupervisorStrategy`.
# Must have a constructor with signature `<init>(com.typesafe.config.Config)`.
# Default metrics strategy provider is a configurable extension of `OneForOneStrategy`.
provider = "org.apache.pekko.cluster.metrics.ClusterMetricsStrategy"
#
# Configuration of the default strategy provider.
# Replace with custom settings when overriding the provider.
configuration = {
# Log restart attempts.
loggingEnabled = true
# Child actor restart-on-failure window.
withinTimeRange = 3s
# Maximum number of restart attempts before child actor is stopped.
maxNrOfRetries = 3
}
}
}
# Metrics collector actor.
collector {
# Enable or disable metrics collector for load-balancing nodes.
# Metrics collection can also be controlled at runtime by sending control messages
# to /system/cluster-metrics actor: `org.apache.pekko.cluster.metrics.{CollectionStartMessage,CollectionStopMessage}`
enabled = on
# FQCN of the metrics collector implementation.
# It must implement `org.apache.pekko.cluster.metrics.MetricsCollector` and
# have public constructor with org.apache.pekko.actor.ActorSystem parameter.
# Will try to load in the following order of priority:
# 1) configured custom collector 2) internal `SigarMetricsCollector` 3) internal `JmxMetricsCollector`
provider = ""
# Try all 3 available collector providers, or else fail on the configured custom collector provider.
fallback = true
# How often metrics are sampled on a node.
# Shorter interval will collect the metrics more often.
# Also controls frequency of the metrics publication to the node system event bus.
sample-interval = 3s
# How often a node publishes metrics information to the other nodes in the cluster.
# Shorter interval will publish the metrics gossip more often.
gossip-interval = 3s
# How quickly the exponential weighting of past data is decayed compared to
# new data. Set lower to increase the bias toward newer values.
# The relevance of each data sample is halved for every passing half-life
# duration, i.e. after 4 times the half-life, a data sample’s relevance is
# reduced to 6% of its original relevance. The initial relevance of a data
# sample is given by 1 - 0.5 ^ (collect-interval / half-life).
# See https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
moving-average-half-life = 12s
}
}
# Cluster metrics extension serializers and routers.
pekko.actor {
# Protobuf serializer for remote cluster metrics messages.
serializers {
pekko-cluster-metrics = "org.apache.pekko.cluster.metrics.protobuf.MessageSerializer"
}
# Interface binding for remote cluster metrics messages.
serialization-bindings {
"org.apache.pekko.cluster.metrics.ClusterMetricsMessage" = pekko-cluster-metrics
"org.apache.pekko.cluster.metrics.AdaptiveLoadBalancingPool" = pekko-cluster-metrics
"org.apache.pekko.cluster.metrics.MixMetricsSelector" = pekko-cluster-metrics
"org.apache.pekko.cluster.metrics.CpuMetricsSelector$" = pekko-cluster-metrics
"org.apache.pekko.cluster.metrics.HeapMetricsSelector$" = pekko-cluster-metrics
"org.apache.pekko.cluster.metrics.SystemLoadAverageMetricsSelector$" = pekko-cluster-metrics
}
# Globally unique metrics extension serializer identifier.
serialization-identifiers {
"org.apache.pekko.cluster.metrics.protobuf.MessageSerializer" = 10
}
# Provide routing of messages based on cluster metrics.
router.type-mapping {
cluster-metrics-adaptive-pool = "org.apache.pekko.cluster.metrics.AdaptiveLoadBalancingPool"
cluster-metrics-adaptive-group = "org.apache.pekko.cluster.metrics.AdaptiveLoadBalancingGroup"
}
}