Offset in a relational DB with JDBC
The JdbcProjection has support for storing the offset in a relational database using JDBC. The source of the envelopes can be events from Apache Pekko Persistence or any other SourceProvider with supported offset types.
A JdbcHandler receives a JdbcSession instance and an envelope. The JdbcSession provides access to an open JDBC connection that can be used to process the envelope. The target database operations can be run in the same transaction as the storage of the offset, which means that exactly-once processing semantics are supported. At-least-once semantics are also supported.
Dependencies
To use the JDBC module of Apache Pekko Projections, add the following dependency to your project:
- sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-jdbc" % "1.0.0"
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-projection-jdbc_${scala.binary.version}</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-projection-jdbc_${versions.ScalaBinary}:1.0.0"
}
Apache Pekko Projections require Pekko 1.0.2 or later, see Pekko version.
Project Info: Apache Pekko Projections JDBC | |
---|---|
Artifact | org.apache.pekko:pekko-projection-jdbc:1.0.0 |
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 |
Scala versions | 3.3.3, 2.13.13, 2.12.19 |
JPMS module name | pekko.projection.jdbc |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-projection |
Transitive dependencies
The table below shows pekko-projection-jdbc’s direct dependencies, and the second tab shows all libraries it depends on transitively.
Required configuration settings
There are two settings that need to be set beforehand in your application.conf file.
- pekko.projection.jdbc.dialect: the dialect type indicating your database of choice. Supported dialects are: mysql-dialect, postgres-dialect, mssql-dialect, oracle-dialect or h2-dialect (testing).
- pekko.projection.jdbc.blocking-jdbc-dispatcher.thread-pool-executor.fixed-pool-size: the size of the blocking JDBC dispatcher. See also Blocking JDBC Dispatcher.
Defining a JdbcSession
Before using Apache Pekko Projections JDBC you must implement the JdbcSession interface. JdbcSession is used to open a connection and start a transaction. A new JdbcSession will be created for each call to the handler. At the end of the processing, the transaction will be committed (or rolled back).
When using JdbcProjection.exactlyOnce, the JdbcSession that is passed to the handler will also be used to save the offset behind the scenes. Therefore, it’s extremely important to disable auto-commit (e.g. setAutoCommit(false)), otherwise the two operations won’t participate in the same transaction.
- Scala
import java.sql.Connection
import java.sql.DriverManager

import org.apache.pekko.japi.function
import org.apache.pekko.projection.jdbc.JdbcSession

class PlainJdbcSession extends JdbcSession {

  lazy val conn = {
    Class.forName("org.h2.Driver")
    val c = DriverManager.getConnection("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1")
    // auto-commit must be off so handler writes and the offset share one transaction
    c.setAutoCommit(false)
    c
  }

  override def withConnection[Result](func: function.Function[Connection, Result]): Result =
    func(conn)

  override def commit(): Unit = conn.commit()

  override def rollback(): Unit = conn.rollback()

  override def close(): Unit = conn.close()
}
- Java
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.projection.jdbc.JdbcSession;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

class PlainJdbcSession implements JdbcSession {

  private final Connection connection;

  public PlainJdbcSession() {
    try {
      Class.forName("org.h2.Driver");
      this.connection = DriverManager.getConnection("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1");
      // auto-commit must be off so handler writes and the offset share one transaction
      connection.setAutoCommit(false);
    } catch (ClassNotFoundException | SQLException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public <Result> Result withConnection(Function<Connection, Result> func) throws Exception {
    return func.apply(connection);
  }

  @Override
  public void commit() throws SQLException {
    connection.commit();
  }

  @Override
  public void rollback() throws SQLException {
    connection.rollback();
  }

  @Override
  public void close() throws SQLException {
    connection.close();
  }
}
It’s highly recommended to configure it with a connection pool, for example HikariCP.
When declaring a JdbcProjection you must provide a factory for the JdbcSession. The factory will be used to create new instances whenever needed.
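The session lifecycle described above (a new session from the factory for each handler call, then commit or rollback, then close) can be modeled without a database. This is a hypothetical, dependency-free sketch: `SketchSession`, `RecordingSession` and `SessionLifecycle.runInSession` are illustrative names, not the Pekko `JdbcSession` API, and the exact order in which Pekko calls these methods is an internal detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, simplified mirror of the session contract (not the Pekko interface).
interface SketchSession {
  void commit();
  void rollback();
  void close();
}

// A session that only records which lifecycle methods were called.
class RecordingSession implements SketchSession {
  final List<String> calls = new ArrayList<>();
  public void commit() { calls.add("commit"); }
  public void rollback() { calls.add("rollback"); }
  public void close() { calls.add("close"); }
}

final class SessionLifecycle {
  private SessionLifecycle() {}

  // Creates a fresh session from the factory for a single handler call,
  // commits on success, rolls back on failure, and always closes it.
  static <S extends SketchSession> boolean runInSession(Supplier<S> factory, Consumer<S> work) {
    S session = factory.get();
    try {
      work.accept(session);
      session.commit();
      return true;
    } catch (RuntimeException e) {
      session.rollback();
      return false;
    } finally {
      session.close();
    }
  }
}
```

Because the factory is called once per handler call, no state leaks from a failed (rolled back) call into the next one.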
An alternative Hibernate based implementation would look like this:
- Java
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.projection.jdbc.JdbcSession;
import org.hibernate.Session;

import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import java.sql.Connection;
import java.sql.SQLException;

public class HibernateJdbcSession implements JdbcSession {

  public final EntityManager entityManager;
  private final EntityTransaction transaction;

  public HibernateJdbcSession(EntityManager entityManager) {
    this.entityManager = entityManager;
    this.transaction = this.entityManager.getTransaction();
    this.transaction.begin();
  }

  @Override
  public <Result> Result withConnection(Function<Connection, Result> func) {
    Session hibernateSession = entityManager.unwrap(Session.class);
    return hibernateSession.doReturningWork(
        connection -> {
          try {
            return func.apply(connection);
          } catch (SQLException e) {
            throw e;
          } catch (Exception e) {
            throw new SQLException(e);
          }
        });
  }

  @Override
  public void commit() {
    transaction.commit();
  }

  @Override
  public void rollback() {
    // propagates rollback call if transaction is active
    if (transaction.isActive()) transaction.rollback();
  }

  @Override
  public void close() {
    this.entityManager.close();
  }
}
And a special factory that initializes the EntityManagerFactory and builds the JdbcSession instance:
- Java
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

public class HibernateSessionFactory {
  private final EntityManagerFactory entityManagerFactory;

  public HibernateSessionFactory() {
    this.entityManagerFactory =
        Persistence.createEntityManagerFactory("pekko-projection-hibernate");
  }

  public HibernateJdbcSession newInstance() {
    return new HibernateJdbcSession(entityManagerFactory.createEntityManager());
  }
}
Blocking JDBC Dispatcher
JDBC APIs are blocking by design, therefore Apache Pekko Projections JDBC will use a dedicated dispatcher to run all JDBC calls. It’s important to configure the dispatcher to have the same size as the connection pool.
Each time the projection handler is called, one thread and one database connection are used. If your connection pool is smaller than the number of threads, a thread can block while waiting for the connection pool to provide a connection.
The dispatcher pool size can be configured through the pekko.projection.jdbc.blocking-jdbc-dispatcher.thread-pool-executor.fixed-pool-size setting. See the Configuration section below.
Most applications will use database connections to read data, for instance to read a projected model upon user request. This means that other parts of the application will be competing for a connection. It’s recommended to configure a connection pool dedicated to the projections and to use a different one in other parts of the application.
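The effect of a connection pool that is smaller than the dispatcher can be illustrated with plain `java.util.concurrent` types. In this hypothetical sketch (`PoolSizingDemo` is not part of Pekko), a fixed thread pool plays the role of the blocking JDBC dispatcher and a `Semaphore` plays the role of the connection pool; each simulated handler call holds one permit while it runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PoolSizingDemo {
  private PoolSizingDemo() {}

  // Runs `threads` concurrent "handler calls" against `connections` permits and
  // returns how many calls could not get a connection immediately and had to wait.
  static int countWaitingThreads(int threads, int connections) {
    Semaphore pool = new Semaphore(connections);            // stands in for the JDBC connection pool
    CountDownLatch allTried = new CountDownLatch(threads);  // every call has asked for a connection
    AtomicInteger waited = new AtomicInteger();
    ExecutorService dispatcher = Executors.newFixedThreadPool(threads); // like fixed-pool-size
    for (int i = 0; i < threads; i++) {
      dispatcher.execute(() -> {
        boolean gotConnection = pool.tryAcquire();
        allTried.countDown();
        try {
          if (gotConnection) {
            allTried.await();          // hold the connection until every call has tried
          } else {
            waited.incrementAndGet();
            pool.acquire();            // this dispatcher thread blocks until a connection is free
          }
          pool.release();              // return the connection to the pool
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    dispatcher.shutdown();
    try {
      if (!dispatcher.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("demo did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return waited.get();
  }
}
```

With 8 threads and 8 connections no call waits; with 8 threads and only 5 connections, 3 calls block a dispatcher thread until a connection is released, which is what matching the two sizes avoids.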
exactly-once
The offset is stored in the same transaction as the database operations of the user-defined handler, which means exactly-once processing semantics if the projection is restarted from a previously stored offset.
- Scala
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.jdbc.scaladsl.JdbcProjection

val projection =
  JdbcProjection
    .exactlyOnce(
      projectionId = ProjectionId("ShoppingCarts", "carts-1"),
      sourceProvider,
      () => new PlainJdbcSession, // JdbcSession Factory
      handler = () => new ShoppingCartHandler(orderRepository))
- Java
final HibernateSessionFactory sessionProvider = new HibernateSessionFactory();

Projection<EventEnvelope<ShoppingCart.Event>> projection =
    JdbcProjection.exactlyOnce(
        ProjectionId.of("shopping-carts", "carts-1"),
        sourceProvider,
        sessionProvider::newInstance,
        ShoppingCartHandler::new,
        system);
The ShoppingCartHandler is shown below.
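Why storing the offset in the handler's transaction yields exactly-once processing can be shown with a small in-memory model. This is a hypothetical sketch, not how Pekko stores offsets: `InMemoryDb`, `Tx` and `ExactlyOnceSketch` are illustrative names, and a crash is simulated by returning before `commit()`, so the handler's write and the offset are discarded together.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory "database" holding projected rows and the stored offset.
final class InMemoryDb {
  final List<String> orders = new ArrayList<>();
  long offset = 0; // offset of the last committed envelope (0 = nothing processed yet)
}

// A transaction buffers writes and applies them together on commit.
final class Tx {
  private final InMemoryDb db;
  private final List<String> pendingOrders = new ArrayList<>();
  private Long pendingOffset = null;

  Tx(InMemoryDb db) { this.db = db; }

  void insertOrder(String order) { pendingOrders.add(order); }
  void saveOffset(long offset) { pendingOffset = offset; }

  // Handler writes and the offset become visible together, or not at all.
  void commit() {
    db.orders.addAll(pendingOrders);
    if (pendingOffset != null) db.offset = pendingOffset;
  }
}

final class ExactlyOnceSketch {
  private ExactlyOnceSketch() {}

  // Resumes after the stored offset; event i has offset i + 1.
  // Reaching failAtOffset simulates a crash before commit.
  static void run(InMemoryDb db, List<String> events, long failAtOffset) {
    for (int i = (int) db.offset; i < events.size(); i++) {
      long envelopeOffset = i + 1;
      Tx tx = new Tx(db);
      tx.insertOrder(events.get(i));   // the handler's database operation
      if (envelopeOffset == failAtOffset) {
        return;                        // simulated crash: nothing from this envelope is committed
      }
      tx.saveOffset(envelopeOffset);   // offset saved in the same transaction
      tx.commit();
    }
  }
}
```

After a crash on the second envelope, a restart resumes from offset 1 and each order ends up stored exactly once.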
at-least-once
The offset is stored after the envelope has been processed, giving at-least-once processing semantics. This means that if the projection is restarted from a previously stored offset, some elements may be processed more than once. Therefore, the handler code must be idempotent.
- Scala
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.jdbc.scaladsl.JdbcProjection

import scala.concurrent.duration._

val projection =
  JdbcProjection
    .atLeastOnce(
      projectionId = ProjectionId("ShoppingCarts", "carts-1"),
      sourceProvider,
      () => new PlainJdbcSession, // JdbcSession Factory
      handler = () => new ShoppingCartHandler(orderRepository))
    .withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
- Java
final HibernateSessionFactory sessionProvider = new HibernateSessionFactory();
int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);

Projection<EventEnvelope<ShoppingCart.Event>> projection =
    JdbcProjection.atLeastOnce(
            ProjectionId.of("shopping-carts", "carts-1"),
            sourceProvider,
            sessionProvider::newInstance,
            ShoppingCartHandler::new,
            system)
        .withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
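An idempotent handler is one where applying the same envelope twice leaves the same state. The hypothetical sketch below (`CheckedOut` and `IdempotentOrderHandler` are illustrative, not from the Pekko examples) keys each order by cart id, so a replayed envelope overwrites rather than duplicates. Against a real database the equivalent would be an upsert, for example `INSERT ... ON CONFLICT (...) DO UPDATE` in PostgreSQL.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event emitted when a cart is checked out.
final class CheckedOut {
  final String cartId;
  final Instant time;
  CheckedOut(String cartId, Instant time) { this.cartId = cartId; this.time = time; }
}

// Idempotent handler: orders are keyed by cart id, so processing the same
// event again replaces the row with identical data instead of adding a new one.
final class IdempotentOrderHandler {
  final Map<String, Instant> ordersByCartId = new HashMap<>();

  void process(CheckedOut event) {
    ordersByCartId.put(event.cartId, event.time); // upsert, not a blind insert
  }
}
```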
The offset is stored after a time window, or after a number of envelopes, whichever happens first. This window can be defined with withSaveOffset of the returned AtLeastOnceProjection. The default settings for the window are defined in the configuration section pekko.projection.at-least-once. There is a performance benefit to not storing the offset too often, but the drawback is that more envelopes may be processed again when the projection is restarted.
The ShoppingCartHandler is shown below.
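The "whichever happens first" rule of the save-offset window can be sketched as a small decision object. This is a hypothetical model (`SaveOffsetWindow` is not a Pekko class); its two parameters correspond loosely to `afterEnvelopes` and `afterDuration` of `withSaveOffset`. Unlike a real timer-driven implementation, the sketch only checks the clock when an envelope has been processed.

```java
// Hypothetical model of the save-offset window: save after `afterEnvelopes`
// envelopes or after `afterMillis` since the last save, whichever comes first.
final class SaveOffsetWindow {
  private final int afterEnvelopes;
  private final long afterMillis;
  private int unsaved = 0;
  private long lastSaveMillis;

  SaveOffsetWindow(int afterEnvelopes, long afterMillis, long nowMillis) {
    this.afterEnvelopes = afterEnvelopes;
    this.afterMillis = afterMillis;
    this.lastSaveMillis = nowMillis;
  }

  // Called after each processed envelope; returns true when the offset should be stored.
  boolean envelopeProcessed(long nowMillis) {
    unsaved++;
    if (unsaved >= afterEnvelopes || nowMillis - lastSaveMillis >= afterMillis) {
      unsaved = 0;
      lastSaveMillis = nowMillis;
      return true;
    }
    return false;
  }
}
```

Larger values mean fewer offset writes but more envelopes replayed after a restart, which is the trade-off described above.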
groupedWithin
The envelopes can be grouped before processing, which can be useful for batch updates.
- Scala
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.jdbc.scaladsl.JdbcProjection

import scala.concurrent.duration._

val projection =
  JdbcProjection
    .groupedWithin(
      projectionId = ProjectionId("ShoppingCarts", "carts-1"),
      sourceProvider,
      () => new PlainJdbcSession, // JdbcSession Factory
      handler = () => new GroupedShoppingCartHandler(orderRepository))
    .withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
- Java
final HibernateSessionFactory sessionProvider = new HibernateSessionFactory();
int groupAfterEnvelopes = 100;
Duration groupAfterDuration = Duration.ofMillis(500);

Projection<EventEnvelope<ShoppingCart.Event>> projection =
    JdbcProjection.groupedWithin(
            ProjectionId.of("shopping-carts", "carts-1"),
            sourceProvider,
            sessionProvider::newInstance,
            GroupedShoppingCartHandler::new,
            system)
        .withGroup(groupAfterEnvelopes, groupAfterDuration);
The envelopes are grouped within a time window, or limited by a number of envelopes, whichever happens first. This window can be defined with withGroup of the returned GroupedProjection. The default settings for the window are defined in the configuration section pekko.projection.grouped.
When using groupedWithin, the handler is a JdbcHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] (Scala) or a JdbcHandler<List<EventEnvelope<ShoppingCart.Event>>> (Java). The GroupedShoppingCartHandler is shown below.
The offset is stored in the same transaction as the database operations of the user-defined handler, which means exactly-once processing semantics if the projection is restarted from a previously stored offset.
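Grouping by size or time, and storing one offset per group, can be sketched over a list of timestamped envelopes. This is a hypothetical approximation (`Env` and `GroupedWithinSketch` are illustrative names): a real time window is timer driven and can close a group without a new envelope arriving, whereas here a group is closed by size or when the next envelope arrives too late. The offset stored with a group is assumed to be that of its last envelope.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical envelope with an offset and an arrival time in milliseconds.
final class Env {
  final long offset;
  final long arrivedAtMillis;
  Env(long offset, long arrivedAtMillis) { this.offset = offset; this.arrivedAtMillis = arrivedAtMillis; }
}

final class GroupedWithinSketch {
  private GroupedWithinSketch() {}

  // Groups of at most maxSize envelopes; a group is also closed once an envelope
  // arrives maxMillis or more after the group's first envelope.
  static List<List<Env>> group(List<Env> envelopes, int maxSize, long maxMillis) {
    List<List<Env>> groups = new ArrayList<>();
    List<Env> current = new ArrayList<>();
    for (Env e : envelopes) {
      if (!current.isEmpty() && e.arrivedAtMillis - current.get(0).arrivedAtMillis >= maxMillis) {
        groups.add(current);            // time limit reached
        current = new ArrayList<>();
      }
      current.add(e);
      if (current.size() == maxSize) {
        groups.add(current);            // size limit reached
        current = new ArrayList<>();
      }
    }
    if (!current.isEmpty()) groups.add(current);
    return groups;
  }

  // The offset committed together with a group is the offset of its last envelope.
  static long offsetToStore(List<Env> group) {
    return group.get(group.size() - 1).offset;
  }
}
```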
Handler
It’s in the JdbcHandler that you implement the processing of each envelope. It’s essentially a consumer function from (JdbcSession, Envelope) to Unit (Scala) or void (Java).
A handler that is consuming ShoppingCart.Event from eventsByTag can look like this:
- Scala
import org.apache.pekko.projection.eventsourced.EventEnvelope
import org.apache.pekko.projection.jdbc.scaladsl.JdbcHandler
import org.slf4j.LoggerFactory

class ShoppingCartHandler(repository: OrderRepository)
    extends JdbcHandler[EventEnvelope[ShoppingCart.Event], PlainJdbcSession] {
  private val logger = LoggerFactory.getLogger(getClass)

  override def process(session: PlainJdbcSession, envelope: EventEnvelope[ShoppingCart.Event]): Unit = {
    envelope.event match {
      case ShoppingCart.CheckedOut(cartId, time) =>
        logger.info(s"Shopping cart $cartId was checked out at $time")
        // write through the session's connection so it joins the projection's transaction
        session.withConnection { conn =>
          repository.save(conn, Order(cartId, time))
        }

      case otherEvent =>
        logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent")
    }
  }
}
- Java
import org.apache.pekko.projection.eventsourced.EventEnvelope;
import org.apache.pekko.projection.jdbc.javadsl.JdbcHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShoppingCartHandler
    extends JdbcHandler<EventEnvelope<ShoppingCart.Event>, HibernateJdbcSession> {

  private final Logger logger = LoggerFactory.getLogger(getClass());

  @Override
  public void process(HibernateJdbcSession session, EventEnvelope<ShoppingCart.Event> envelope)
      throws Exception {
    ShoppingCart.Event event = envelope.event();
    if (event instanceof ShoppingCart.CheckedOut) {
      ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;

      logger.info(
          "Shopping cart {} was checked out at {}", checkedOut.cartId, checkedOut.eventTime);

      // pass the EntityManager created by the projection
      // to the repository in order to use the same transaction
      orderRepository.save(
          session.entityManager, new Order(checkedOut.cartId, checkedOut.eventTime));

    } else {
      logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
    }
  }
}
Such simple handlers can also be defined as plain functions via the helper factory method JdbcHandler.apply (Scala) or JdbcHandler.fromFunction (Java).
The OrderRepository used by the handler is an implementation of:
- Scala
import java.sql.Connection
import java.time.Instant

case class Order(id: String, time: Instant)

trait OrderRepository {
  def save(connection: Connection, order: Order): Unit
}
- Java
import javax.persistence.EntityManager;
import java.time.Instant;

class Order {
  public final String id;
  public final Instant time;

  public Order(String id, Instant time) {
    this.id = id;
    this.time = time;
  }
}

interface OrderRepository {
  void save(EntityManager entityManager, Order order);
}
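The idea that a handler is essentially a function from (session, envelope) to void can be sketched with `java.util.function.BiConsumer`. This is a hypothetical, dependency-free sketch: `CartEvent`, `ItemAdded`, `CartCheckedOut`, `SimpleOrder`, `FakeSession` and `HandlerAsFunction` are illustrative names and do not reproduce the `JdbcHandler.fromFunction` signature; in a real projection the write would go through `session.withConnection` or the `OrderRepository`.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical, simplified stand-ins for the event and order types.
abstract class CartEvent {
  final String cartId;
  CartEvent(String cartId) { this.cartId = cartId; }
}
final class ItemAdded extends CartEvent {
  ItemAdded(String cartId) { super(cartId); }
}
final class CartCheckedOut extends CartEvent {
  final Instant eventTime;
  CartCheckedOut(String cartId, Instant eventTime) { super(cartId); this.eventTime = eventTime; }
}
final class SimpleOrder {
  final String id;
  final Instant time;
  SimpleOrder(String id, Instant time) { this.id = id; this.time = time; }
}

// Stands in for the session: it simply collects the rows the handler writes.
final class FakeSession {
  final List<SimpleOrder> savedOrders = new ArrayList<>();
}

final class HandlerAsFunction {
  private HandlerAsFunction() {}

  // (session, event) -> void: only a checkout produces a write, other events are ignored.
  static final BiConsumer<FakeSession, CartEvent> HANDLER = (session, event) -> {
    if (event instanceof CartCheckedOut) {
      CartCheckedOut checkedOut = (CartCheckedOut) event;
      session.savedOrders.add(new SimpleOrder(checkedOut.cartId, checkedOut.eventTime));
    }
  };
}
```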
Grouped handler
When using JdbcProjection.groupedWithin, the handler processes a Seq (Scala) or List (Java) of envelopes.
- Scala
import org.apache.pekko.projection.eventsourced.EventEnvelope
import org.apache.pekko.projection.jdbc.scaladsl.JdbcHandler
import org.slf4j.LoggerFactory

import scala.collection.immutable

class GroupedShoppingCartHandler(repository: OrderRepository)
    extends JdbcHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]], PlainJdbcSession] {
  private val logger = LoggerFactory.getLogger(getClass)

  override def process(
      session: PlainJdbcSession,
      envelopes: immutable.Seq[EventEnvelope[ShoppingCart.Event]]): Unit = {

    // save all events in DB
    envelopes.map(_.event).foreach {
      case ShoppingCart.CheckedOut(cartId, time) =>
        logger.info(s"Shopping cart $cartId was checked out at $time")
        session.withConnection { conn =>
          repository.save(conn, Order(cartId, time))
        }

      case otherEvent =>
        logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent")
    }
  }
}
- Java
import org.apache.pekko.projection.eventsourced.EventEnvelope;
import org.apache.pekko.projection.jdbc.javadsl.JdbcHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class GroupedShoppingCartHandler
    extends JdbcHandler<List<EventEnvelope<ShoppingCart.Event>>, HibernateJdbcSession> {

  private final Logger logger = LoggerFactory.getLogger(getClass());

  @Override
  public void process(
      HibernateJdbcSession session, List<EventEnvelope<ShoppingCart.Event>> envelopes)
      throws Exception {
    for (EventEnvelope<ShoppingCart.Event> envelope : envelopes) {
      ShoppingCart.Event event = envelope.event();
      if (event instanceof ShoppingCart.CheckedOut) {
        ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;

        logger.info(
            "Shopping cart {} was checked out at {}", checkedOut.cartId, checkedOut.eventTime);

        // pass the EntityManager created by the projection
        // to the repository in order to use the same transaction
        orderRepository.save(
            session.entityManager, new Order(checkedOut.cartId, checkedOut.eventTime));

      } else {
        logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
      }
    }
  }
}
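A grouped handler becomes most useful when it turns the whole group into a single batch write instead of one write per envelope. The hypothetical sketch below shows that shape without JDBC: `BatchingOrderRepository` and `GroupedCheckoutHandler` are illustrative names, and events are simplified to strings such as `"CheckedOut:cart-1"`. With plain JDBC the single `saveAll` call would typically map to `PreparedStatement.addBatch()` followed by one `executeBatch()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical repository that records each batch write it receives.
final class BatchingOrderRepository {
  final List<List<String>> batches = new ArrayList<>();
  void saveAll(List<String> cartIds) { batches.add(new ArrayList<>(cartIds)); }
}

// Hypothetical grouped handler: collects the checked-out carts of the whole group
// and writes them with one batch call.
final class GroupedCheckoutHandler {
  private static final String CHECKED_OUT = "CheckedOut:";
  private final BatchingOrderRepository repository;

  GroupedCheckoutHandler(BatchingOrderRepository repository) { this.repository = repository; }

  // Each event is represented as "<type>:<cartId>", e.g. "CheckedOut:cart-1".
  void process(List<String> events) {
    List<String> checkedOut = new ArrayList<>();
    for (String event : events) {
      if (event.startsWith(CHECKED_OUT)) {
        checkedOut.add(event.substring(CHECKED_OUT.length()));
      }
    }
    if (!checkedOut.isEmpty()) repository.saveAll(checkedOut); // one write for the group
  }
}
```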
Stateful handler
The JdbcHandler can be stateful, with variables and mutable data structures. It is invoked by the Projection machinery one envelope at a time, and visibility guarantees between the invocations are handled automatically, i.e. no volatile or other concurrency primitives are needed for managing the state as long as it’s not accessed by threads other than the one that called process.
It is important that the Handler instance is not shared between several Projection instances, because then it would be invoked concurrently, which is not how it is intended to be used. Each Projection instance should use a new Handler instance.
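The two rules above (plain mutable state is fine, but one instance per projection) can be sketched with a counter and a factory. `CountingHandler` is a hypothetical, dependency-free class, not a Pekko type; the `Supplier` stands in for the handler factory passed when declaring a projection.

```java
import java.util.function.Supplier;

// Hypothetical stateful handler: a plain mutable field with no volatile or locks,
// relying on process() being called for one envelope at a time.
final class CountingHandler {
  private int processed = 0;

  void process(String event) {
    processed++;
  }

  int processedCount() { return processed; }

  // Hand out a factory rather than one shared instance, so that each
  // projection gets its own handler and its own state.
  static Supplier<CountingHandler> factory() {
    return CountingHandler::new;
  }
}
```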
Async handler
The Handler can be used with JdbcProjection.atLeastOnceAsync and JdbcProjection.groupedWithinAsync if the handler does not store the projection result in the database. For example, the handler could send messages to a Kafka topic or integrate with another system.
There are several examples of such a Handler in the documentation for Cassandra Projections. The same type of handler can be used with JdbcProjection instead of CassandraProjection.
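An asynchronous handler returns a future-like value instead of writing to the session. Pekko's `Handler.process` returns a `CompletionStage<Done>` (Java) or `Future[Done]` (Scala); to stay dependency-free, the hypothetical sketch below returns `CompletionStage<Void>` and publishes to an in-memory list standing in for a Kafka topic. `InMemoryTopic` and `PublishingHandler` are illustrative names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for an external system such as a message broker topic.
final class InMemoryTopic {
  final List<String> messages = new CopyOnWriteArrayList<>();
}

// Hypothetical asynchronous handler: it does not touch the database, it publishes
// the event and returns a stage that completes when publishing is done.
final class PublishingHandler {
  private final InMemoryTopic topic;
  private final ExecutorService executor;

  PublishingHandler(InMemoryTopic topic, ExecutorService executor) {
    this.topic = topic;
    this.executor = executor;
  }

  CompletionStage<Void> process(String event) {
    return CompletableFuture.runAsync(() -> topic.messages.add(event), executor);
  }
}
```

Since the offset is stored separately from the external side effect, this combination gives at-least-once delivery to the external system.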
Actor handler
A good alternative for advanced state management is to implement the handler as an actor, which is described in Processing with Actor.
Flow handler
An Apache Pekko Streams FlowWithContext
can be used instead of a handler for processing the envelopes, which is described in Processing with Apache Pekko Streams.
Handler lifecycle
You can override the start and stop methods of the JdbcHandler to implement initialization before the first envelope is processed and resource cleanup when the projection is stopped. Those methods are also called when the Projection is restarted after failure.
See also error handling.
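The lifecycle contract (start before processing, stop afterwards, and both again on restart) can be traced with a small hypothetical model. `LifecycleHandler` and `LifecycleRunner` are illustrative names that only mirror the idea of `start` and `stop`; see the JdbcHandler API documentation for the actual method signatures. The runner simulates one failure followed by a restart that begins from the first envelope, since this sketch stores no offsets.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handler with lifecycle hooks: acquire a resource in start(), release it in stop().
final class LifecycleHandler {
  final List<String> log = new ArrayList<>();
  private StringBuilder buffer; // stands in for a resource such as a client or cache

  void start() {
    buffer = new StringBuilder();
    log.add("start");
  }

  void process(String event) {
    if (buffer == null) throw new IllegalStateException("start() was not called");
    buffer.append(event);
    log.add("process " + event);
  }

  void stop() {
    buffer = null;
    log.add("stop");
  }
}

final class LifecycleRunner {
  private LifecycleRunner() {}

  // Fails once on `failOn`; the handler is stopped and then started again before reprocessing.
  static void runWithOneRestart(LifecycleHandler handler, List<String> events, String failOn) {
    boolean failedOnce = false;
    while (true) {
      handler.start();
      try {
        for (String e : events) {
          if (e.equals(failOn) && !failedOnce) {
            failedOnce = true;
            throw new IllegalStateException("simulated failure");
          }
          handler.process(e);
        }
        return;
      } catch (IllegalStateException ex) {
        // fall through to stop() and restart
      } finally {
        handler.stop();
      }
    }
  }
}
```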
Schema
The database schema for the offset storage table:
- PostgreSQL
CREATE TABLE IF NOT EXISTS pekko_projection_offset_store (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  current_offset VARCHAR(255) NOT NULL,
  manifest VARCHAR(4) NOT NULL,
  mergeable BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);

CREATE INDEX IF NOT EXISTS projection_name_index ON pekko_projection_offset_store (projection_name);

CREATE TABLE IF NOT EXISTS pekko_projection_management (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  paused BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);
- MySQL
CREATE TABLE IF NOT EXISTS pekko_projection_offset_store (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  current_offset VARCHAR(255) NOT NULL,
  manifest VARCHAR(4) NOT NULL,
  mergeable BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);

CREATE INDEX projection_name_index ON pekko_projection_offset_store (projection_name);

CREATE TABLE IF NOT EXISTS pekko_projection_management (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  paused BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);
- Microsoft SQL Server
IF NOT EXISTS (SELECT 1 FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'pekko_projection_offset_store') AND TYPE IN (N'U'))
BEGIN
  CREATE TABLE pekko_projection_offset_store (
    projection_name VARCHAR(255) NOT NULL,
    projection_key VARCHAR(255) NOT NULL,
    current_offset VARCHAR(255) NOT NULL,
    manifest VARCHAR(4) NOT NULL,
    mergeable BIT NOT NULL,
    last_updated BIGINT NOT NULL
  )

  ALTER TABLE pekko_projection_offset_store ADD CONSTRAINT pk_projection_id PRIMARY KEY(projection_name, projection_key)

  CREATE INDEX projection_name_index ON pekko_projection_offset_store (projection_name)
END

IF NOT EXISTS (SELECT 1 FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'pekko_projection_management') AND TYPE IN (N'U'))
BEGIN
  CREATE TABLE pekko_projection_management (
    projection_name VARCHAR(255) NOT NULL,
    projection_key VARCHAR(255) NOT NULL,
    paused BIT NOT NULL,
    last_updated BIGINT NOT NULL
  )

  ALTER TABLE pekko_projection_management ADD CONSTRAINT pk_projection_management_id PRIMARY KEY(projection_name, projection_key)
END
- Oracle
BEGIN
  execute immediate 'create table "PEKKO_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME" VARCHAR2(255) NOT NULL,"PROJECTION_KEY" VARCHAR2(255) NOT NULL,"CURRENT_OFFSET" VARCHAR2(255) NOT NULL,"MANIFEST" VARCHAR2(4) NOT NULL,"MERGEABLE" CHAR(1) NOT NULL check ("MERGEABLE" in (0, 1)),"LAST_UPDATED" NUMBER(19) NOT NULL) ';
  execute immediate 'alter table "PEKKO_PROJECTION_OFFSET_STORE" add constraint "PK_PROJECTION_ID" primary key("PROJECTION_NAME","PROJECTION_KEY") ';
  execute immediate 'create index "PROJECTION_NAME_INDEX" on "PEKKO_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME") ';
  EXCEPTION
    WHEN OTHERS THEN
      IF SQLCODE = -955 THEN
        NULL; -- suppresses ORA-00955 exception
      ELSE
        RAISE;
      END IF;
END;

BEGIN
  execute immediate 'create table "PEKKO_PROJECTION_MANAGEMENT" ("PROJECTION_NAME" VARCHAR2(255) NOT NULL,"PROJECTION_KEY" VARCHAR2(255) NOT NULL,"PAUSED" CHAR(1) NOT NULL check ("PAUSED" in (0, 1)),"LAST_UPDATED" NUMBER(19) NOT NULL) ';
  execute immediate 'alter table "PEKKO_PROJECTION_MANAGEMENT" add constraint "PK_PROJECTION_MANAGEMENT_ID" primary key("PROJECTION_NAME","PROJECTION_KEY") ';
  EXCEPTION
    WHEN OTHERS THEN
      IF SQLCODE = -955 THEN
        NULL; -- suppresses ORA-00955 exception
      ELSE
        RAISE;
      END IF;
END;
- H2
CREATE TABLE IF NOT EXISTS "pekko_projection_offset_store" (
  "projection_name" VARCHAR(255) NOT NULL,
  "projection_key" VARCHAR(255) NOT NULL,
  "current_offset" VARCHAR(255) NOT NULL,
  "manifest" VARCHAR(4) NOT NULL,
  "mergeable" BOOLEAN NOT NULL,
  "last_updated" BIGINT NOT NULL,
  PRIMARY KEY("projection_name", "projection_key")
);

CREATE INDEX IF NOT EXISTS "projection_name_index" ON "pekko_projection_offset_store" ("projection_name");

CREATE TABLE IF NOT EXISTS "pekko_projection_management" (
  "projection_name" VARCHAR(255) NOT NULL,
  "projection_key" VARCHAR(255) NOT NULL,
  "paused" BOOLEAN NOT NULL,
  "last_updated" BIGINT NOT NULL,
  PRIMARY KEY("projection_name", "projection_key")
);
The schema can be created and dropped using the methods JdbcProjection.createTablesIfNotExists and JdbcProjection.dropTablesIfExists. This is particularly useful when writing tests. For production environments, we recommend creating the schema before deploying the application.
As of version 1.1.0, the schema for PostgreSQL and H2 databases has changed. It now defaults to lowercase table and column names. If you have a schema in production, we recommend applying an ALTER TABLE script to change it accordingly.
Alternatively, you can fall back to the uppercase format. You will also need to set pekko.projection.jdbc.offset-store.table to an uppercase value, as this setting now defaults to lowercase.
pekko.projection.jdbc.offset-store {
  table = "PEKKO_PROJECTION_OFFSET_STORE"
  use-lowercase-schema = false
}
Offset types
The supported offset types of the JdbcProjection are:
- Offset types from events from Apache Pekko Persistence
- MergeableOffset that is used for messages from Kafka
- String
- Int
- Long
- Any other type that has a configured Pekko Serializer is stored with base64 encoding of the serialized bytes.
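The last case, serializer-based offsets stored as base64 text, can be illustrated with `java.util.Base64`. In this hypothetical sketch a UTF-8 string stands in for the bytes a configured Pekko Serializer would produce; `Base64OffsetCodec` is an illustrative name, and how Pekko records the serializer id and manifest alongside the value is not shown.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical illustration of storing an arbitrary offset as text:
// serialize to bytes, then base64-encode so the value fits a VARCHAR column.
final class Base64OffsetCodec {
  private Base64OffsetCodec() {}

  // Stand-in for a Pekko Serializer: the "offset" here is a string turned into UTF-8 bytes.
  static String encode(String customOffset) {
    byte[] serialized = customOffset.getBytes(StandardCharsets.UTF_8);
    return Base64.getEncoder().encodeToString(serialized);
  }

  // Reverse path when the projection is restarted: decode the text, then deserialize.
  static String decode(String storedValue) {
    byte[] serialized = Base64.getDecoder().decode(storedValue);
    return new String(serialized, StandardCharsets.UTF_8);
  }
}
```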
Configuration
Make your edits/overrides in your application.conf.
The reference configuration file with the default values:
pekko.projection.jdbc {
  # choose one of: mysql-dialect, postgres-dialect, mssql-dialect, oracle-dialect or h2-dialect (testing)
  dialect = ""

  use-dispatcher = "pekko.projection.jdbc.blocking-jdbc-dispatcher"

  blocking-jdbc-dispatcher {
    type = Dispatcher
    executor = "thread-pool-executor"
    thread-pool-executor {
      # Use same number of threads as connections in the JDBC connection pool.
      fixed-pool-size = ""
    }
    throughput = 1
  }

  offset-store {
    # set this to your database schema if applicable, empty by default
    schema = ""
    # the database table name for the offset store
    table = "pekko_projection_offset_store"

    # the database table name for the projection management data
    management-table = "pekko_projection_management"

    # Use lowercase table and column names.
    # This is mostly useful for H2 and Postgres databases. MySQL and SQL Server are case insensitive.
    # Oracle schema is case sensitive and is defined with uppercase, this property is therefore ignored when using Oracle
    use-lowercase-schema = true
  }

  debug.verbose-offset-store-logging = false
}
Settings pekko.projection.jdbc.dialect and pekko.projection.jdbc.blocking-jdbc-dispatcher.thread-pool-executor.fixed-pool-size do not have a valid default value. You must configure them in your application.conf file.
See Required Configuration Settings and Blocking JDBC Dispatcher sections for details.