Offset in a relational DB with Slick
The SlickProjection
has support for storing the offset in a relational database using Slick (JDBC). This is only an option for Scala; for Java, the offset can be stored in a relational database with the JDBC module, which can also be used with Scala.
Note that this module is deprecated; use the JDBC module to implement your projection handler instead.
The source of the envelopes can be events from Apache Pekko Persistence or any other SourceProvider
with supported offset types.
The envelope handler returns a DBIO
that will be run by the projection. This means that the target database operations can be run in the same transaction as the storage of the offset, which gives exactly-once processing semantics. At-least-once semantics is also supported.
Dependencies¶
To use the Slick module of Apache Pekko Projections add the following dependency in your project:
sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-slick" % "1.1.0"
Maven
<properties>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-projection-slick_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
Gradle
def versions = [
ScalaBinary: "2.13"
]
dependencies {
implementation "org.apache.pekko:pekko-projection-slick_${versions.ScalaBinary}:1.1.0"
}
Apache Pekko Projections require Pekko 1.1.3 or later, see Pekko version.
Project Info: Apache Pekko Projections Slick
Artifact | org.apache.pekko » pekko-projection-slick » 1.1.0
JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17
Scala versions | 2.13.16, 2.12.20, 3.3.5
JPMS module name | pekko.projection.slick
Release notes | GitHub releases
Issues | GitHub issues
Sources | https://github.com/apache/pekko-projection
exactly-once¶
The offset is stored in the same transaction as the DBIO
returned from the handler
, which gives exactly-once processing semantics if the projection is restarted from a previously stored offset.
sourceimport scala.concurrent.ExecutionContext
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.slick.SlickProjection
import slick.basic.DatabaseConfig
import slick.dbio.DBIO
import slick.jdbc.H2Profile
implicit val ec: ExecutionContext = system.executionContext
val projection =
SlickProjection.exactlyOnce(
projectionId = ProjectionId("ShoppingCarts", "carts-1"),
sourceProvider,
dbConfig,
handler = () => new ShoppingCartHandler(repository))
The ShoppingCartHandler
is shown below.
at-least-once¶
The offset is stored after the envelope has been processed, giving at-least-once processing semantics. This means that if the projection is restarted from a previously stored offset, some elements may be processed more than once. Therefore, the handler code must be idempotent.
sourceimport scala.concurrent.ExecutionContext
import scala.concurrent.duration._
implicit val ec: ExecutionContext = system.executionContext
val projection =
SlickProjection
.atLeastOnce(
projectionId = ProjectionId("ShoppingCarts", "carts-1"),
sourceProvider,
dbConfig,
handler = () => new ShoppingCartHandler(repository))
.withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
The offset is stored after a time window, or after a limited number of envelopes, whichever happens first. This window can be defined with withSaveOffset
of the returned AtLeastOnceProjection
. The default settings for the window are defined in the configuration section pekko.projection.at-least-once
. There is a performance benefit in not storing the offset too often, but the drawback is that there can be more duplicates that will be processed again when the projection is restarted.
The ShoppingCartHandler
is shown below.
groupedWithin¶
The envelopes can be grouped before processing, which can be useful for batch updates.
sourceimport scala.concurrent.ExecutionContext
import scala.concurrent.duration._
implicit val ec: ExecutionContext = system.executionContext
val projection =
SlickProjection
.groupedWithin(
projectionId = ProjectionId("ShoppingCarts", "carts-1"),
sourceProvider,
dbConfig,
handler = () => new GroupedShoppingCartHandler(repository))
.withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
The envelopes are grouped within a time window, or limited to a number of envelopes, whichever happens first. This window can be defined with withGroup
of the returned GroupedProjection
. The default settings for the window are defined in the configuration section pekko.projection.grouped
.
When using groupedWithin
the handler is a SlickHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]]
. The GroupedShoppingCartHandler
is shown below.
The offset is stored in the same transaction as the DBIO
returned from the handler
, which gives exactly-once processing semantics if the projection is restarted from a previously stored offset.
Handler¶
It’s in the SlickHandler
that you implement the processing of each envelope. It’s essentially a function from Envelope
to DBIO[Done]
. The returned DBIO
is run by the projection.
A handler that is consuming ShoppingCart.Event
from eventsByTag
can look like this:
sourceimport scala.concurrent.ExecutionContext
import scala.concurrent.Future
import org.apache.pekko
import pekko.Done
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.slick.SlickHandler
import org.slf4j.LoggerFactory
import slick.dbio.DBIO
class ShoppingCartHandler(repository: OrderRepository)(implicit ec: ExecutionContext)
extends SlickHandler[EventEnvelope[ShoppingCart.Event]] {
private val logger = LoggerFactory.getLogger(getClass)
override def process(envelope: EventEnvelope[ShoppingCart.Event]): DBIO[Done] = {
envelope.event match {
case ShoppingCart.CheckedOut(cartId, time) =>
logger.info(s"Shopping cart $cartId was checked out at $time")
repository.save(Order(cartId, time))
case otherEvent =>
logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent")
DBIO.successful(Done)
}
}
}
Such simple handlers can also be defined as plain functions via the helper SlickHandler.apply
factory method.
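As a sketch (reusing the ShoppingCart.Event, Order, and OrderRepository types from this page), the same logic could look like this when expressed as a function passed to SlickHandler.apply:

```scala
import scala.concurrent.ExecutionContext
import org.apache.pekko
import pekko.Done
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.slick.SlickHandler
import slick.dbio.DBIO

// A minimal sketch of SlickHandler.apply: the same logic as
// ShoppingCartHandler, expressed as a plain function instead of a class.
def shoppingCartHandler(repository: OrderRepository)(
    implicit ec: ExecutionContext): SlickHandler[EventEnvelope[ShoppingCart.Event]] =
  SlickHandler[EventEnvelope[ShoppingCart.Event]] { envelope =>
    envelope.event match {
      case ShoppingCart.CheckedOut(cartId, time) =>
        repository.save(Order(cartId, time))
      case _ =>
        DBIO.successful(Done) // nothing to store for other events
    }
  }
```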
where the OrderRepository
is:
sourceimport java.time.Instant
case class Order(id: String, time: Instant)
class OrderRepository(val dbConfig: DatabaseConfig[H2Profile]) {
import dbConfig.profile.api._
private class OrdersTable(tag: Tag) extends Table[Order](tag, "ORDERS") {
def id = column[String]("CART_ID", O.PrimaryKey)
def time = column[Instant]("TIME")
def * = (id, time).mapTo[Order]
}
private val ordersTable = TableQuery[OrdersTable]
def save(order: Order)(implicit ec: ExecutionContext) = {
ordersTable.insertOrUpdate(order).map(_ => Done)
}
def createTable(): Future[Unit] =
dbConfig.db.run(ordersTable.schema.createIfNotExists)
}
with the Slick DatabaseConfig
:
sourceval dbConfig: DatabaseConfig[H2Profile] = DatabaseConfig.forConfig("pekko.projection.slick", system.settings.config)
val repository = new OrderRepository(dbConfig)
Grouped handler¶
When using SlickProjection.groupedWithin
the handler is processing a Seq
of envelopes.
sourceimport scala.collection.immutable
class GroupedShoppingCartHandler(repository: OrderRepository)(implicit ec: ExecutionContext)
extends SlickHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] {
private val logger = LoggerFactory.getLogger(getClass)
override def process(envelopes: immutable.Seq[EventEnvelope[ShoppingCart.Event]]): DBIO[Done] = {
val dbios = envelopes.map(_.event).map {
case ShoppingCart.CheckedOut(cartId, time) =>
logger.info(s"Shopping cart $cartId was checked out at $time")
repository.save(Order(cartId, time))
case otherEvent =>
logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent")
DBIO.successful(Done)
}
DBIO.sequence(dbios).map(_ => Done)
}
}
Stateful handler¶
The SlickHandler
can be stateful, with variables and mutable data structures. It is invoked by the Projection
machinery one envelope at a time, and visibility guarantees between invocations are handled automatically, i.e. no volatile or other concurrency primitives are needed for managing the state, as long as it is not accessed by threads other than the one that called process
.
It is important that the Handler
instance is not shared between several Projection
instances, because then it would be invoked concurrently, which is not how it is intended to be used. Each Projection
instance should use a new Handler
instance.
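A sketch of such a stateful handler, counting processed envelopes in a plain var (the handler name and counting logic are illustrative, not from the library):

```scala
import org.apache.pekko
import pekko.Done
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.slick.SlickHandler
import slick.dbio.DBIO

// Hypothetical stateful handler: `count` is a plain var because `process`
// is invoked one envelope at a time and visibility between invocations
// is guaranteed by the Projection machinery.
class CountingHandler extends SlickHandler[EventEnvelope[ShoppingCart.Event]] {
  private var count = 0L

  override def process(envelope: EventEnvelope[ShoppingCart.Event]): DBIO[Done] = {
    count += 1
    DBIO.successful(Done)
  }
}
```

Because the handler must not be shared, each projection instance should get its own via `handler = () => new CountingHandler`.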
Async handler¶
The Handler
can be used with SlickProjection.atLeastOnceAsync
and SlickProjection.groupedWithinAsync
if the handler is not storing the projection result in the database. The handler could, for example, send messages to a Kafka topic or integrate with some other external system.
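As a sketch, a plain Handler that hands each event to a hypothetical publish function (e.g. a thin wrapper around a Kafka producer), used with SlickProjection.atLeastOnceAsync:

```scala
import scala.concurrent.Future
import org.apache.pekko
import pekko.Done
import pekko.projection.ProjectionId
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.scaladsl.Handler

// `publish` is a hypothetical integration point; it should return a
// Future that completes when the message has been acknowledged.
class PublishingHandler(publish: ShoppingCart.Event => Future[Done])
    extends Handler[EventEnvelope[ShoppingCart.Event]] {
  override def process(envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] =
    publish(envelope.event)
}

val projection =
  SlickProjection.atLeastOnceAsync(
    projectionId = ProjectionId("ShoppingCarts", "carts-1"),
    sourceProvider,
    dbConfig,
    handler = () => new PublishingHandler(publish))
```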
There are several examples of such Handler
in the documentation for Cassandra Projections. The same types of handlers can be used with SlickProjection
instead of CassandraProjection
.
Actor handler¶
A good alternative for advanced state management is to implement the handler as an actor, which is described in Processing with Actor.
Flow handler¶
An Apache Pekko Streams FlowWithContext
can be used instead of a handler for processing the envelopes, which is described in Processing with Apache Pekko Streams.
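A brief sketch of what such a flow could look like, assuming a SlickProjection.atLeastOnceFlow factory analogous to the other factory methods shown on this page (see the linked page for the full details):

```scala
import org.apache.pekko
import pekko.Done
import pekko.projection.{ ProjectionContext, ProjectionId }
import pekko.stream.scaladsl.FlowWithContext

// Sketch: process envelopes in a stream stage. The ProjectionContext is
// propagated alongside each element so that offsets can still be stored.
val flow =
  FlowWithContext[EventEnvelope[ShoppingCart.Event], ProjectionContext]
    .map { envelope =>
      // side effects or transformations go here
      Done
    }

val projection =
  SlickProjection.atLeastOnceFlow(
    projectionId = ProjectionId("ShoppingCarts", "carts-1"),
    sourceProvider,
    dbConfig,
    flow)
```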
Handler lifecycle¶
You can override the start
and stop
methods of the SlickHandler
to implement initialization before the first envelope is processed and resource cleanup when the projection is stopped. Those methods are also called when the Projection
is restarted after failure.
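A sketch of overriding the lifecycle hooks, assuming start and stop return Future[Done] (the handler name and bodies are illustrative):

```scala
import scala.concurrent.{ ExecutionContext, Future }
import org.apache.pekko
import pekko.Done
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.slick.SlickHandler
import slick.dbio.DBIO

class LifecycleAwareHandler(repository: OrderRepository)(implicit ec: ExecutionContext)
    extends SlickHandler[EventEnvelope[ShoppingCart.Event]] {

  override def start(): Future[Done] = {
    // runs before the first envelope, and again after a restart on failure
    repository.createTable().map(_ => Done)
  }

  override def stop(): Future[Done] = {
    // release resources when the projection is stopped
    Future.successful(Done)
  }

  override def process(envelope: EventEnvelope[ShoppingCart.Event]): DBIO[Done] =
    DBIO.successful(Done)
}
```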
See also error handling.
Schema¶
The database schema for the offset storage table:
sourceCREATE TABLE IF NOT EXISTS pekko_projection_offset_store (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
current_offset VARCHAR(255) NOT NULL,
manifest VARCHAR(4) NOT NULL,
mergeable BOOLEAN NOT NULL,
last_updated BIGINT NOT NULL,
PRIMARY KEY(projection_name, projection_key)
);
CREATE INDEX IF NOT EXISTS projection_name_index ON pekko_projection_offset_store (projection_name);
CREATE TABLE IF NOT EXISTS pekko_projection_management (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
paused BOOLEAN NOT NULL,
last_updated BIGINT NOT NULL,
PRIMARY KEY(projection_name, projection_key)
);
sourceCREATE TABLE IF NOT EXISTS pekko_projection_offset_store (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
current_offset VARCHAR(255) NOT NULL,
manifest VARCHAR(4) NOT NULL,
mergeable BOOLEAN NOT NULL,
last_updated BIGINT NOT NULL,
PRIMARY KEY(projection_name, projection_key)
);
CREATE INDEX projection_name_index ON pekko_projection_offset_store (projection_name);
CREATE TABLE IF NOT EXISTS pekko_projection_management (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
paused BOOLEAN NOT NULL,
last_updated BIGINT NOT NULL,
PRIMARY KEY(projection_name, projection_key)
);
sourceIF NOT EXISTS (SELECT 1 FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'pekko_projection_offset_store') AND TYPE IN (N'U'))
BEGIN
CREATE TABLE pekko_projection_offset_store (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
current_offset VARCHAR(255) NOT NULL,
manifest VARCHAR(4) NOT NULL,
mergeable BIT NOT NULL,
last_updated BIGINT NOT NULL
)
ALTER TABLE pekko_projection_offset_store ADD CONSTRAINT pk_projection_id PRIMARY KEY(projection_name, projection_key)
CREATE INDEX projection_name_index ON pekko_projection_offset_store (projection_name)
END
IF NOT EXISTS (SELECT 1 FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'pekko_projection_management') AND TYPE IN (N'U'))
BEGIN
CREATE TABLE pekko_projection_management (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
paused BIT NOT NULL,
last_updated BIGINT NOT NULL
)
ALTER TABLE pekko_projection_management ADD CONSTRAINT pk_projection_management_id PRIMARY KEY(projection_name, projection_key)
END
sourceBEGIN
execute immediate 'create table "PEKKO_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME" VARCHAR2(255) NOT NULL,"PROJECTION_KEY" VARCHAR2(255) NOT NULL,"CURRENT_OFFSET" VARCHAR2(255) NOT NULL,"MANIFEST" VARCHAR2(4) NOT NULL,"MERGEABLE" CHAR(1) NOT NULL check ("MERGEABLE" in (0, 1)),"LAST_UPDATED" NUMBER(19) NOT NULL) ';
execute immediate 'alter table "PEKKO_PROJECTION_OFFSET_STORE" add constraint "PK_PROJECTION_ID" primary key("PROJECTION_NAME","PROJECTION_KEY") ';
execute immediate 'create index "PROJECTION_NAME_INDEX" on "PEKKO_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME") ';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE = -955 THEN
NULL; -- suppresses ORA-00955 exception
ELSE
RAISE;
END IF;
END;
BEGIN
execute immediate 'create table "PEKKO_PROJECTION_MANAGEMENT" ("PROJECTION_NAME" VARCHAR2(255) NOT NULL,"PROJECTION_KEY" VARCHAR2(255) NOT NULL,"PAUSED" CHAR(1) NOT NULL check ("PAUSED" in (0, 1)),"LAST_UPDATED" NUMBER(19) NOT NULL) ';
execute immediate 'alter table "PEKKO_PROJECTION_MANAGEMENT" add constraint "PK_PROJECTION_MANAGEMENT_ID" primary key("PROJECTION_NAME","PROJECTION_KEY") ';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE = -955 THEN
NULL; -- suppresses ORA-00955 exception
ELSE
RAISE;
END IF;
END;
sourceCREATE TABLE IF NOT EXISTS "pekko_projection_offset_store" (
"projection_name" VARCHAR(255) NOT NULL,
"projection_key" VARCHAR(255) NOT NULL,
"current_offset" VARCHAR(255) NOT NULL,
"manifest" VARCHAR(4) NOT NULL,
"mergeable" BOOLEAN NOT NULL,
"last_updated" BIGINT NOT NULL,
PRIMARY KEY("projection_name", "projection_key")
);
CREATE INDEX IF NOT EXISTS "projection_name_index" ON "pekko_projection_offset_store" ("projection_name");
CREATE TABLE IF NOT EXISTS "pekko_projection_management" (
"projection_name" VARCHAR(255) NOT NULL,
"projection_key" VARCHAR(255) NOT NULL,
"paused" BOOLEAN NOT NULL,
"last_updated" BIGINT NOT NULL,
PRIMARY KEY("projection_name", "projection_key")
);
The schema can be created and dropped using the methods SlickProjection.createTablesIfNotExists
and SlickProjection.dropTablesIfExists
. This is particularly useful when writing tests. For production environments, we recommend creating the schema before deploying the application.
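For example, in a test setup (assuming these methods take the Slick DatabaseConfig, need an implicit ActorSystem in scope, and return a Future[Done]):

```scala
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import org.apache.pekko
import pekko.Done
import pekko.projection.slick.SlickProjection

// Sketch: create the offset and management tables before the tests run,
// drop them afterwards. Blocking with Await is acceptable in test setup.
val created: Future[Done] = SlickProjection.createTablesIfNotExists(dbConfig)
Await.result(created, 10.seconds)

// ... run tests ...

val dropped: Future[Done] = SlickProjection.dropTablesIfExists(dbConfig)
Await.result(dropped, 10.seconds)
```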
As of version 1.1.0, the schema for PostgreSQL and H2 databases has changed. It now defaults to lowercase table and column names. If you have a schema in production, we recommend applying an ALTER table script to change it accordingly.
Alternatively, you can fall back to the uppercase format. You will also need to set pekko.projection.slick.offset-store.table
to an uppercase value, as this setting now defaults to lowercase.
pekko.projection.slick.offset-store {
table = "PEKKO_PROJECTION_OFFSET_STORE"
use-lowercase-schema = false
}
Offset types¶
The supported offset types of the SlickProjection
are:
- Offset types from events from Apache Pekko Persistence
- MergeableOffset that is used for messages from Kafka
- String
- Int
- Long
- Any other type that has a configured Pekko Serializer is stored with base64 encoding of the serialized bytes.
Configuration¶
Make your edits/overrides in your application.conf.
The reference configuration file with the default values:
sourcepekko.projection.slick {
# The Slick profile to use
# set to one of:
# slick.jdbc.H2Profile$
# slick.jdbc.PostgresProfile$
# slick.jdbc.MySQLProfile$
# slick.jdbc.SQLServerProfile$
# slick.jdbc.OracleProfile$
#profile = <fill this with your profile of choice>
# add here your Slick db settings
db {
# url = "jdbc:h2:mem:test1"
# driver = org.h2.Driver
# connectionPool = disabled
# keepAliveConnection = true
}
offset-store {
# set this to your database schema if applicable, empty by default
schema = ""
# the database table name for the offset store
table = "pekko_projection_offset_store"
# the database table name for the projection management data
management-table = "pekko_projection_management"
# Use lowercase table and column names.
# This is mostly useful for H2 and Postgres databases. MySQL and SQL Server are case insensitive.
  # Oracle schema is case sensitive and is defined with uppercase, this property is therefore ignored when using Oracle
use-lowercase-schema = true
}
}