Classic Apache Pekko Extensions

If you want to add features to Apache Pekko, there is a simple yet powerful mechanism for doing so. It is called Apache Pekko Extensions and comprises two basic components: an Extension and an ExtensionId.

Extensions are loaded only once per ActorSystem, and they are managed by Pekko. You can choose to have your Extension loaded on demand or at ActorSystem creation time through the Pekko configuration. Details on how to make that happen are below, in the Loading from Configuration section.

Warning

Since an extension is a way to hook into Pekko itself, the implementor of the extension needs to ensure that the extension is thread-safe.
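
To make the warning concrete, here is a minimal sketch using only the JDK (no Pekko types). It assumes a hypothetical SharedCounter class standing in for the state an extension might hold, and a hypothetical hammer helper that increments one shared instance from several threads, much as many actors would call into a single extension instance. It is an illustration of the concern, not how Pekko itself is implemented.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class; the names here are illustrative only.
final class SharedCounterDemo {

  // Stand-in for extension state: one instance shared by all callers.
  static final class SharedCounter {
    private final AtomicLong counter = new AtomicLong(0);

    long increment() {
      return counter.incrementAndGet(); // atomic, so no updates are lost
    }

    long current() {
      return counter.get();
    }
  }

  // Increments a single shared counter from `threads` threads,
  // `perThread` times each, and returns the final value.
  static long hammer(int threads, int perThread) {
    SharedCounter shared = new SharedCounter();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.execute(() -> {
        for (int i = 0; i < perThread; i++) {
          shared.increment();
        }
      });
    }
    pool.shutdown(); // already submitted tasks still run to completion
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    return shared.current();
  }

  public static void main(String[] args) {
    System.out.println(hammer(8, 10_000)); // prints 80000
  }
}
```

Had the counter been a plain long field incremented with ++, concurrent callers could overwrite each other's updates and the final value could come out lower than expected.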

Building an Extension

So let’s create a sample extension that lets us count the number of times something has happened.

First, we define what our Extension should do:

Scala
import org.apache.pekko.actor.Extension
import java.util.concurrent.atomic.AtomicLong

class CountExtensionImpl extends Extension {
  // Since this Extension is a shared instance
  // per ActorSystem we need to be threadsafe
  private val counter = new AtomicLong(0)

  // This is the operation this Extension provides
  def increment() = counter.incrementAndGet()
}
Java
import org.apache.pekko.actor.*;
import java.util.concurrent.atomic.AtomicLong;

static class CountExtensionImpl implements Extension {
  // Since this Extension is a shared instance
  // per ActorSystem we need to be threadsafe
  private final AtomicLong counter = new AtomicLong(0);

  // This is the operation this Extension provides
  public long increment() {
    return counter.incrementAndGet();
  }
}

Then we need to create an ExtensionId for our extension so we can get hold of it.

Scala
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.actor.ClassicActorSystemProvider
import pekko.actor.ExtensionId
import pekko.actor.ExtensionIdProvider
import pekko.actor.ExtendedActorSystem

object CountExtension extends ExtensionId[CountExtensionImpl] with ExtensionIdProvider {
  // The lookup method is required by ExtensionIdProvider,
  // so we return ourselves here, this allows us
  // to configure our extension to be loaded when
  // the ActorSystem starts up
  override def lookup = CountExtension

  // This method will be called by Pekko
  // to instantiate our Extension
  override def createExtension(system: ExtendedActorSystem) = new CountExtensionImpl

  /**
   * Java API: retrieve the Count extension for the given system.
   */
  override def get(system: ActorSystem): CountExtensionImpl = super.get(system)
  override def get(system: ClassicActorSystemProvider): CountExtensionImpl = super.get(system)
}
Java
import org.apache.pekko.actor.*;
import java.util.concurrent.atomic.AtomicLong;

static class CountExtension extends AbstractExtensionId<CountExtensionImpl>
    implements ExtensionIdProvider {
  // This will be the identifier of our CountExtension
  public static final CountExtension CountExtensionProvider = new CountExtension();

  private CountExtension() {}

  // The lookup method is required by ExtensionIdProvider,
  // so we return ourselves here, this allows us
  // to configure our extension to be loaded when
  // the ActorSystem starts up
  public CountExtension lookup() {
    return CountExtension.CountExtensionProvider; // The public static final
  }

  // This method will be called by Pekko
  // to instantiate our Extension
  public CountExtensionImpl createExtension(ExtendedActorSystem system) {
    return new CountExtensionImpl();
  }
}

Wicked! Now all we need to do is to actually use it:

Scala
CountExtension(system).increment()
Java
// typically you would use static import of the
// CountExtension.CountExtensionProvider field
CountExtension.CountExtensionProvider.get(system).increment();

Or from inside of a Pekko Actor:

Scala
class MyActor extends Actor {
  def receive = {
    case someMessage =>
      CountExtension(context.system).increment()
  }
}
Java
static class MyActor extends AbstractActor {
  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchAny(
            msg -> {
              // typically you would use static import of the
              // CountExtension.CountExtensionProvider field
              CountExtension.CountExtensionProvider.get(getContext().getSystem()).increment();
            })
        .build();
  }
}

You can also hide the extension behind a trait:

trait Counting { self: Actor =>
  def increment() = CountExtension(context.system).increment()
}

class MyCounterActor extends Actor with Counting {
  def receive = {
    case someMessage => increment()
  }
}

That’s all there is to it!

Loading from Configuration

To be able to load extensions from your Pekko configuration, you must add the fully qualified class names (FQCNs) of implementations of either ExtensionId or ExtensionIdProvider to the pekko.extensions section of the config you provide to your ActorSystem.

Scala
pekko {
  extensions = ["docs.extension.CountExtension"]
}
Java
pekko {
  extensions = ["docs.extension.ExtensionDocTest.CountExtension"]
}

Applicability

The sky is the limit! By the way, did you know that Pekko Cluster, Serialization and other features are implemented as Pekko Extensions?

Application specific settings

The configuration can be used for application-specific settings. A good practice is to place those settings in an Extension.

Sample configuration:

myapp {
  db {
    uri = "mongodb://example1.com:27017,example2.com:27017"
  }
  circuit-breaker {
    timeout = 30 seconds
  }
}

The Extension:

Scala
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.actor.Extension
import pekko.actor.ExtensionId
import pekko.actor.ExtensionIdProvider
import pekko.actor.ExtendedActorSystem

import scala.concurrent.duration.Duration
import com.typesafe.config.Config
import java.util.concurrent.TimeUnit

import pekko.actor.ClassicActorSystemProvider

class SettingsImpl(config: Config) extends Extension {
  val DbUri: String = config.getString("myapp.db.uri")
  val CircuitBreakerTimeout: Duration =
    Duration(config.getDuration("myapp.circuit-breaker.timeout", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
}
object Settings extends ExtensionId[SettingsImpl] with ExtensionIdProvider {

  override def lookup = Settings

  override def createExtension(system: ExtendedActorSystem) =
    new SettingsImpl(system.settings.config)

  /**
   * Java API: retrieve the Settings extension for the given system.
   */
  override def get(system: ActorSystem): SettingsImpl = super.get(system)
  override def get(system: ClassicActorSystemProvider): SettingsImpl = super.get(system)
}
Java
import org.apache.pekko.actor.Extension;
import org.apache.pekko.actor.AbstractExtensionId;
import org.apache.pekko.actor.ExtensionIdProvider;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.ExtendedActorSystem;
import com.typesafe.config.Config;
import java.util.concurrent.TimeUnit;
import java.time.Duration;

static class SettingsImpl implements Extension {

  public final String DB_URI;
  public final Duration CIRCUIT_BREAKER_TIMEOUT;

  public SettingsImpl(Config config) {
    DB_URI = config.getString("myapp.db.uri");
    CIRCUIT_BREAKER_TIMEOUT =
        Duration.ofMillis(
            config.getDuration("myapp.circuit-breaker.timeout", TimeUnit.MILLISECONDS));
  }
}

static class Settings extends AbstractExtensionId<SettingsImpl> implements ExtensionIdProvider {
  public static final Settings SettingsProvider = new Settings();

  private Settings() {}

  public Settings lookup() {
    return Settings.SettingsProvider;
  }

  public SettingsImpl createExtension(ExtendedActorSystem system) {
    return new SettingsImpl(system.settings().config());
  }
}

Use it:

Scala
class MyActor extends Actor {
  val settings = Settings(context.system)
  val connection = connect(settings.DbUri, settings.CircuitBreakerTimeout)
  // ... rest of the actor omitted
}
Java
static class MyActor extends AbstractActor {
  // typically you would use static import of the Settings.SettingsProvider field
  final SettingsImpl settings = Settings.SettingsProvider.get(getContext().getSystem());
  Connection connection = connect(settings.DB_URI, settings.CIRCUIT_BREAKER_TIMEOUT);

}

Library extensions

A third-party library may register its extension for auto-loading on actor system startup by appending it to pekko.library-extensions in its reference.conf.

pekko.library-extensions += "docs.extension.ExampleExtension"

As there is no way to selectively remove such extensions, this mechanism should be used with care: only when there is no case where the user would ever want the extension disabled, or when the extension has specific support for disabling its sub-features. One example where this could be important is in tests.

Warning

The pekko.library-extensions setting must never be assigned (= ["Extension"]); always append to it instead. Assigning breaks the library-extension mechanism and makes behavior depend on class path ordering.
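
As a sketch of the difference, the two forms might look like this in a library's reference.conf. The class name com.example.MyLibraryExtension is a hypothetical placeholder, not a real extension.

```hocon
# Appending: entries contributed by other libraries' reference.conf files are kept
pekko.library-extensions += "com.example.MyLibraryExtension"

# Assigning (do NOT do this): replaces the list instead of extending it,
# so which entries survive depends on the order the files are merged in
# pekko.library-extensions = ["com.example.MyLibraryExtension"]
```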