Offset in a relational DB with Slick
The SlickProjection has support for storing the offset in a relational database using Slick (JDBC). This module is only available for Scala. For Java, the offset can be stored in a relational database with the JDBC module, which can also be used with Scala.
This module is deprecated. Use the JDBC module to implement your projection handler instead.
The source of the envelopes can be events from Apache Pekko Persistence or any other SourceProvider with supported offset types.

The envelope handler returns a DBIO that will be run by the projection. This means that the target database operations can run in the same transaction as the storage of the offset, so exactly-once processing semantics is supported. At-least-once semantics is also offered.
Dependencies
To use the Slick module of Apache Pekko Projections add the following dependency in your project:
- sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-slick" % "1.0.0"
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-projection-slick_${scala.binary.version}</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-projection-slick_${versions.ScalaBinary}:1.0.0"
}
Apache Pekko Projections require Pekko 1.0.2 or later, see Pekko version.
Project Info: Apache Pekko Projections Slick

| | |
|---|---|
| Artifact | org.apache.pekko:pekko-projection-slick:1.0.0 |
| JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17 |
| Scala versions | 2.13.13, 2.12.19 |
| JPMS module name | pekko.projection.slick |
| Release notes | GitHub releases |
| Issues | GitHub issues |
| Sources | https://github.com/apache/pekko-projection |
Transitive dependencies
The table below shows pekko-projection-slick’s direct dependencies, and the second tab shows all libraries it depends on transitively.
exactly-once
The offset is stored in the same transaction as the DBIO returned from the handler, which means exactly-once processing semantics if the projection is restarted from a previously stored offset.
- Scala
-
source
import scala.concurrent.ExecutionContext

import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.slick.SlickProjection
import slick.basic.DatabaseConfig
import slick.dbio.DBIO
import slick.jdbc.H2Profile

implicit val ec: ExecutionContext = system.executionContext

val projection =
  SlickProjection.exactlyOnce(
    projectionId = ProjectionId("ShoppingCarts", "carts-1"),
    sourceProvider,
    dbConfig,
    handler = () => new ShoppingCartHandler(repository))
The ShoppingCartHandler is shown below.
at-least-once
The offset is stored after the envelope has been processed, giving at-least-once processing semantics. This means that if the projection is restarted from a previously stored offset, some elements may be processed more than once. Therefore, the handler code must be idempotent.
- Scala
-
source
implicit val ec: ExecutionContext = system.executionContext

val projection =
  SlickProjection
    .atLeastOnce(
      projectionId = ProjectionId("ShoppingCarts", "carts-1"),
      sourceProvider,
      dbConfig,
      handler = () => new ShoppingCartHandler(repository))
    .withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
The offset is stored after a time window, or limited by a number of envelopes, whichever happens first. This window can be defined with withSaveOffset of the returned AtLeastOnceProjection. The default settings for the window are defined in the configuration section pekko.projection.at-least-once. There is a performance benefit of not storing the offset too often, but the drawback is that there can be more duplicated envelopes that will be processed again when the projection is restarted.

The ShoppingCartHandler is shown below.
groupedWithin
The envelopes can be grouped before processing, which can be useful for batch updates.
- Scala
-
source
implicit val ec: ExecutionContext = system.executionContext

val projection =
  SlickProjection
    .groupedWithin(
      projectionId = ProjectionId("ShoppingCarts", "carts-1"),
      sourceProvider,
      dbConfig,
      handler = () => new GroupedShoppingCartHandler(repository))
    .withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
The envelopes are grouped within a time window, or limited by a number of envelopes, whichever happens first. This window can be defined with withGroup of the returned GroupedProjection. The default settings for the window are defined in the configuration section pekko.projection.grouped.

When using groupedWithin the handler is a SlickHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]]. The GroupedShoppingCartHandler is shown below.

The offset is stored in the same transaction as the DBIO returned from the handler, which means exactly-once processing semantics if the projection is restarted from a previously stored offset.
Handler
It’s in the SlickHandler that you implement the processing of each envelope. It’s essentially a function from Envelope to DBIO[Done]. The returned DBIO is run by the projection.

A handler that is consuming ShoppingCart.Event from eventsByTag can look like this:
- Scala
-
source
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import org.apache.pekko
import pekko.Done
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.slick.SlickHandler
import org.slf4j.LoggerFactory
import slick.dbio.DBIO

class ShoppingCartHandler(repository: OrderRepository)(implicit ec: ExecutionContext)
    extends SlickHandler[EventEnvelope[ShoppingCart.Event]] {
  private val logger = LoggerFactory.getLogger(getClass)

  override def process(envelope: EventEnvelope[ShoppingCart.Event]): DBIO[Done] = {
    envelope.event match {
      case ShoppingCart.CheckedOut(cartId, time) =>
        logger.info(s"Shopping cart $cartId was checked out at $time")
        repository.save(Order(cartId, time))

      case otherEvent =>
        logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent")
        DBIO.successful(Done)
    }
  }
}
Such simple handlers can also be defined as plain functions via the SlickHandler.apply factory method.
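For instance, a handler equivalent to the class above could be created from a plain function. This is a sketch; the ShoppingCart, Order, and repository names are taken from the surrounding snippets on this page:

```scala
import org.apache.pekko
import pekko.Done
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.slick.SlickHandler
import slick.dbio.DBIO

// SlickHandler.apply wraps a function Envelope => DBIO[Done]
val handler: SlickHandler[EventEnvelope[ShoppingCart.Event]] =
  SlickHandler[EventEnvelope[ShoppingCart.Event]] { envelope =>
    envelope.event match {
      case ShoppingCart.CheckedOut(cartId, time) =>
        repository.save(Order(cartId, time))
      case _ =>
        DBIO.successful(Done)
    }
  }
```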
where the OrderRepository is:
- Scala
-
source
import java.time.Instant

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import org.apache.pekko
import pekko.Done
import slick.basic.DatabaseConfig
import slick.jdbc.H2Profile

case class Order(id: String, time: Instant)

class OrderRepository(val dbConfig: DatabaseConfig[H2Profile]) {
  import dbConfig.profile.api._

  private class OrdersTable(tag: Tag) extends Table[Order](tag, "ORDERS") {
    def id = column[String]("CART_ID", O.PrimaryKey)
    def time = column[Instant]("TIME")

    def * = (id, time).mapTo[Order]
  }

  private val ordersTable = TableQuery[OrdersTable]

  def save(order: Order)(implicit ec: ExecutionContext) = {
    ordersTable.insertOrUpdate(order).map(_ => Done)
  }

  def createTable(): Future[Unit] =
    dbConfig.db.run(ordersTable.schema.createIfNotExists)
}
with the Slick DatabaseConfig:
- Scala
-
source
val dbConfig: DatabaseConfig[H2Profile] =
  DatabaseConfig.forConfig("pekko.projection.slick", system.settings.config)
val repository = new OrderRepository(dbConfig)
Grouped handler
When using SlickProjection.groupedWithin the handler is processing a Seq of envelopes.
- Scala
-
source
import scala.collection.immutable

class GroupedShoppingCartHandler(repository: OrderRepository)(implicit ec: ExecutionContext)
    extends SlickHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] {
  private val logger = LoggerFactory.getLogger(getClass)

  override def process(envelopes: immutable.Seq[EventEnvelope[ShoppingCart.Event]]): DBIO[Done] = {
    val dbios = envelopes.map(_.event).map {
      case ShoppingCart.CheckedOut(cartId, time) =>
        logger.info(s"Shopping cart $cartId was checked out at $time")
        repository.save(Order(cartId, time))

      case otherEvent =>
        logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent")
        DBIO.successful(Done)
    }
    DBIO.sequence(dbios).map(_ => Done)
  }
}
Stateful handler
The SlickHandler can be stateful, with variables and mutable data structures. It is invoked by the Projection machinery one envelope at a time, and visibility guarantees between the invocations are handled automatically, i.e. no volatile or other concurrency primitives are needed for managing the state as long as it’s not accessed by threads other than the one that called process.

It is important that the Handler instance is not shared between several Projection instances, because then it would be invoked concurrently, which is not how it is intended to be used. Each Projection instance should use a new Handler instance.
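As an illustration, here is a hypothetical handler that keeps a counter in a plain mutable field. No synchronization is needed because process is never invoked concurrently for a given handler instance:

```scala
import org.apache.pekko
import pekko.Done
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.slick.SlickHandler
import slick.dbio.DBIO

class CountingHandler extends SlickHandler[EventEnvelope[ShoppingCart.Event]] {
  // plain var is safe here: the projection calls process one envelope at a time
  private var processed = 0

  override def process(envelope: EventEnvelope[ShoppingCart.Event]): DBIO[Done] = {
    processed += 1
    // illustrative use of the state; a real handler would do something useful
    if (processed % 1000 == 0)
      println(s"processed $processed envelopes")
    DBIO.successful(Done)
  }
}
```

Remember that each Projection instance must get its own CountingHandler, e.g. by passing `() => new CountingHandler` as the handler factory.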
Async handler
The Handler can be used with SlickProjection.atLeastOnceAsync and SlickProjection.groupedWithinAsync if the handler is not storing the projection result in the database. The handler could, for example, send to a Kafka topic or integrate with something else.

There are several examples of such Handler in the documentation for Cassandra Projections. The same type of handlers can be used with SlickProjection instead of CassandraProjection.
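A minimal sketch of such an asynchronous handler, assuming a hypothetical publish function that sends the event elsewhere and returns a Future:

```scala
import scala.concurrent.Future
import org.apache.pekko
import pekko.Done
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.scaladsl.Handler

// publish is a hypothetical function, e.g. backed by a Kafka producer
class PublishingHandler(publish: ShoppingCart.Event => Future[Done])
    extends Handler[EventEnvelope[ShoppingCart.Event]] {

  override def process(envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] =
    publish(envelope.event)
}
```

Such a handler would be passed to SlickProjection.atLeastOnceAsync in the same way as the SlickHandler examples above, while the offset is still stored by the projection in the relational database.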
Actor handler
A good alternative for advanced state management is to implement the handler as an actor, which is described in Processing with Actor.
Flow handler
An Apache Pekko Streams FlowWithContext can be used instead of a handler for processing the envelopes, which is described in Processing with Apache Pekko Streams.
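As a rough sketch, such a flow emits Done for each processed envelope while propagating the ProjectionContext, and is plugged in with SlickProjection.atLeastOnceFlow. Treat this as an outline rather than a complete example; the event-handling body is hypothetical:

```scala
import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.projection.{ ProjectionContext, ProjectionId }
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.slick.SlickProjection
import pekko.stream.scaladsl.FlowWithContext

val flow: FlowWithContext[EventEnvelope[ShoppingCart.Event], ProjectionContext, Done, ProjectionContext, NotUsed] =
  FlowWithContext[EventEnvelope[ShoppingCart.Event], ProjectionContext]
    .map(_.event)
    .map { event =>
      // hypothetical per-event side effect goes here
      Done
    }

val projection =
  SlickProjection.atLeastOnceFlow(
    projectionId = ProjectionId("ShoppingCarts", "carts-1"),
    sourceProvider,
    dbConfig,
    handler = flow)
```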
Handler lifecycle
You can override the start and stop methods of the SlickHandler to implement initialization before the first envelope is processed and resource cleanup when the projection is stopped. Those methods are also called when the Projection is restarted after failure.
See also error handling.
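For instance, a handler could acquire and release a resource in start and stop. This is a sketch; the resource management is only indicated by comments:

```scala
import scala.concurrent.Future
import org.apache.pekko
import pekko.Done
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.slick.SlickHandler
import slick.dbio.DBIO

class LifecycleAwareHandler extends SlickHandler[EventEnvelope[ShoppingCart.Event]] {

  override def start(): Future[Done] = {
    // called before the first envelope, and again when the projection is restarted
    Future.successful(Done)
  }

  override def stop(): Future[Done] = {
    // called when the projection is stopped; release resources here
    Future.successful(Done)
  }

  override def process(envelope: EventEnvelope[ShoppingCart.Event]): DBIO[Done] =
    DBIO.successful(Done)
}
```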
Schema
The database schema for the offset storage table:
- PostgreSQL
-
source
CREATE TABLE IF NOT EXISTS pekko_projection_offset_store (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  current_offset VARCHAR(255) NOT NULL,
  manifest VARCHAR(4) NOT NULL,
  mergeable BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);

CREATE INDEX IF NOT EXISTS projection_name_index ON pekko_projection_offset_store (projection_name);

CREATE TABLE IF NOT EXISTS pekko_projection_management (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  paused BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);
- MySQL
-
source
CREATE TABLE IF NOT EXISTS pekko_projection_offset_store (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  current_offset VARCHAR(255) NOT NULL,
  manifest VARCHAR(4) NOT NULL,
  mergeable BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);

CREATE INDEX projection_name_index ON pekko_projection_offset_store (projection_name);

CREATE TABLE IF NOT EXISTS pekko_projection_management (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  paused BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);
- Microsoft SQL Server
-
source
IF NOT EXISTS (SELECT 1 FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'pekko_projection_offset_store') AND TYPE IN (N'U'))
BEGIN
  CREATE TABLE pekko_projection_offset_store (
    projection_name VARCHAR(255) NOT NULL,
    projection_key VARCHAR(255) NOT NULL,
    current_offset VARCHAR(255) NOT NULL,
    manifest VARCHAR(4) NOT NULL,
    mergeable BIT NOT NULL,
    last_updated BIGINT NOT NULL
  )

  ALTER TABLE pekko_projection_offset_store ADD CONSTRAINT pk_projection_id PRIMARY KEY(projection_name, projection_key)

  CREATE INDEX projection_name_index ON pekko_projection_offset_store (projection_name)
END

IF NOT EXISTS (SELECT 1 FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'pekko_projection_management') AND TYPE IN (N'U'))
BEGIN
  CREATE TABLE pekko_projection_management (
    projection_name VARCHAR(255) NOT NULL,
    projection_key VARCHAR(255) NOT NULL,
    paused BIT NOT NULL,
    last_updated BIGINT NOT NULL
  )

  ALTER TABLE pekko_projection_management ADD CONSTRAINT pk_projection_management_id PRIMARY KEY(projection_name, projection_key)
END
- Oracle
-
source
BEGIN
  execute immediate 'create table "PEKKO_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME" VARCHAR2(255) NOT NULL,"PROJECTION_KEY" VARCHAR2(255) NOT NULL,"CURRENT_OFFSET" VARCHAR2(255) NOT NULL,"MANIFEST" VARCHAR2(4) NOT NULL,"MERGEABLE" CHAR(1) NOT NULL check ("MERGEABLE" in (0, 1)),"LAST_UPDATED" NUMBER(19) NOT NULL)';
  execute immediate 'alter table "PEKKO_PROJECTION_OFFSET_STORE" add constraint "PK_PROJECTION_ID" primary key("PROJECTION_NAME","PROJECTION_KEY")';
  execute immediate 'create index "PROJECTION_NAME_INDEX" on "PEKKO_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME")';
EXCEPTION
  WHEN OTHERS THEN
    IF SQLCODE = -955 THEN
      NULL; -- suppresses ORA-00955 exception
    ELSE
      RAISE;
    END IF;
END;

BEGIN
  execute immediate 'create table "PEKKO_PROJECTION_MANAGEMENT" ("PROJECTION_NAME" VARCHAR2(255) NOT NULL,"PROJECTION_KEY" VARCHAR2(255) NOT NULL,"PAUSED" CHAR(1) NOT NULL check ("PAUSED" in (0, 1)),"LAST_UPDATED" NUMBER(19) NOT NULL)';
  execute immediate 'alter table "PEKKO_PROJECTION_MANAGEMENT" add constraint "PK_PROJECTION_MANAGEMENT_ID" primary key("PROJECTION_NAME","PROJECTION_KEY")';
EXCEPTION
  WHEN OTHERS THEN
    IF SQLCODE = -955 THEN
      NULL; -- suppresses ORA-00955 exception
    ELSE
      RAISE;
    END IF;
END;
- H2
-
source
CREATE TABLE IF NOT EXISTS "pekko_projection_offset_store" (
  "projection_name" VARCHAR(255) NOT NULL,
  "projection_key" VARCHAR(255) NOT NULL,
  "current_offset" VARCHAR(255) NOT NULL,
  "manifest" VARCHAR(4) NOT NULL,
  "mergeable" BOOLEAN NOT NULL,
  "last_updated" BIGINT NOT NULL,
  PRIMARY KEY("projection_name", "projection_key")
);

CREATE INDEX IF NOT EXISTS "projection_name_index" ON "pekko_projection_offset_store" ("projection_name");

CREATE TABLE IF NOT EXISTS "pekko_projection_management" (
  "projection_name" VARCHAR(255) NOT NULL,
  "projection_key" VARCHAR(255) NOT NULL,
  "paused" BOOLEAN NOT NULL,
  "last_updated" BIGINT NOT NULL,
  PRIMARY KEY("projection_name", "projection_key")
);
The schema can be created and dropped using the methods SlickProjection.createTablesIfNotExists and SlickProjection.dropTablesIfExists. This is particularly useful when writing tests. For production environments, we recommend creating the schema before deploying the application.
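For example, a test setup could create the tables before the projection runs and drop them afterwards. A sketch, assuming an implicit typed ActorSystem and the dbConfig from the snippets above are in scope (awaiting the futures is omitted for brevity):

```scala
import scala.concurrent.Future
import org.apache.pekko
import pekko.Done
import pekko.projection.slick.SlickProjection

// e.g. in a test's beforeAll: create the offset and management tables
val created: Future[Done] = SlickProjection.createTablesIfNotExists(dbConfig)

// e.g. in a test's afterAll: drop them again
val dropped: Future[Done] = SlickProjection.dropTablesIfExists(dbConfig)
```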
As of version 1.1.0, the schema for PostgreSQL and H2 databases has changed: it now defaults to lowercase table and column names. If you have a schema in production, we recommend applying an ALTER TABLE script to change it accordingly.
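For PostgreSQL, such a migration could look like the following. This is only a sketch for the default table names, assuming the old schema used the quoted uppercase identifiers; verify it against your actual schema before running it:

```sql
-- Rename the uppercase tables and columns to the lowercase 1.1.0 defaults.
ALTER TABLE "PEKKO_PROJECTION_OFFSET_STORE" RENAME TO pekko_projection_offset_store;
ALTER TABLE pekko_projection_offset_store RENAME COLUMN "PROJECTION_NAME" TO projection_name;
ALTER TABLE pekko_projection_offset_store RENAME COLUMN "PROJECTION_KEY" TO projection_key;
ALTER TABLE pekko_projection_offset_store RENAME COLUMN "CURRENT_OFFSET" TO current_offset;
ALTER TABLE pekko_projection_offset_store RENAME COLUMN "MANIFEST" TO manifest;
ALTER TABLE pekko_projection_offset_store RENAME COLUMN "MERGEABLE" TO mergeable;
ALTER TABLE pekko_projection_offset_store RENAME COLUMN "LAST_UPDATED" TO last_updated;

ALTER TABLE "PEKKO_PROJECTION_MANAGEMENT" RENAME TO pekko_projection_management;
ALTER TABLE pekko_projection_management RENAME COLUMN "PROJECTION_NAME" TO projection_name;
ALTER TABLE pekko_projection_management RENAME COLUMN "PROJECTION_KEY" TO projection_key;
ALTER TABLE pekko_projection_management RENAME COLUMN "PAUSED" TO paused;
ALTER TABLE pekko_projection_management RENAME COLUMN "LAST_UPDATED" TO last_updated;
```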
Alternatively, you can fall back to the uppercase format. You will also need to set pekko.projection.slick.offset-store.table to an uppercase value, as this setting now defaults to lowercase:
pekko.projection.slick.offset-store {
table = "PEKKO_PROJECTION_OFFSET_STORE"
use-lowercase-schema = false
}
Offset types
The supported offset types of the SlickProjection are:

- Offset types from events from Apache Pekko Persistence
- MergeableOffset that is used for messages from Kafka
- String
- Int
- Long
- Any other type that has a configured Pekko Serializer is stored with base64 encoding of the serialized bytes.
Configuration
Make your edits/overrides in your application.conf.
The reference configuration file with the default values:
source

pekko.projection.slick {
# The Slick profile to use
# set to one of:
# slick.jdbc.H2Profile$
# slick.jdbc.PostgresProfile$
# slick.jdbc.MySQLProfile$
# slick.jdbc.SQLServerProfile$
# slick.jdbc.OracleProfile$
#profile = <fill this with your profile of choice>
# add here your Slick db settings
db {
# url = "jdbc:h2:mem:test1"
# driver = org.h2.Driver
# connectionPool = disabled
# keepAliveConnection = true
}
offset-store {
# set this to your database schema if applicable, empty by default
schema = ""
# the database table name for the offset store
table = "pekko_projection_offset_store"
# the database table name for the projection management data
management-table = "pekko_projection_management"
# Use lowercase table and column names.
# This is mostly useful for H2 and Postgres databases. MySQL and SQL Server are case insensitive.
# The Oracle schema is case sensitive and is defined with uppercase, so this property is ignored when using Oracle.
use-lowercase-schema = true
}
}