Extending Apache Pekko
Apache Pekko extensions can be used for almost anything: they provide a way to create an instance of a class exactly once for the whole ActorSystem and to access it from anywhere. Pekko features such as Cluster, Serialization and Sharding are all Pekko extensions. The example below manages an expensive database connection pool and accesses it from various places in your application.
You can choose to have your Extension loaded on demand or at ActorSystem creation time through the Pekko configuration. Details on how to make that happen are below, in the Loading from configuration section.
Since an extension is a way to hook into Pekko itself, the implementor of the extension needs to ensure that it is thread-safe and non-blocking.
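As a rough illustration of that requirement, the following Java sketch keeps its shared state in an atomic counter and hands back a CompletionStage instead of blocking the caller. QueryCounter and its methods are hypothetical names, not part of Pekko, and the class uses only the JDK; a real extension would also implement Pekko's Extension interface.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical extension-like class (not a Pekko API): one instance is
// shared by every thread that looks it up, so its state must be thread-safe.
final class QueryCounter {
    // AtomicLong allows concurrent updates without explicit locking.
    private final AtomicLong issued = new AtomicLong();

    // Non-blocking: the increment runs asynchronously on the JDK's default
    // executor and the caller receives a CompletionStage immediately.
    CompletionStage<Long> recordQuery() {
        return CompletableFuture.supplyAsync(issued::incrementAndGet);
    }

    // Reading the current value is safe from any thread.
    long issuedSoFar() {
        return issued.get();
    }
}
```

Callers would typically chain further stages on the returned CompletionStage rather than waiting for the result on an actor's thread.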
Building an extension
Let’s build an extension to manage a shared database connection pool.
- Scala
    import scala.concurrent.Future

    class ExpensiveDatabaseConnection {
      def executeQuery(query: String): Future[Any] = ???
    }
- Java
    import java.util.concurrent.CompletionStage;

    public class ExpensiveDatabaseConnection {
      public CompletionStage<Object> executeQuery(String query) {
        throw new RuntimeException("I should do a database query");
      }
      // ...
    }
First create an Extension; this will be created only once per ActorSystem:
- Scala
    import org.apache.pekko.actor.typed.ActorSystem
    import org.apache.pekko.actor.typed.Extension

    class DatabasePool(system: ActorSystem[_]) extends Extension {
      // database configuration can be loaded from config
      // from the actor system
      private val _connection = new ExpensiveDatabaseConnection()

      def connection(): ExpensiveDatabaseConnection = _connection
    }
- Java
    import org.apache.pekko.actor.typed.ActorSystem;
    import org.apache.pekko.actor.typed.Extension;

    public class DatabaseConnectionPool implements Extension {

      private final ExpensiveDatabaseConnection _connection;

      private DatabaseConnectionPool(ActorSystem<?> system) {
        // database configuration can be loaded from config
        // from the actor system
        _connection = new ExpensiveDatabaseConnection();
      }

      public ExpensiveDatabaseConnection connection() {
        return _connection;
      }
    }
This is the public API of your extension. Internally in this example we instantiate our expensive database connection.
Then create an ExtensionId to identify the extension. In Scala, a good convention is to let the companion object of the Extension be the ExtensionId. In Java, a good convention is to define the ExtensionId as a static inner class of the Extension.
- Scala
    object DatabasePool extends ExtensionId[DatabasePool] {
      // will only be called once
      def createExtension(system: ActorSystem[_]): DatabasePool = new DatabasePool(system)

      // Java API
      def get(system: ActorSystem[_]): DatabasePool = apply(system)
    }
- Java
    public static class Id extends ExtensionId<DatabaseConnectionPool> {

      private static final Id instance = new Id();

      private Id() {}

      // called once per ActorSystem
      @Override
      public DatabaseConnectionPool createExtension(ActorSystem<?> system) {
        return new DatabaseConnectionPool(system);
      }

      public static DatabaseConnectionPool get(ActorSystem<?> system) {
        return instance.apply(system);
      }
    }
Finally, to use the extension, look it up:
- Scala
    Behaviors.setup[Any] { ctx =>
      DatabasePool(ctx.system).connection().executeQuery("insert into...")
      initialBehavior
    }
- Java
    Behaviors.setup(
        (context) -> {
          DatabaseConnectionPool.Id.get(context.getSystem())
              .connection()
              .executeQuery("insert into...");
          return initialBehavior();
        });
The DatabaseConnectionPool can be looked up in this way any number of times and it will return the same instance.
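The "created once, same instance on every lookup" behaviour can be modelled with a small JDK-only sketch. This is a simplified illustration of the idea, not how Pekko implements its extension registry; ToySystem and ToyPool are hypothetical stand-ins for ActorSystem and the extension class.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-in for an actor system; NOT Pekko's ActorSystem.
final class ToySystem {
    // One registry per system, keyed by an extension identifier.
    private final ConcurrentHashMap<Object, Object> extensions = new ConcurrentHashMap<>();

    // computeIfAbsent runs the factory at most once per key, even when
    // several threads look up the same extension concurrently.
    @SuppressWarnings("unchecked")
    <T> T extension(Object id, Function<ToySystem, T> create) {
        return (T) extensions.computeIfAbsent(id, key -> create.apply(this));
    }
}

// Hypothetical extension that counts how many times it has been constructed.
final class ToyPool {
    static final AtomicInteger CREATED = new AtomicInteger();

    ToyPool(ToySystem system) {
        CREATED.incrementAndGet();
    }

    // Lookup helper in the spirit of the get(system) methods shown earlier.
    static ToyPool get(ToySystem system) {
        return system.extension(ToyPool.class, ToyPool::new);
    }
}
```

Looking up ToyPool twice on the same ToySystem yields the same object, while a second ToySystem gets its own instance.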
Loading from configuration
To be able to load extensions from your Pekko configuration, you must add the FQCNs of the ExtensionId implementations to the pekko.actor.typed.extensions section of the config you provide to your ActorSystem.
- Scala
    pekko.actor.typed.extensions = ["org.apache.pekko.pekko.extensions.DatabasePool"]
- Java
    pekko.actor.typed {
      extensions = ["jdocs.org.apache.pekko.typed.extensions.ExtensionDocTest$DatabaseConnectionPool$Id"]
    }
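The `$` separators in the Java entry are how the JVM names nested classes: the binary name returned by Class.getName() uses `$`, whereas the dotted form you would write in source code cannot be loaded by name. The JDK-only sketch below shows the difference; Outer, Pool, Id and BinaryNames are hypothetical names, and it is an assumption here that the configured entry is resolved by class name in a comparable way.

```java
// Hypothetical nesting that mirrors an ExtensionId declared as a static inner class.
final class Outer {
    static final class Pool {
        static final class Id {}
    }
}

final class BinaryNames {
    // Loads a class by name, turning the checked exception into an unchecked one.
    static Class<?> load(String fqcn) {
        try {
            return Class.forName(fqcn);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("No class named " + fqcn, e);
        }
    }

    // The binary name, with '$' between nesting levels, is the form that
    // can be passed to Class.forName.
    static String binaryName(Class<?> type) {
        return type.getName();
    }

    // The canonical name uses '.' between nesting levels, as in source code.
    static String sourceName(Class<?> type) {
        return type.getCanonicalName();
    }
}
```

Passing the result of binaryName to load returns the nested class, whereas the dotted sourceName of a nested class makes load throw.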