Offset in a relational DB with Slick

The SlickProjection has support for storing the offset in a relational database using Slick (JDBC). This is only an option for Scala; for Java the offset can be stored in a relational database with the JDBC module. The JDBC module can also be used with Scala.

Warning

This module is deprecated. Use the JDBC module to implement your projection handler instead.

The source of the envelopes can be events from Apache Pekko Persistence or any other SourceProvider with supported offset types.

The envelope handler returns a DBIO that will be run by the projection. This means that the target database operations can run in the same transaction as the storage of the offset, so exactly-once processing semantics are supported. At-least-once semantics are also offered.

Dependencies

To use the Slick module of Apache Pekko Projections add the following dependency in your project:

sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-slick" % "1.0.0"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-projection-slick_${scala.binary.version}</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-projection-slick_${versions.ScalaBinary}:1.0.0"
}

Apache Pekko Projections require Pekko 1.0.2 or later, see Pekko version.

Project Info: Apache Pekko Projections Slick
Artifact
org.apache.pekko
pekko-projection-slick
1.0.0
JDK versions
OpenJDK 8
OpenJDK 11
OpenJDK 17
Scala versions: 2.13.13, 2.12.19
JPMS module name: pekko.projection.slick
License
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-projection

Transitive dependencies

The table below shows pekko-projection-slick’s direct dependencies; a second tab on the project site shows all libraries it depends on transitively.

exactly-once

The offset is stored in the same transaction as the DBIO returned from the handler, which gives exactly-once processing semantics if the projection is restarted from a previously stored offset.

Scala
import scala.concurrent.ExecutionContext

import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.slick.SlickProjection
import slick.basic.DatabaseConfig
import slick.dbio.DBIO
import slick.jdbc.H2Profile

implicit val ec: ExecutionContext = system.executionContext

val projection =
  SlickProjection.exactlyOnce(
    projectionId = ProjectionId("ShoppingCarts", "carts-1"),
    sourceProvider,
    dbConfig,
    handler = () => new ShoppingCartHandler(repository))

The ShoppingCartHandler is shown below.

at-least-once

The offset is stored after the envelope has been processed, giving at-least-once processing semantics. This means that if the projection is restarted from a previously stored offset, some elements may be processed more than once. Therefore, the handler code must be idempotent.

Scala
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

implicit val ec: ExecutionContext = system.executionContext

val projection =
  SlickProjection
    .atLeastOnce(
      projectionId = ProjectionId("ShoppingCarts", "carts-1"),
      sourceProvider,
      dbConfig,
      handler = () => new ShoppingCartHandler(repository))
    .withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)

The offset is stored after a time window, or limited by a number of envelopes, whichever happens first. This window can be defined with withSaveOffset on the returned AtLeastOnceProjection. The default settings for the window are defined in the configuration section pekko.projection.at-least-once. There is a performance benefit in not storing the offset too often, but the drawback is that more envelopes may be processed again as duplicates when the projection is restarted.

The ShoppingCartHandler is shown below.

groupedWithin

The envelopes can be grouped before processing, which can be useful for batch updates.

Scala
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

implicit val ec: ExecutionContext = system.executionContext

val projection =
  SlickProjection
    .groupedWithin(
      projectionId = ProjectionId("ShoppingCarts", "carts-1"),
      sourceProvider,
      dbConfig,
      handler = () => new GroupedShoppingCartHandler(repository))
    .withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)

The envelopes are grouped within a time window, or limited by a number of envelopes, whichever happens first. This window can be defined with withGroup on the returned GroupedProjection. The default settings for the window are defined in the configuration section pekko.projection.grouped.

When using groupedWithin the handler is a SlickHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]]. The GroupedShoppingCartHandler is shown below.

The offset is stored in the same transaction as the DBIO returned from the handler, which gives exactly-once processing semantics if the projection is restarted from a previously stored offset.

Handler

It’s in the SlickHandler that you implement the processing of each envelope. It’s essentially a function from Envelope to DBIO[Done]. The returned DBIO is run by the projection.

A handler that is consuming ShoppingCart.Event from eventsByTag can look like this:

Scala
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import org.apache.pekko
import pekko.Done
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.slick.SlickHandler
import org.slf4j.LoggerFactory
import slick.dbio.DBIO

class ShoppingCartHandler(repository: OrderRepository)(implicit ec: ExecutionContext)
    extends SlickHandler[EventEnvelope[ShoppingCart.Event]] {
  private val logger = LoggerFactory.getLogger(getClass)

  override def process(envelope: EventEnvelope[ShoppingCart.Event]): DBIO[Done] = {
    envelope.event match {
      case ShoppingCart.CheckedOut(cartId, time) =>
        logger.info(s"Shopping cart $cartId was checked out at $time")
        repository.save(Order(cartId, time))

      case otherEvent =>
        logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent")
        DBIO.successful(Done)
    }
  }
}
Hint

Such simple handlers can also be defined as plain functions via the helper SlickHandler.apply factory method.
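A minimal sketch of that, assuming SlickHandler.apply accepts a function from the envelope type to DBIO[Done], and reusing the repository and types defined above:

```scala
// Sketch only: the same logic as ShoppingCartHandler, expressed as a
// plain function through the SlickHandler.apply factory method.
val handler: SlickHandler[EventEnvelope[ShoppingCart.Event]] =
  SlickHandler[EventEnvelope[ShoppingCart.Event]] { envelope =>
    envelope.event match {
      case ShoppingCart.CheckedOut(cartId, time) =>
        repository.save(Order(cartId, time))
      case _ =>
        DBIO.successful(Done)
    }
  }
```

This avoids the class boilerplate when the handler has no state or lifecycle hooks of its own.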

where the OrderRepository is:

Scala
import java.time.Instant

case class Order(id: String, time: Instant)

class OrderRepository(val dbConfig: DatabaseConfig[H2Profile]) {

  import dbConfig.profile.api._

  private class OrdersTable(tag: Tag) extends Table[Order](tag, "ORDERS") {
    def id = column[String]("CART_ID", O.PrimaryKey)

    def time = column[Instant]("TIME")

    def * = (id, time).mapTo[Order]
  }

  private val ordersTable = TableQuery[OrdersTable]

  def save(order: Order)(implicit ec: ExecutionContext) = {
    ordersTable.insertOrUpdate(order).map(_ => Done)
  }

  def createTable(): Future[Unit] =
    dbConfig.db.run(ordersTable.schema.createIfNotExists)
}

with the Slick DatabaseConfig:

Scala
val dbConfig: DatabaseConfig[H2Profile] =
  DatabaseConfig.forConfig("pekko.projection.slick", system.settings.config)

val repository = new OrderRepository(dbConfig)

Grouped handler

When using SlickProjection.groupedWithin the handler is processing a Seq of envelopes.

Scala
import scala.collection.immutable

class GroupedShoppingCartHandler(repository: OrderRepository)(implicit ec: ExecutionContext)
    extends SlickHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] {
  private val logger = LoggerFactory.getLogger(getClass)

  override def process(envelopes: immutable.Seq[EventEnvelope[ShoppingCart.Event]]): DBIO[Done] = {
    val dbios = envelopes.map(_.event).map {
      case ShoppingCart.CheckedOut(cartId, time) =>
        logger.info(s"Shopping cart $cartId was checked out at $time")
        repository.save(Order(cartId, time))

      case otherEvent =>
        logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent")
        DBIO.successful(Done)
    }
    DBIO.sequence(dbios).map(_ => Done)
  }
}

Stateful handler

The SlickHandler can be stateful, with variables and mutable data structures. It is invoked by the Projection machinery one envelope at a time, and visibility guarantees between the invocations are handled automatically, i.e. no volatile or other concurrency primitives are needed for managing the state, as long as it’s not accessed by threads other than the one that called process.

Note

It is important that the Handler instance is not shared between several Projection instances, because then it would be invoked concurrently, which is not how it is intended to be used. Each Projection instance should use a new Handler instance.
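As a sketch, a stateful variant of the ShoppingCartHandler above might keep a plain counter (the CountingShoppingCartHandler name and the counter are illustrative, not part of the API):

```scala
// Sketch: plain 'var' state is safe here because the projection invokes
// 'process' one envelope at a time on the same handler instance.
class CountingShoppingCartHandler(repository: OrderRepository)(implicit ec: ExecutionContext)
    extends SlickHandler[EventEnvelope[ShoppingCart.Event]] {

  private var checkedOutCount = 0 // no @volatile or lock needed

  override def process(envelope: EventEnvelope[ShoppingCart.Event]): DBIO[Done] =
    envelope.event match {
      case ShoppingCart.CheckedOut(cartId, time) =>
        checkedOutCount += 1
        repository.save(Order(cartId, time))
      case _ =>
        DBIO.successful(Done)
    }
}
```

Note that such in-memory state is lost when the projection is restarted, so it must be derivable from the processed envelopes or be purely informational.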

Async handler

The Handler can be used with SlickProjection.atLeastOnceAsync and SlickProjection.groupedWithinAsync if the handler is not storing the projection result in the database. The handler could, for example, send messages to a Kafka topic or integrate with another external system.

There are several examples of such handlers in the documentation for Cassandra Projections. The same type of handlers can be used with SlickProjection instead of CassandraProjection.
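A sketch of such an async handler, assuming atLeastOnceAsync takes a factory of pekko.projection.scaladsl.Handler and that publishToKafka is a hypothetical function returning Future[Done]:

```scala
import scala.concurrent.Future

import pekko.projection.scaladsl.Handler

// Sketch: an async handler that does not touch the Slick database itself.
class PublishingHandler extends Handler[EventEnvelope[ShoppingCart.Event]] {
  override def process(envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] =
    publishToKafka(envelope.event) // hypothetical external side effect
}

val projection =
  SlickProjection.atLeastOnceAsync(
    projectionId = ProjectionId("ShoppingCarts", "carts-1"),
    sourceProvider,
    dbConfig,
    handler = () => new PublishingHandler)
```

The offset is still stored via Slick, but the projected result lives outside the database, so only at-least-once semantics apply.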

Actor handler

A good alternative for advanced state management is to implement the handler as an actor, which is described in Processing with Actor.

Flow handler

An Apache Pekko Streams FlowWithContext can be used instead of a handler for processing the envelopes, which is described in Processing with Apache Pekko Streams.

Handler lifecycle

You can override the start and stop methods of the SlickHandler to implement initialization before the first envelope is processed and resource cleanup when the projection is stopped. Those methods are also called when the Projection is restarted after failure.
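A sketch of overriding the lifecycle hooks, assuming start and stop return Future[Done]:

```scala
// Sketch: lifecycle hooks on a SlickHandler. The body of start/stop is
// illustrative; put your own resource management there.
class ShoppingCartHandlerWithLifecycle(repository: OrderRepository)(implicit ec: ExecutionContext)
    extends SlickHandler[EventEnvelope[ShoppingCart.Event]] {

  override def start(): Future[Done] = {
    // e.g. open connections or warm up caches before the first envelope
    Future.successful(Done)
  }

  override def stop(): Future[Done] = {
    // e.g. release resources when the projection is stopped or restarted
    Future.successful(Done)
  }

  override def process(envelope: EventEnvelope[ShoppingCart.Event]): DBIO[Done] =
    DBIO.successful(Done)
}
```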

See also error handling.

Schema

The database schema for the offset storage table:

PostgreSQL
CREATE TABLE IF NOT EXISTS pekko_projection_offset_store (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  current_offset VARCHAR(255) NOT NULL,
  manifest VARCHAR(4) NOT NULL,
  mergeable BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);

CREATE INDEX IF NOT EXISTS projection_name_index ON pekko_projection_offset_store (projection_name);

CREATE TABLE IF NOT EXISTS pekko_projection_management (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  paused BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);
MySQL
CREATE TABLE IF NOT EXISTS pekko_projection_offset_store (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  current_offset VARCHAR(255) NOT NULL,
  manifest VARCHAR(4) NOT NULL,
  mergeable BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);

CREATE INDEX projection_name_index ON pekko_projection_offset_store (projection_name);

CREATE TABLE IF NOT EXISTS pekko_projection_management (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  paused BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);
Microsoft SQL Server
IF NOT EXISTS (SELECT 1 FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'pekko_projection_offset_store') AND TYPE IN (N'U'))
BEGIN
CREATE TABLE pekko_projection_offset_store (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  current_offset VARCHAR(255) NOT NULL,
  manifest VARCHAR(4) NOT NULL,
  mergeable BIT NOT NULL,
  last_updated BIGINT NOT NULL
)

ALTER TABLE pekko_projection_offset_store ADD CONSTRAINT pk_projection_id PRIMARY KEY(projection_name, projection_key)

CREATE INDEX projection_name_index ON pekko_projection_offset_store (projection_name)
END

IF NOT EXISTS (SELECT 1 FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'pekko_projection_management') AND TYPE IN (N'U'))
BEGIN
CREATE TABLE pekko_projection_management (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  paused BIT NOT NULL,
  last_updated BIGINT NOT NULL
)

ALTER TABLE pekko_projection_management ADD CONSTRAINT pk_projection_management_id PRIMARY KEY(projection_name, projection_key)
END
Oracle
BEGIN
execute immediate 'create table "PEKKO_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME" VARCHAR2(255) NOT NULL,"PROJECTION_KEY" VARCHAR2(255) NOT NULL,"CURRENT_OFFSET" VARCHAR2(255) NOT NULL,"MANIFEST" VARCHAR2(4) NOT NULL,"MERGEABLE" CHAR(1) NOT NULL check ("MERGEABLE" in (0, 1)),"LAST_UPDATED" NUMBER(19) NOT NULL) ';
execute immediate 'alter table "PEKKO_PROJECTION_OFFSET_STORE" add constraint "PK_PROJECTION_ID" primary key("PROJECTION_NAME","PROJECTION_KEY") ';
execute immediate 'create index "PROJECTION_NAME_INDEX" on "PEKKO_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME") ';
EXCEPTION
    WHEN OTHERS THEN
      IF SQLCODE = -955 THEN
        NULL; -- suppresses ORA-00955 exception
      ELSE
         RAISE;
      END IF;
END;

BEGIN
execute immediate 'create table "PEKKO_PROJECTION_MANAGEMENT" ("PROJECTION_NAME" VARCHAR2(255) NOT NULL,"PROJECTION_KEY" VARCHAR2(255) NOT NULL,"PAUSED" CHAR(1) NOT NULL check ("PAUSED" in (0, 1)),"LAST_UPDATED" NUMBER(19) NOT NULL) ';
execute immediate 'alter table "PEKKO_PROJECTION_MANAGEMENT" add constraint "PK_PROJECTION_MANAGEMENT_ID" primary key("PROJECTION_NAME","PROJECTION_KEY") ';
EXCEPTION
    WHEN OTHERS THEN
      IF SQLCODE = -955 THEN
        NULL; -- suppresses ORA-00955 exception
      ELSE
         RAISE;
      END IF;
END;
H2
CREATE TABLE IF NOT EXISTS "pekko_projection_offset_store" (
  "projection_name" VARCHAR(255) NOT NULL,
  "projection_key" VARCHAR(255) NOT NULL,
  "current_offset" VARCHAR(255) NOT NULL,
  "manifest" VARCHAR(4) NOT NULL,
  "mergeable" BOOLEAN NOT NULL,
  "last_updated" BIGINT NOT NULL,
  PRIMARY KEY("projection_name", "projection_key")
);

CREATE INDEX IF NOT EXISTS "projection_name_index" ON "pekko_projection_offset_store" ("projection_name");

CREATE TABLE IF NOT EXISTS "pekko_projection_management" (
  "projection_name" VARCHAR(255) NOT NULL,
  "projection_key" VARCHAR(255) NOT NULL,
  "paused" BOOLEAN NOT NULL,
  "last_updated" BIGINT NOT NULL,
  PRIMARY KEY("projection_name", "projection_key")
);

The schema can be created and dropped using the methods SlickProjection.createTablesIfNotExists and SlickProjection.dropTablesIfExists. This is particularly useful when writing tests. For production environments, we recommend creating the schema before deploying the application.
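For example, a test setup might create the tables before running and drop them afterwards (a sketch, assuming both methods return a Future that completes when the DDL has been applied):

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

// Test setup (sketch): create the offset and management tables up front.
Await.result(SlickProjection.createTablesIfNotExists(dbConfig), 10.seconds)

// ... run the tests ...

// Test teardown (sketch): drop the tables again.
Await.result(SlickProjection.dropTablesIfExists(dbConfig), 10.seconds)
```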

Important

As of version 1.1.0, the schema for PostgreSQL and H2 databases has changed. It now defaults to lowercase table and column names. If you have a schema in production, we recommend applying an ALTER table script to change it accordingly.

Alternatively, you can fall back to the uppercase format. You will also need to set pekko.projection.slick.offset-store.table to an uppercase value, as this setting now defaults to lowercase.

pekko.projection.slick.offset-store {
  table = "PEKKO_PROJECTION_OFFSET_STORE"
  use-lowercase-schema = false
}

Offset types

The supported offset types of the SlickProjection are:

Configuration

Make your edits/overrides in your application.conf.

The reference configuration file with the default values:

pekko.projection.slick {

  # The Slick profile to use
  # set to one of: 
  #  slick.jdbc.H2Profile$
  #  slick.jdbc.PostgresProfile$
  #  slick.jdbc.MySQLProfile$
  #  slick.jdbc.SQLServerProfile$ 
  #  slick.jdbc.OracleProfile$
  #profile = <fill this with your profile of choice>

  # add here your Slick db settings
  db {
    # url = "jdbc:h2:mem:test1"
    # driver = org.h2.Driver
    # connectionPool = disabled
    # keepAliveConnection = true
  }

  offset-store {
    # set this to your database schema if applicable, empty by default
    schema = ""
    # the database table name for the offset store
    table = "pekko_projection_offset_store"

    # the database table name for the projection management data
    management-table = "pekko_projection_management"

    # Use lowercase table and column names.
    # This is mostly useful for H2 and Postgres databases. MySQL and SQL Server are case-insensitive.
    # The Oracle schema is case-sensitive and defined with uppercase, so this property is ignored when using Oracle.
    use-lowercase-schema = true
  }
}