Offset in a relational DB with JDBC
The JdbcProjection has support for storing the offset in a relational database using JDBC.
The source of the envelopes can be events from Apache Pekko Persistence or any other SourceProvider with supported offset types.
A JdbcHandler receives a JdbcSession instance and an envelope. The JdbcSession provides the means to access an open JDBC connection that can be used to process the envelope. The target database operations can be run in the same transaction as the storage of the offset, which means that exactly-once processing semantics is supported. It also offers at-least-once semantics.
Dependencies¶
To use the JDBC module of Apache Pekko Projections add the following dependency in your project:
libraryDependencies += "org.apache.pekko" %% "pekko-projection-jdbc" % "1.1.0"
<properties>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-projection-jdbc_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
def versions = [
ScalaBinary: "2.13"
]
dependencies {
implementation "org.apache.pekko:pekko-projection-jdbc_${versions.ScalaBinary}:1.1.0"
}
Apache Pekko Projections requires Pekko 1.1.3 or later; see Pekko version.
Project Info: Apache Pekko Projections JDBC | |
---|---|
Artifact | org.apache.pekko : pekko-projection-jdbc : 1.1.0 |
JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17 |
Scala versions | 2.13.16, 2.12.20, 3.3.5 |
JPMS module name | pekko.projection.jdbc |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-projection |
Transitive dependencies¶
The table below shows pekko-projection-jdbc’s direct dependencies, and the second tab shows all libraries it depends on transitively.
Required configuration settings¶
There are two settings that need to be set beforehand in your application.conf file.
- pekko.projection.jdbc.dialect - The dialect type indicating your database of choice. Supported dialects are: mysql-dialect, postgres-dialect, mssql-dialect, oracle-dialect or h2-dialect (testing).
- pekko.projection.jdbc.blocking-jdbc-dispatcher.thread-pool-executor.fixed-pool-size - The size of the blocking JDBC dispatcher. See also Blocking JDBC Dispatcher.
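As a minimal sketch, the two required settings could look like this in application.conf. The choice of postgres-dialect and the pool size of 10 are illustrative assumptions only; use the dialect of your database and the size of your own connection pool.

```hocon
pekko.projection.jdbc {
  # assumption: a PostgreSQL database is used
  dialect = "postgres-dialect"
  # assumption: the JDBC connection pool also holds 10 connections
  blocking-jdbc-dispatcher.thread-pool-executor.fixed-pool-size = 10
}
```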
Defining a JdbcSession¶
Before using Apache Pekko Projections JDBC you must implement the JdbcSession interface. JdbcSession is used to open a connection and start a transaction. A new JdbcSession will be created for each call to the handler. At the end of the processing, the transaction will be committed (or rolled back).
When using JdbcProjection.exactlyOnce, the JdbcSession that is passed to the handler will be used to save the offset behind the scenes. Therefore, it’s extremely important to disable auto-commit (e.g. setAutoCommit(false)), otherwise the two operations won’t participate in the same transaction.
import java.sql.Connection
import java.sql.DriverManager
import org.apache.pekko.japi.function
import org.apache.pekko.projection.jdbc.JdbcSession
class PlainJdbcSession extends JdbcSession {
lazy val conn = {
Class.forName("org.h2.Driver")
val c = DriverManager.getConnection("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1")
c.setAutoCommit(false)
c
}
override def withConnection[Result](func: function.Function[Connection, Result]): Result =
func(conn)
override def commit(): Unit = conn.commit()
override def rollback(): Unit = conn.rollback()
override def close(): Unit = conn.close()
}
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.projection.jdbc.JdbcSession;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Connection;
class PlainJdbcSession implements JdbcSession {
private final Connection connection;
public PlainJdbcSession() {
try {
Class.forName("org.h2.Driver");
this.connection = DriverManager.getConnection("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1");
connection.setAutoCommit(false);
} catch (ClassNotFoundException | SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public <Result> Result withConnection(Function<Connection, Result> func) throws Exception {
return func.apply(connection);
}
@Override
public void commit() throws SQLException {
connection.commit();
}
@Override
public void rollback() throws SQLException {
connection.rollback();
}
@Override
public void close() throws SQLException {
connection.close();
}
}
It’s highly recommended to configure it with a connection pool, for example HikariCP.
When declaring a JdbcProjection you must provide a factory for the JdbcSession. The factory will be used to create new instances whenever needed.
An alternative Hibernate based implementation would look like this:
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.projection.jdbc.JdbcSession;
import org.hibernate.Session;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import java.sql.Connection;
import java.sql.SQLException;
public class HibernateJdbcSession implements JdbcSession {
public final EntityManager entityManager;
private final EntityTransaction transaction;
public HibernateJdbcSession(EntityManager entityManager) {
this.entityManager = entityManager;
this.transaction = this.entityManager.getTransaction();
this.transaction.begin();
}
@Override
public <Result> Result withConnection(Function<Connection, Result> func) {
Session hibernateSession = entityManager.unwrap(Session.class);
return hibernateSession.doReturningWork(
connection -> {
try {
return func.apply(connection);
} catch (SQLException e) {
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
});
}
@Override
public void commit() {
transaction.commit();
}
@Override
public void rollback() {
// propagates rollback call if transaction is active
if (transaction.isActive()) transaction.rollback();
}
@Override
public void close() {
this.entityManager.close();
}
}
And a special factory that initializes the EntityManagerFactory and builds the JdbcSession instance:
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
public class HibernateSessionFactory {
private final EntityManagerFactory entityManagerFactory;
public HibernateSessionFactory() {
this.entityManagerFactory =
Persistence.createEntityManagerFactory("pekko-projection-hibernate");
}
public HibernateJdbcSession newInstance() {
return new HibernateJdbcSession(entityManagerFactory.createEntityManager());
}
}
Blocking JDBC Dispatcher¶
JDBC APIs are blocking by design, therefore Apache Pekko Projections JDBC will use a dedicated dispatcher to run all JDBC calls. It’s important to configure the dispatcher to have the same size as the connection pool.
Each time the projection handler is called, one thread and one database connection will be used. If your connection pool is smaller than the number of threads, a thread can potentially block while waiting for the connection pool to provide a connection.
The dispatcher pool size can be configured through the pekko.projection.jdbc.blocking-jdbc-dispatcher.thread-pool-executor.fixed-pool-size setting. See the Configuration section below.
Most applications will use database connections to read data, for instance to read a projected model upon user request. This means that other parts of the application will be competing for a connection. It’s recommended to configure a connection pool dedicated to the projections and use a different one in other parts of the application.
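The effect of a connection pool that is smaller than the dispatcher can be illustrated with a small, self-contained simulation. This is only a sketch under stated assumptions: a Semaphore stands in for the connection pool, a fixed thread pool stands in for the blocking JDBC dispatcher, and the class and method names (PoolSizingSketch, peakConnectionsInUse) are hypothetical and not part of Apache Pekko Projections.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in: a Semaphore plays the role of the connection pool
// and a fixed thread pool plays the role of the blocking JDBC dispatcher.
class PoolSizingSketch {

    // Runs `tasks` simulated handler calls and returns the peak number of
    // "connections" that were in use at the same time.
    static int peakConnectionsInUse(int threads, int connections, int tasks) {
        ExecutorService dispatcher = Executors.newFixedThreadPool(threads);
        Semaphore pool = new Semaphore(connections);
        AtomicInteger inUse = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            dispatcher.execute(() -> {
                try {
                    pool.acquire(); // the thread blocks here when the pool is exhausted
                    try {
                        peak.accumulateAndGet(inUse.incrementAndGet(), Math::max);
                        Thread.sleep(5); // simulated blocking JDBC call
                    } finally {
                        inUse.decrementAndGet();
                        pool.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        dispatcher.shutdown();
        try {
            dispatcher.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }
}
```

Calling PoolSizingSketch.peakConnectionsInUse(4, 2, 12) never returns more than 2: with four dispatcher threads and two connections, the extra threads can only wait, which is why matching the two sizes is recommended.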
exactly-once¶
The offset is stored in the same transaction used for the user-defined handler, which means exactly-once processing semantics if the projection is restarted from a previously stored offset.
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.jdbc.scaladsl.JdbcProjection
val projection =
JdbcProjection
.exactlyOnce(
projectionId = ProjectionId("ShoppingCarts", "carts-1"),
sourceProvider,
() => new PlainJdbcSession, // JdbcSession Factory
handler = () => new ShoppingCartHandler(orderRepository))
final HibernateSessionFactory sessionProvider = new HibernateSessionFactory();
Projection<EventEnvelope<ShoppingCart.Event>> projection =
JdbcProjection.exactlyOnce(
ProjectionId.of("shopping-carts", "carts-1"),
sourceProvider,
sessionProvider::newInstance,
ShoppingCartHandler::new,
system);
The ShoppingCartHandler is shown below.
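To see why storing the offset in the handler's transaction gives exactly-once semantics, the following self-contained sketch models a transaction in memory. It is not the JdbcProjection API: ExactlyOnceSketch and its fields are hypothetical names, and a staged copy of a list stands in for an uncommitted database transaction.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for one database transaction per envelope: the handler's
// row and the offset become visible together on commit, or not at all.
class ExactlyOnceSketch {
    // Committed state, standing in for the target table and the offset table.
    final List<String> committedRows = new ArrayList<>();
    long committedOffset = 0;

    // Processes one envelope. If the handler fails, nothing is committed,
    // so neither the row nor the offset changes.
    void process(long offset, String event, boolean failInHandler) {
        List<String> stagedRows = new ArrayList<>(committedRows);
        try {
            stagedRows.add(event); // handler writes its row inside the transaction
            if (failInHandler) {
                throw new IllegalStateException("handler failed");
            }
            // commit: both changes are applied in one step
            committedRows.clear();
            committedRows.addAll(stagedRows);
            committedOffset = offset;
        } catch (IllegalStateException e) {
            // rollback: the staged rows are discarded and the offset is unchanged
        }
    }
}
```

After a failed call the committed offset still points at the last successfully processed envelope, so a restart processes the failed envelope again without having left a partial write behind.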
at-least-once¶
The offset is stored after the envelope has been processed, giving at-least-once processing semantics. This means that if the projection is restarted from a previously stored offset, some elements may be processed more than once. Therefore, the Handler code must be idempotent.
import scala.concurrent.duration._
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.jdbc.scaladsl.JdbcProjection
val projection =
JdbcProjection
.atLeastOnce(
projectionId = ProjectionId("ShoppingCarts", "carts-1"),
sourceProvider,
() => new PlainJdbcSession, // JdbcSession Factory
handler = () => new ShoppingCartHandler(orderRepository))
.withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
final HibernateSessionFactory sessionProvider = new HibernateSessionFactory();
int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);
Projection<EventEnvelope<ShoppingCart.Event>> projection =
JdbcProjection.atLeastOnce(
ProjectionId.of("shopping-carts", "carts-1"),
sourceProvider,
sessionProvider::newInstance,
ShoppingCartHandler::new,
system)
.withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
The offset is stored after a time window, or after a number of envelopes, whichever happens first. This window can be defined with withSaveOffset of the returned AtLeastOnceProjection. The default settings for the window are defined in the configuration section pekko.projection.at-least-once. There is a performance benefit in not storing the offset too often, but the drawback is that more envelopes may be processed again as duplicates when the projection is restarted.
The ShoppingCartHandler is shown below.
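The trade-off between the save window and duplicates can be sketched with a small simulation. It is a simplified model under assumptions, not the projection implementation: only the envelope-count part of the window is modelled, the crash is simulated by returning early, and AtLeastOnceSketch with its fields is a hypothetical name. The write is a keyed upsert, which is one common way to make a handler idempotent.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of at-least-once processing with a count-based save window.
class AtLeastOnceSketch {
    final Map<Long, String> table = new HashMap<>(); // keyed upsert = idempotent write
    long savedOffset = 0; // last offset persisted to the offset store
    int handlerCalls = 0; // how often the handler was invoked in total

    // Processes envelopes savedOffset+1..upTo and saves the offset after every
    // `saveAfter` envelopes. A crash at `crashAt` loses the unsaved progress;
    // pass -1 for a run without a crash.
    void run(long upTo, int saveAfter, long crashAt) {
        int sinceSave = 0;
        for (long offset = savedOffset + 1; offset <= upTo; offset++) {
            table.put(offset, "event-" + offset);
            handlerCalls++;
            sinceSave++;
            if (offset == crashAt) {
                return; // simulated crash before the next offset save
            }
            if (sinceSave == saveAfter) {
                savedOffset = offset;
                sinceSave = 0;
            }
        }
    }
}
```

With run(10, 4, 6) followed by run(10, 4, -1), envelopes 5 and 6 are handled twice (12 handler calls for 10 envelopes), yet the table ends up with exactly 10 rows because the write is idempotent. A larger saveAfter would reduce offset writes and increase the number of replayed envelopes.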
groupedWithin¶
The envelopes can be grouped before processing, which can be useful for batch updates.
import scala.concurrent.duration._
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.jdbc.scaladsl.JdbcProjection
val projection =
JdbcProjection
.groupedWithin(
projectionId = ProjectionId("ShoppingCarts", "carts-1"),
sourceProvider,
() => new PlainJdbcSession, // JdbcSession Factory
handler = () => new GroupedShoppingCartHandler(orderRepository))
.withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
final HibernateSessionFactory sessionProvider = new HibernateSessionFactory();
int groupAfterEnvelopes = 20;
Duration groupAfterDuration = Duration.ofMillis(500);
Projection<EventEnvelope<ShoppingCart.Event>> projection =
JdbcProjection.groupedWithin(
ProjectionId.of("shopping-carts", "carts-1"),
sourceProvider,
sessionProvider::newInstance,
GroupedShoppingCartHandler::new,
system)
.withGroup(groupAfterEnvelopes, groupAfterDuration);
The envelopes are grouped within a time window, or limited by a number of envelopes, whichever happens first. This window can be defined with withGroup of the returned GroupedProjection. The default settings for the window are defined in the configuration section pekko.projection.grouped.
When using groupedWithin the handler is a JdbcHandler<List<EventEnvelope<ShoppingCart.Event>>>. The GroupedShoppingCartHandler is shown below.
The offset is stored in the same transaction used for the user-defined handler, which means exactly-once processing semantics if the projection is restarted from a previously stored offset.
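The "count or time window, whichever happens first" rule can be sketched as a pure function over arrival timestamps. This is a simplified model and an assumption about behaviour, not the stream operator used by the projection: the window here is measured from the first element of each group, and GroupedWithinSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

class GroupedWithinSketch {
    // Splits ascending arrival timestamps (in milliseconds) into groups. A group
    // closes once it holds `maxSize` elements, or when an element arrives
    // `windowMillis` or more after the group's first element.
    static List<List<Long>> group(List<Long> arrivals, int maxSize, long windowMillis) {
        List<List<Long>> groups = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long t : arrivals) {
            boolean windowElapsed = !current.isEmpty() && t - current.get(0) >= windowMillis;
            if (windowElapsed) {
                groups.add(current); // time limit reached first
                current = new ArrayList<>();
            }
            current.add(t);
            if (current.size() == maxSize) {
                groups.add(current); // size limit reached first
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            groups.add(current); // flush the remaining partial group
        }
        return groups;
    }
}
```

For arrivals at 0, 10, 20, 30, 600 and 610 ms with maxSize 3 and a 500 ms window, the result is three groups: [0, 10, 20] closed by size, [30] closed by time, and [600, 610] flushed at the end. Each such group would be handed to the grouped handler in one call.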
Handler¶
It’s in the JdbcHandler that you implement the processing of each envelope. It’s essentially a consumer function from (JdbcSession, Envelope) to void.
A handler that is consuming ShoppingCart.Event from eventsByTag can look like this:
import org.apache.pekko.projection.jdbc.scaladsl.JdbcHandler
class ShoppingCartHandler(repository: OrderRepository)
extends JdbcHandler[EventEnvelope[ShoppingCart.Event], PlainJdbcSession] {
private val logger = LoggerFactory.getLogger(getClass)
override def process(session: PlainJdbcSession, envelope: EventEnvelope[ShoppingCart.Event]): Unit = {
envelope.event match {
case ShoppingCart.CheckedOut(cartId, time) =>
logger.info(s"Shopping cart $cartId was checked out at $time")
session.withConnection { conn =>
repository.save(conn, Order(cartId, time))
}
case otherEvent =>
logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent")
}
}
}
public class ShoppingCartHandler
extends JdbcHandler<EventEnvelope<ShoppingCart.Event>, HibernateJdbcSession> {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void process(HibernateJdbcSession session, EventEnvelope<ShoppingCart.Event> envelope)
throws Exception {
ShoppingCart.Event event = envelope.event();
if (event instanceof ShoppingCart.CheckedOut) {
ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
logger.info(
"Shopping cart {} was checked out at {}", checkedOut.cartId, checkedOut.eventTime);
// pass the EntityManager created by the projection
// to the repository in order to use the same transaction
orderRepository.save(
session.entityManager, new Order(checkedOut.cartId, checkedOut.eventTime));
} else {
logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
}
}
}
Such simple handlers can also be defined as plain functions via the helper JdbcHandler.fromFunction factory method.
The OrderRepository used by the handler is an implementation of:
case class Order(id: String, time: Instant)
trait OrderRepository {
def save(connection: Connection, order: Order): Unit
}
class Order {
public final String id;
public final Instant time;
public Order(String id, Instant time) {
this.id = id;
this.time = time;
}
}
interface OrderRepository {
void save(EntityManager entityManager, Order order);
}
Grouped handler¶
When using JdbcProjection.groupedWithin the handler is processing a List of envelopes.
import org.apache.pekko.projection.jdbc.scaladsl.JdbcHandler
import scala.collection.immutable
class GroupedShoppingCartHandler(repository: OrderRepository)
extends JdbcHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]], PlainJdbcSession] {
private val logger = LoggerFactory.getLogger(getClass)
override def process(
session: PlainJdbcSession,
envelopes: immutable.Seq[EventEnvelope[ShoppingCart.Event]]): Unit = {
// save all events in DB
envelopes.map(_.event).foreach {
case ShoppingCart.CheckedOut(cartId, time) =>
logger.info(s"Shopping cart $cartId was checked out at $time")
session.withConnection { conn =>
repository.save(conn, Order(cartId, time))
}
case otherEvent =>
logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent")
}
}
}
public class GroupedShoppingCartHandler
extends JdbcHandler<List<EventEnvelope<ShoppingCart.Event>>, HibernateJdbcSession> {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void process(
HibernateJdbcSession session, List<EventEnvelope<ShoppingCart.Event>> envelopes)
throws Exception {
for (EventEnvelope<ShoppingCart.Event> envelope : envelopes) {
ShoppingCart.Event event = envelope.event();
if (event instanceof ShoppingCart.CheckedOut) {
ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
logger.info(
"Shopping cart {} was checked out at {}", checkedOut.cartId, checkedOut.eventTime);
// pass the EntityManager created by the projection
// to the repository in order to use the same transaction
orderRepository.save(
session.entityManager, new Order(checkedOut.cartId, checkedOut.eventTime));
} else {
logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
}
}
}
}
Stateful handler¶
The JdbcHandler can be stateful, with variables and mutable data structures. It is invoked by the Projection machinery one envelope at a time and visibility guarantees between the invocations are handled automatically, i.e. no volatile or other concurrency primitives are needed for managing the state as long as it’s not accessed by other threads than the one that called process.
It is important that the Handler instance is not shared between several Projection instances, because then it would be invoked concurrently, which is not how it is intended to be used. Each Projection instance should use a new Handler instance.
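The "one new handler per projection" rule can be sketched with a plain factory. The class below is a hypothetical stand-in that does not extend the real JdbcHandler; it only shows mutable state held in an ordinary field and a factory that hands out a fresh instance on every call, in the same spirit as the handler factories passed to JdbcProjection above.

```java
import java.util.function.Supplier;

// Hypothetical handler-like class with mutable state in a plain field.
class CountingHandlerSketch {
    private int processed = 0; // only touched from process(), so no volatile is used

    void process(String envelope) {
        processed++;
    }

    int processed() {
        return processed;
    }

    // Factory that returns a fresh instance on every call, so two projections
    // never share the same mutable state.
    static Supplier<CountingHandlerSketch> factory() {
        return CountingHandlerSketch::new;
    }
}
```

Two handlers obtained from factory() keep independent counts. Passing one shared instance to several projections instead would let them update the same field concurrently, which is the situation the text above warns against.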
Async handler¶
The Handler can be used with JdbcProjection.atLeastOnceAsync and JdbcProjection.groupedWithinAsync if the handler is not storing the projection result in the database. The handler could send to a Kafka topic or integrate with something else.
There are several examples of such Handler in the documentation for Cassandra Projections. The same type of handlers can be used with JdbcProjection instead of CassandraProjection.
Actor handler¶
A good alternative for advanced state management is to implement the handler as an actor, which is described in Processing with Actor.
Flow handler¶
An Apache Pekko Streams FlowWithContext can be used instead of a handler for processing the envelopes, which is described in Processing with Apache Pekko Streams.
Handler lifecycle¶
You can override the start and stop methods of the JdbcHandler to implement initialization before the first envelope is processed and resource cleanup when the projection is stopped. Those methods are also called when the Projection is restarted after failure.
See also error handling.
Schema¶
The database schema for the offset storage table:
CREATE TABLE IF NOT EXISTS pekko_projection_offset_store (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
current_offset VARCHAR(255) NOT NULL,
manifest VARCHAR(4) NOT NULL,
mergeable BOOLEAN NOT NULL,
last_updated BIGINT NOT NULL,
PRIMARY KEY(projection_name, projection_key)
);
CREATE INDEX IF NOT EXISTS projection_name_index ON pekko_projection_offset_store (projection_name);
CREATE TABLE IF NOT EXISTS pekko_projection_management (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
paused BOOLEAN NOT NULL,
last_updated BIGINT NOT NULL,
PRIMARY KEY(projection_name, projection_key)
);
CREATE TABLE IF NOT EXISTS pekko_projection_offset_store (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
current_offset VARCHAR(255) NOT NULL,
manifest VARCHAR(4) NOT NULL,
mergeable BOOLEAN NOT NULL,
last_updated BIGINT NOT NULL,
PRIMARY KEY(projection_name, projection_key)
);
CREATE INDEX projection_name_index ON pekko_projection_offset_store (projection_name);
CREATE TABLE IF NOT EXISTS pekko_projection_management (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
paused BOOLEAN NOT NULL,
last_updated BIGINT NOT NULL,
PRIMARY KEY(projection_name, projection_key)
);
IF NOT EXISTS (SELECT 1 FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'pekko_projection_offset_store') AND TYPE IN (N'U'))
BEGIN
CREATE TABLE pekko_projection_offset_store (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
current_offset VARCHAR(255) NOT NULL,
manifest VARCHAR(4) NOT NULL,
mergeable BIT NOT NULL,
last_updated BIGINT NOT NULL
)
ALTER TABLE pekko_projection_offset_store ADD CONSTRAINT pk_projection_id PRIMARY KEY(projection_name, projection_key)
CREATE INDEX projection_name_index ON pekko_projection_offset_store (projection_name)
END
IF NOT EXISTS (SELECT 1 FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'pekko_projection_management') AND TYPE IN (N'U'))
BEGIN
CREATE TABLE pekko_projection_management (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
paused BIT NOT NULL,
last_updated BIGINT NOT NULL
)
ALTER TABLE pekko_projection_management ADD CONSTRAINT pk_projection_management_id PRIMARY KEY(projection_name, projection_key)
END
BEGIN
execute immediate 'create table "PEKKO_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME" VARCHAR2(255) NOT NULL,"PROJECTION_KEY" VARCHAR2(255) NOT NULL,"CURRENT_OFFSET" VARCHAR2(255) NOT NULL,"MANIFEST" VARCHAR2(4) NOT NULL,"MERGEABLE" CHAR(1) NOT NULL check ("MERGEABLE" in (0, 1)),"LAST_UPDATED" NUMBER(19) NOT NULL) ';
execute immediate 'alter table "PEKKO_PROJECTION_OFFSET_STORE" add constraint "PK_PROJECTION_ID" primary key("PROJECTION_NAME","PROJECTION_KEY") ';
execute immediate 'create index "PROJECTION_NAME_INDEX" on "PEKKO_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME") ';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE = -955 THEN
NULL; -- suppresses ORA-00955 exception
ELSE
RAISE;
END IF;
END;
BEGIN
execute immediate 'create table "PEKKO_PROJECTION_MANAGEMENT" ("PROJECTION_NAME" VARCHAR2(255) NOT NULL,"PROJECTION_KEY" VARCHAR2(255) NOT NULL,"PAUSED" CHAR(1) NOT NULL check ("PAUSED" in (0, 1)),"LAST_UPDATED" NUMBER(19) NOT NULL) ';
execute immediate 'alter table "PEKKO_PROJECTION_MANAGEMENT" add constraint "PK_PROJECTION_MANAGEMENT_ID" primary key("PROJECTION_NAME","PROJECTION_KEY") ';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE = -955 THEN
NULL; -- suppresses ORA-00955 exception
ELSE
RAISE;
END IF;
END;
CREATE TABLE IF NOT EXISTS "pekko_projection_offset_store" (
"projection_name" VARCHAR(255) NOT NULL,
"projection_key" VARCHAR(255) NOT NULL,
"current_offset" VARCHAR(255) NOT NULL,
"manifest" VARCHAR(4) NOT NULL,
"mergeable" BOOLEAN NOT NULL,
"last_updated" BIGINT NOT NULL,
PRIMARY KEY("projection_name", "projection_key")
);
CREATE INDEX IF NOT EXISTS "projection_name_index" ON "pekko_projection_offset_store" ("projection_name");
CREATE TABLE IF NOT EXISTS "pekko_projection_management" (
"projection_name" VARCHAR(255) NOT NULL,
"projection_key" VARCHAR(255) NOT NULL,
"paused" BOOLEAN NOT NULL,
"last_updated" BIGINT NOT NULL,
PRIMARY KEY("projection_name", "projection_key")
);
The schema can be created and dropped using the methods JdbcProjection.createTablesIfNotExists and JdbcProjection.dropTablesIfExists. This is particularly useful when writing tests. For production environments, we recommend creating the schema before deploying the application.
As of version 1.1.0, the schema for PostgreSQL and H2 databases has changed. It now defaults to lowercase table and column names. If you have a schema in production, we recommend applying an ALTER TABLE script to change it accordingly.
Alternatively, you can fall back to the uppercase format by setting use-lowercase-schema to false. You will also need to set pekko.projection.jdbc.offset-store.table to an uppercase value, as this setting now defaults to lowercase.
pekko.projection.jdbc.offset-store {
table = "PEKKO_PROJECTION_OFFSET_STORE"
use-lowercase-schema = false
}
Offset types¶
The supported offset types of the JdbcProjection are:
- Offset types from events from Apache Pekko Persistence
- MergeableOffset that is used for messages from Kafka
- String
- Int
- Long
- Any other type that has a configured Pekko Serializer is stored with base64 encoding of the serialized bytes.
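The base64 step for custom offset types can be sketched with the JDK alone. The sketch assumes the offset has already been turned into bytes by some serializer; here the UTF-8 bytes of a string merely stand in for that output, and OffsetEncodingSketch is a hypothetical name rather than part of the library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class OffsetEncodingSketch {
    // Encodes already-serialized offset bytes into the text form that would be
    // written to a VARCHAR column.
    static String encode(byte[] serialized) {
        return Base64.getEncoder().encodeToString(serialized);
    }

    // Decodes the stored text back into the serialized bytes.
    static byte[] decode(String stored) {
        return Base64.getDecoder().decode(stored);
    }

    public static void main(String[] args) {
        // Stand-in for serializer output: the UTF-8 bytes of "hello".
        byte[] bytes = "hello".getBytes(StandardCharsets.UTF_8);
        String stored = encode(bytes);
        System.out.println(stored); // prints aGVsbG8=
        System.out.println(new String(decode(stored), StandardCharsets.UTF_8)); // prints hello
    }
}
```

Because the encoded text is what ends up in the current_offset column, a custom offset type should serialize to a compact form.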
Configuration¶
Make your edits/overrides in your application.conf.
The reference configuration file with the default values:
pekko.projection.jdbc {
# choose one of: mysql-dialect, postgres-dialect, mssql-dialect, oracle-dialect or h2-dialect (testing)
dialect = ""
use-dispatcher = "pekko.projection.jdbc.blocking-jdbc-dispatcher"
blocking-jdbc-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
# Use same number of threads as connections in the JDBC connection pool.
fixed-pool-size = ""
}
throughput = 1
}
offset-store {
# set this to your database schema if applicable, empty by default
schema = ""
# the database table name for the offset store
table = "pekko_projection_offset_store"
# the database table name for the projection management data
management-table = "pekko_projection_management"
# Use lowercase table and column names.
# This is mostly useful for H2 and Postgres databases. MySQL and SQL Server are case insensitive.
# Oracle schema is case sensitive and is defined with uppercase, this property is therefore ignored when using Oracle
use-lowercase-schema = true
}
debug.verbose-offset-store-logging = false
}
The settings pekko.projection.jdbc.dialect and pekko.projection.jdbc.blocking-jdbc-dispatcher.thread-pool-executor.fixed-pool-size do not have a valid default value. You must configure them in your application.conf file.
See the Required configuration settings and Blocking JDBC Dispatcher sections for details.