Extending Apache Pekko
Apache Pekko extensions can be used for almost anything. They provide a way to create an instance of a class only once for the whole ActorSystem and to access it from anywhere. Pekko features such as Cluster, Serialization and Sharding are all Pekko extensions. The example below manages an expensive database connection pool and accesses it from various places in your application.
You can choose to have your Extension loaded on demand or at ActorSystem creation time through the Pekko configuration. Details on how to make that happen are below, in the Loading from configuration section.
Since an extension is a way to hook into Pekko itself, the implementor of the extension must ensure that it is thread-safe and non-blocking.
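As a rough illustration of those two requirements, here is a minimal plain-Java sketch that uses only the JDK. The class SharedQueryRunner and its methods are hypothetical names invented for this example and are not part of Pekko. The sketch assumes that blocking work is pushed onto a separate thread pool so callers get a CompletionStage back immediately, and that shared state is kept in an atomic counter so concurrent callers cannot corrupt it. In a real Pekko application a dedicated dispatcher would typically play the role of the thread pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the state held by an extension; not a Pekko class.
final class SharedQueryRunner {
  // Simulated blocking work runs on its own small pool of daemon threads,
  // so the calling thread is never blocked.
  private final ExecutorService blockingPool =
      Executors.newFixedThreadPool(
          4,
          runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            return thread;
          });

  // Atomic counter: safe to update from many threads without explicit locks.
  private final AtomicLong submitted = new AtomicLong();

  // Returns immediately; the stage is completed later on the blocking pool.
  CompletionStage<String> executeQuery(String query) {
    submitted.incrementAndGet();
    return CompletableFuture.supplyAsync(() -> "result of: " + query, blockingPool);
  }

  long submittedCount() {
    return submitted.get();
  }

  void shutdown() {
    blockingPool.shutdown();
  }
}
```

A caller composes on the returned stage (for example with thenApply) instead of waiting for the result on the calling thread.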
Building an extension
Let’s build an extension to manage a shared database connection pool.
class ExpensiveDatabaseConnection {
  def executeQuery(query: String): Future[Any] = ???
}

public class ExpensiveDatabaseConnection {
  public CompletionStage<Object> executeQuery(String query) {
    throw new RuntimeException("I should do a database query");
  }
  // ...
}
First create an Extension. It will be created only once per ActorSystem:
class DatabasePool(system: ActorSystem[_]) extends Extension {
  // database configuration can be loaded from config
  // from the actor system
  private val _connection = new ExpensiveDatabaseConnection()
  def connection(): ExpensiveDatabaseConnection = _connection
}

public class DatabaseConnectionPool implements Extension {
  private final ExpensiveDatabaseConnection _connection;

  private DatabaseConnectionPool(ActorSystem<?> system) {
    // database configuration can be loaded from config
    // from the actor system
    _connection = new ExpensiveDatabaseConnection();
  }

  public ExpensiveDatabaseConnection connection() {
    return _connection;
  }
}
This is the public API of your extension. Internally, this example instantiates the expensive database connection.
Then create an ExtensionId to identify the extension. A good convention is to let the companion object of the Extension be the ExtensionId.
object DatabasePool extends ExtensionId[DatabasePool] {
  // will only be called once
  def createExtension(system: ActorSystem[_]): DatabasePool = new DatabasePool(system)

  // Java API
  def get(system: ActorSystem[_]): DatabasePool = apply(system)
}

// Static nested class of DatabaseConnectionPool, which is why it can call
// the private constructor and is referenced as DatabaseConnectionPool.Id
public static class Id extends ExtensionId<DatabaseConnectionPool> {
  private static final Id instance = new Id();

  private Id() {}

  // called once per ActorSystem
  @Override
  public DatabaseConnectionPool createExtension(ActorSystem<?> system) {
    return new DatabaseConnectionPool(system);
  }

  public static DatabaseConnectionPool get(ActorSystem<?> system) {
    return instance.apply(system);
  }
}
Finally, look up the extension wherever you need to use it:
Behaviors.setup[Any] { ctx =>
  DatabasePool(ctx.system).connection().executeQuery("insert into...")
  initialBehavior
}

Behaviors.setup(
    (context) -> {
      DatabaseConnectionPool.Id.get(context.getSystem())
          .connection()
          .executeQuery("insert into...");
      return initialBehavior();
    });
The DatabaseConnectionPool can be looked up in this way any number of times and it will return the same instance.
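The "same instance on every lookup" behaviour can be modelled in plain Java. The sketch below is only an illustration of the semantics, not Pekko's actual implementation: ExtensionRegistry and lookup are hypothetical names, and one registry object stands in for one ActorSystem. It assumes that an atomic create-on-first-use map is enough to capture the idea.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical model of a per-system extension registry; not Pekko's implementation.
final class ExtensionRegistry {
  private final ConcurrentMap<Object, Object> instances = new ConcurrentHashMap<>();

  // Creates the instance on the first lookup for this id;
  // every later lookup with the same id returns the stored instance.
  @SuppressWarnings("unchecked")
  <T> T lookup(Object id, Supplier<T> create) {
    // computeIfAbsent runs the factory at most once per key, even under contention.
    return (T) instances.computeIfAbsent(id, key -> create.get());
  }
}
```

In this model, two lookups with the same id on one registry run the factory only once, while a second registry (a second system) would create its own separate instance.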
Loading from configuration
To load extensions from your Pekko configuration, add the FQCNs (fully qualified class names) of the ExtensionId implementations to the pekko.actor.typed.extensions section of the config you provide to your ActorSystem.
pekko.actor.typed.extensions = ["org.apache.pekko.pekko.extensions.DatabasePool"]