Projection

The R2dbcProjection has support for storing the offset in a relational database using R2DBC.

The source of the envelopes is a SourceProvider, which can be, for example, events from Event Sourced entities via EventSourcedProvider.eventsBySlices or changes from Durable State entities via DurableStateSourceProvider.changesBySlices.

A R2dbcHandler receives a R2dbcSession instance and an envelope. The R2dbcSession provides the means to access an open R2DBC connection that can be used to process the envelope. The target database operations can be run in the same transaction as the storage of the offset, which means that exactly-once processing semantics are supported. At-least-once semantics are also offered.

Dependencies

To use the R2DBC module of Pekko Projections, add the following dependency to your project:

Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-projection-r2dbc_${scala.binary.version}</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-r2dbc" % "1.0.0"
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-projection-r2dbc_${versions.ScalaBinary}:1.0.0"
}

Pekko Projections R2DBC requires Pekko 1.0.2 or later. It is important that all pekko-* dependencies are of the same version, so it is recommended to depend on them explicitly rather than rely on transitive resolution, which could otherwise pull in an unlucky mix of versions.
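For example, with sbt the explicit pinning could look like the following sketch; the list of pekko-* modules is illustrative and should match whatever your project actually uses:

sbt
// Pin all pekko-* core modules to one Pekko version so transitive
// resolution cannot mix versions (module list is illustrative)
val PekkoVersion = "1.0.2"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion,
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
  "org.apache.pekko" %% "pekko-persistence-query" % PekkoVersion,
  "org.apache.pekko" %% "pekko-projection-r2dbc" % "1.0.0")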

Project Info: Pekko Projections with R2DBC

Artifact: org.apache.pekko : pekko-projection-r2dbc : 1.0.0
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17
Scala versions: 2.12.19, 2.13.13, 3.3.3
JPMS module name: pekko.projection.r2dbc
License: Apache-2.0
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/incubator-pekko-persistence-r2dbc

Transitive dependencies

The table below shows pekko-projection-r2dbc’s direct dependencies, and the second tab shows all libraries it depends on transitively.

Schema

The projection_offset_store, projection_timestamp_offset_store and projection_management tables need to be created in the configured database, see schema definition in Creating the schema.

Configuration

By default, pekko-projection-r2dbc uses the same connection pool and dialect as pekko-persistence-r2dbc, see Connection configuration.

Reference configuration

The following can be overridden in your application.conf for the Projection specific settings:

pekko.projection.r2dbc {
  # postgres or yugabyte
  dialect = ${pekko.persistence.r2dbc.dialect}

  offset-store {
    # set this to your database schema if applicable, empty by default
    schema = ""
    # the database table name for the offset store,
    # can be set to "" if only timestamp offsets are used and table is not created
    offset-table = "projection_offset_store"

    # the database table name for the offset store
    timestamp-offset-table = "projection_timestamp_offset_store"

    # the database table name for the projection management data
    management-table = "projection_management"

    # The offset store will keep track of persistence ids and sequence numbers
    # within this time window from latest offset.
    time-window = 5 minutes

    # Keep this number of entries. Don't evict old entries until this threshold
    # has been reached.
    keep-number-of-entries = 10000

    # Remove old entries outside the time-window from the offset store memory
    # with this frequency.
    evict-interval = 10 seconds

    # Remove old entries outside the time-window from the offset store database
    # with this frequency.
    delete-interval = 1 minute
  }

  # By default it shares connection-factory with pekko-persistence-r2dbc (write side),
  # i.e. same connection pool. To use a separate pool for projections this can be
  # set to another config path that defines the same kind of config as
  # pekko.persistence.r2dbc.connection-factory.
  use-connection-factory = "pekko.persistence.r2dbc.connection-factory"

  # Logs database calls that take longer than this duration at INFO level.
  # Set to "off" to disable this logging.
  # Set to 0 to log all calls.
  log-db-calls-exceeding = 300 ms
}
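
Instead of relying purely on configuration, the settings can also be loaded programmatically and passed to the R2dbcProjection factory methods in place of settings = None. A minimal sketch, assuming the R2dbcProjectionSettings apply factories and a hypothetical custom config path my-app.projection.r2dbc:

Scala
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.projection.r2dbc.R2dbcProjectionSettings

// defaults from the "pekko.projection.r2dbc" section of the ActorSystem config
def defaultSettings(system: ActorSystem[_]): R2dbcProjectionSettings =
  R2dbcProjectionSettings(system)

// or from a custom config path (hypothetical) that overrides e.g.
// use-connection-factory to give projections a dedicated pool
def customSettings(system: ActorSystem[_]): R2dbcProjectionSettings =
  R2dbcProjectionSettings(system.settings.config.getConfig("my-app.projection.r2dbc"))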

Running with Sharded Daemon Process

The Sharded Daemon Process can be used to distribute n instances of a given Projection across the cluster. Each Projection instance then consumes a subset of the stream of envelopes.

When using eventsBySlices the initialization code looks like this:

Scala
import org.apache.pekko
import pekko.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import pekko.persistence.query.Offset
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import pekko.projection.{ Projection, ProjectionBehavior, ProjectionId }
import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
import pekko.projection.r2dbc.scaladsl.R2dbcProjection
import pekko.projection.scaladsl.SourceProvider

def initProjections(): Unit = {
  def sourceProvider(sliceRange: Range): SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] =
    EventSourcedProvider
      .eventsBySlices[ShoppingCart.Event](
        system,
        readJournalPluginId = R2dbcReadJournal.Identifier,
        entityType,
        sliceRange.min,
        sliceRange.max)

  def projection(sliceRange: Range): Projection[EventEnvelope[ShoppingCart.Event]] = {
    val minSlice = sliceRange.min
    val maxSlice = sliceRange.max
    val projectionId = ProjectionId("ShoppingCarts", s"carts-$minSlice-$maxSlice")

    R2dbcProjection
      .exactlyOnce(
        projectionId,
        settings = None,
        sourceProvider(sliceRange),
        handler = () => new ShoppingCartHandler)
  }

  // Split the slices into 4 ranges
  val numberOfSliceRanges: Int = 4
  val sliceRanges = EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier, numberOfSliceRanges)

  ShardedDaemonProcess(system).init(
    name = "ShoppingCartProjection",
    numberOfInstances = sliceRanges.size,
    behaviorFactory = i => ProjectionBehavior(projection(sliceRanges(i))),
    stopMessage = ProjectionBehavior.Stop)
}
Java
import java.util.List;
import java.util.Optional;
import org.apache.pekko.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.persistence.query.typed.EventEnvelope;
import org.apache.pekko.persistence.r2dbc.query.javadsl.R2dbcReadJournal;
import org.apache.pekko.projection.Projection;
import org.apache.pekko.projection.ProjectionBehavior;
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.eventsourced.javadsl.EventSourcedProvider;
import org.apache.pekko.projection.javadsl.SourceProvider;
import org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings;
import org.apache.pekko.projection.r2dbc.javadsl.R2dbcProjection;

void initProjections() {
  // Split the slices into 4 ranges
  int numberOfSliceRanges = 4;
  List<Pair<Integer, Integer>> sliceRanges =
      EventSourcedProvider.sliceRanges(
          system, R2dbcReadJournal.Identifier(), numberOfSliceRanges);

  ShardedDaemonProcess.get(system)
      .init(
          ProjectionBehavior.Command.class,
          "ShoppingCartProjection",
          sliceRanges.size(),
          i -> ProjectionBehavior.create(createProjection(sliceRanges.get(i))),
          ProjectionBehavior.stopMessage());
}

Projection<EventEnvelope<ShoppingCart.Event>> createProjection(
    Pair<Integer, Integer> sliceRange) {
  int minSlice = sliceRange.first();
  int maxSlice = sliceRange.second();

  String entityType = ShoppingCart.ENTITY_TYPE_KEY.name();

  SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
      EventSourcedProvider.eventsBySlices(
          system, R2dbcReadJournal.Identifier(), entityType, minSlice, maxSlice);

  ProjectionId projectionId =
      ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);
  Optional<R2dbcProjectionSettings> settings = Optional.empty();

  return R2dbcProjection.exactlyOnce(
      projectionId, settings, sourceProvider, ShoppingCartHandler::new, system);
}

The ShoppingCartHandler is shown below.

There are alternative ways of running the ProjectionBehavior as described in Running a Projection, but note that when using the R2DBC plugin as SourceProvider it is recommended to use eventsBySlices and not eventsByTag.

Slices

The SourceProvider for Event Sourced actors has historically used eventsByTag, but the R2DBC plugin instead provides eventsBySlices as an improved solution.

The usage of eventsByTag for Projections has the drawback that the number of tags must be decided up-front and can't easily be changed afterwards. Starting with too many tags means a lot of overhead, since many projection instances would be running on each node in a small Pekko Cluster, each polling the database periodically. Starting with too few tags means that the load can't later be spread out over more Pekko nodes.

With eventsBySlices more Projection instances can be added when needed and still reuse the offsets for the previous slice distributions.

A slice is deterministically defined based on the persistence id. The purpose is to evenly distribute all persistence ids over the slices. The eventsBySlices query is for a range of the slices. For example if using 1024 slices and running 4 Projection instances the slice ranges would be 0-255, 256-511, 512-767, 768-1023. Changing to 8 slice ranges means that the ranges would be 0-127, 128-255, 256-383, …, 768-895, 896-1023.
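
The slice ranges in this example can be obtained with EventSourcedProvider.sliceRanges, as also used in the initialization code earlier:

Scala
import org.apache.pekko
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import pekko.projection.eventsourced.scaladsl.EventSourcedProvider

import scala.collection.immutable

// 4 Projection instances over 1024 slices gives the ranges
// 0-255, 256-511, 512-767, 768-1023
val sliceRanges: immutable.Seq[Range] =
  EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier, numberOfRanges = 4)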

However, when changing the number of slices the projections with the old slice distribution must be stopped before starting new projections. That can be done with a full shutdown before deploying the new slice distribution, or by pausing (stopping) the projections with the management API, as sketched below.
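
A sketch of pausing one projection instance via the management API, assuming the standard ProjectionManagement extension from Pekko Projections:

Scala
import org.apache.pekko
import pekko.Done
import pekko.projection.ProjectionId
import pekko.projection.scaladsl.ProjectionManagement

import scala.concurrent.Future

// pause one projection instance of the old slice distribution;
// repeat for each old slice range before starting the new distribution
val projectionId = ProjectionId("ShoppingCarts", "carts-0-255")
val paused: Future[Done] = ProjectionManagement(system).pause(projectionId)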

When using R2dbcProjection together with the EventSourcedProvider.eventsBySlices the events will be delivered in sequence number order without duplicates.

When using R2dbcProjection together with DurableStateSourceProvider.changesBySlices the changes will be delivered in revision number order without duplicates.

exactly-once

The offset is stored in the same transaction as that used by the user-defined handler, which gives exactly-once processing semantics if the projection is restarted from the previously stored offset.

Scala
import org.apache.pekko
import pekko.projection.r2dbc.scaladsl.R2dbcProjection
import pekko.projection.ProjectionId

val projectionId = ProjectionId("ShoppingCarts", s"carts-$minSlice-$maxSlice")

val projection =
  R2dbcProjection
    .exactlyOnce(projectionId, settings = None, sourceProvider, handler = () => new ShoppingCartHandler)
Java
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings;
import org.apache.pekko.projection.r2dbc.javadsl.R2dbcProjection;

ProjectionId projectionId =
    ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);

Optional<R2dbcProjectionSettings> settings = Optional.empty();

Projection<EventEnvelope<ShoppingCart.Event>> projection =
    R2dbcProjection.exactlyOnce(
        projectionId, settings, sourceProvider, ShoppingCartHandler::new, system);

The ShoppingCartHandler is shown below.

at-least-once

The offset is stored after the envelope has been processed, which gives at-least-once processing semantics. This means that if the projection is restarted from a previously stored offset some elements may be processed more than once. Therefore, the Handler code must be idempotent.

Scala
import org.apache.pekko
import pekko.projection.r2dbc.scaladsl.R2dbcProjection
import pekko.projection.ProjectionId

val projectionId = ProjectionId("ShoppingCarts", s"carts-$minSlice-$maxSlice")

val projection =
  R2dbcProjection
    .atLeastOnce(projectionId, settings = None, sourceProvider, handler = () => new ShoppingCartHandler)
    .withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
Java
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings;
import org.apache.pekko.projection.r2dbc.javadsl.R2dbcProjection;

ProjectionId projectionId =
    ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);

Optional<R2dbcProjectionSettings> settings = Optional.empty();

int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);

Projection<EventEnvelope<ShoppingCart.Event>> projection =
    R2dbcProjection.atLeastOnce(
            projectionId, settings, sourceProvider, ShoppingCartHandler::new, system)
        .withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);

The offset is stored after a time window, or when a number of envelopes have been processed, whichever happens first. This window can be defined with withSaveOffset of the returned AtLeastOnceProjection. The default settings for the window are defined in the configuration section pekko.projection.at-least-once. There is a performance benefit in not storing the offset too often, but the drawback is that more envelopes may be processed again when the projection is restarted.

The ShoppingCartHandler is shown below.

groupedWithin

The envelopes can be grouped before processing, which can be useful for batch updates.

Scala
import org.apache.pekko
import pekko.projection.r2dbc.scaladsl.R2dbcProjection
import pekko.projection.ProjectionId

val projectionId = ProjectionId("ShoppingCarts", s"carts-$minSlice-$maxSlice")

val projection =
  R2dbcProjection
    .groupedWithin(projectionId, settings = None, sourceProvider, handler = () => new GroupedShoppingCartHandler)
    .withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
Java
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings;
import org.apache.pekko.projection.r2dbc.javadsl.R2dbcProjection;

ProjectionId projectionId =
    ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);

Optional<R2dbcProjectionSettings> settings = Optional.empty();

int groupAfterEnvelopes = 20;
Duration groupAfterDuration = Duration.ofMillis(500);

Projection<EventEnvelope<ShoppingCart.Event>> projection =
    R2dbcProjection.groupedWithin(
            projectionId, settings, sourceProvider, GroupedShoppingCartHandler::new, system)
        .withGroup(groupAfterEnvelopes, groupAfterDuration);

The envelopes are grouped within a time window, or limited by a number of envelopes, whichever happens first. The window can be defined with withGroup of the returned GroupedProjection. The default settings for the window are defined in the configuration section pekko.projection.grouped.

When using groupedWithin the handler is a R2dbcHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] in Scala, or a R2dbcHandler<List<EventEnvelope<ShoppingCart.Event>>> in Java. The GroupedShoppingCartHandler is shown below.

The offset is stored in the same transaction as that used by the user-defined handler, which gives exactly-once processing semantics if the projection is restarted from the previously stored offset.

Handler

It’s in the R2dbcHandler that you implement the processing of each envelope. It’s essentially a consumer function from (R2dbcSession, Envelope) to Future[Done] (Scala) or CompletionStage<Done> (Java).

A handler that is consuming ShoppingCart.Event from eventsBySlices can look like this:

Scala
import org.apache.pekko
import pekko.Done
import pekko.persistence.query.typed.EventEnvelope
import pekko.projection.r2dbc.scaladsl.R2dbcHandler
import pekko.projection.r2dbc.scaladsl.R2dbcSession
import org.slf4j.LoggerFactory

import scala.concurrent.{ ExecutionContext, Future }

class ShoppingCartHandler()(implicit ec: ExecutionContext) extends R2dbcHandler[EventEnvelope[ShoppingCart.Event]] {
  private val logger = LoggerFactory.getLogger(getClass)

  override def process(session: R2dbcSession, envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = {
    envelope.event match {
      case ShoppingCart.CheckedOut(cartId, time) =>
        logger.info(s"Shopping cart $cartId was checked out at $time")
        val stmt = session
          .createStatement("INSERT into order (id, time) VALUES ($1, $2)")
          .bind(0, cartId)
          .bind(1, time)
        session
          .updateOne(stmt)
          .map(_ => Done)

      case otherEvent =>
        logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent")
        Future.successful(Done)
    }
  }
}
Java
import org.apache.pekko.Done;
import org.apache.pekko.persistence.query.typed.EventEnvelope;
import org.apache.pekko.projection.r2dbc.javadsl.R2dbcHandler;
import org.apache.pekko.projection.r2dbc.javadsl.R2dbcSession;
import io.r2dbc.spi.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ShoppingCartHandler extends R2dbcHandler<EventEnvelope<ShoppingCart.Event>> {
  private final Logger logger = LoggerFactory.getLogger(getClass());

  @Override
  public CompletionStage<Done> process(
      R2dbcSession session, EventEnvelope<ShoppingCart.Event> envelope) {
    ShoppingCart.Event event = envelope.event();
    if (event instanceof ShoppingCart.CheckedOut) {
      ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
      logger.info(
          "Shopping cart {} was checked out at {}", checkedOut.cartId, checkedOut.eventTime);

      Statement stmt =
          session
              .createStatement("INSERT into order (id, time) VALUES ($1, $2)")
              .bind(0, checkedOut.cartId)
              .bind(1, checkedOut.eventTime);
      return session.updateOne(stmt).thenApply(rowsUpdated -> Done.getInstance());

    } else {
      logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
      return CompletableFuture.completedFuture(Done.getInstance());
    }
  }
}
Hint

Such simple handlers can also be defined as plain functions via the R2dbcHandler.apply (Scala) or R2dbcHandler.fromFunction (Java) factory methods.
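
A minimal sketch of the Scala variant, expressing the ShoppingCartHandler from above as a function:

Scala
import org.apache.pekko
import pekko.Done
import pekko.persistence.query.typed.EventEnvelope
import pekko.projection.r2dbc.scaladsl.{ R2dbcHandler, R2dbcSession }

import scala.concurrent.{ ExecutionContext, Future }

def functionHandler()(implicit ec: ExecutionContext): R2dbcHandler[EventEnvelope[ShoppingCart.Event]] =
  R2dbcHandler { (session: R2dbcSession, envelope: EventEnvelope[ShoppingCart.Event]) =>
    envelope.event match {
      case ShoppingCart.CheckedOut(cartId, time) =>
        val stmt = session
          .createStatement("INSERT into order (id, time) VALUES ($1, $2)")
          .bind(0, cartId)
          .bind(1, time)
        session.updateOne(stmt).map(_ => Done)
      case _ =>
        Future.successful(Done)
    }
  }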

Grouped handler

When using R2dbcProjection.groupedWithin the handler processes a Seq (Scala) or List (Java) of envelopes.

Scala
import org.apache.pekko
import pekko.Done
import pekko.persistence.query.typed.EventEnvelope
import pekko.projection.r2dbc.scaladsl.R2dbcHandler
import pekko.projection.r2dbc.scaladsl.R2dbcSession
import org.slf4j.LoggerFactory

import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future }

class GroupedShoppingCartHandler()(implicit ec: ExecutionContext)
    extends R2dbcHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] {
  private val logger = LoggerFactory.getLogger(getClass)

  override def process(
      session: R2dbcSession,
      envelopes: immutable.Seq[EventEnvelope[ShoppingCart.Event]]): Future[Done] = {

    // save all events in DB
    val stmts = envelopes
      .map(_.event)
      .collect { case ShoppingCart.CheckedOut(cartId, time) =>
        logger.info(s"Shopping cart $cartId was checked out at $time")

        session
          .createStatement("INSERT into order (id, time) VALUES ($1, $2)")
          .bind(0, cartId)
          .bind(1, time)

      }
      .toVector

    session.update(stmts).map(_ => Done)
  }
}
Java
import org.apache.pekko.Done;
import org.apache.pekko.persistence.query.typed.EventEnvelope;
import org.apache.pekko.projection.r2dbc.javadsl.R2dbcHandler;
import org.apache.pekko.projection.r2dbc.javadsl.R2dbcSession;
import io.r2dbc.spi.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;

public class GroupedShoppingCartHandler
    extends R2dbcHandler<List<EventEnvelope<ShoppingCart.Event>>> {
  private final Logger logger = LoggerFactory.getLogger(getClass());

  @Override
  public CompletionStage<Done> process(
      R2dbcSession session, List<EventEnvelope<ShoppingCart.Event>> envelopes) {
    List<Statement> stmts = new ArrayList<>();
    for (EventEnvelope<ShoppingCart.Event> envelope : envelopes) {
      ShoppingCart.Event event = envelope.event();
      if (event instanceof ShoppingCart.CheckedOut) {
        ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
        logger.info(
            "Shopping cart {} was checked out at {}", checkedOut.cartId, checkedOut.eventTime);

        Statement stmt =
            session
                .createStatement("INSERT into order (id, time) VALUES ($1, $2)")
                .bind(0, checkedOut.cartId)
                .bind(1, checkedOut.eventTime);
        stmts.add(stmt);
      } else {
        logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
      }
    }

    return session.update(stmts).thenApply(rowsUpdated -> Done.getInstance());
  }
}

Stateful handler

The R2dbcHandler can be stateful, with variables and mutable data structures. It is invoked by the Projection machinery one envelope at a time, and visibility guarantees between the invocations are handled automatically, i.e. no volatile or other concurrency primitives are needed for managing the state, as long as it's not accessed by other threads than the one that called process.
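
A sketch of such a stateful handler, keeping a plain var as state under the single-threaded invocation guarantee described above:

Scala
import org.apache.pekko
import pekko.Done
import pekko.persistence.query.typed.EventEnvelope
import pekko.projection.r2dbc.scaladsl.{ R2dbcHandler, R2dbcSession }
import org.slf4j.LoggerFactory

import scala.concurrent.Future

class CountingShoppingCartHandler extends R2dbcHandler[EventEnvelope[ShoppingCart.Event]] {
  private val logger = LoggerFactory.getLogger(getClass)
  private var processedCount = 0L // plain var is fine, no @volatile needed

  override def process(
      session: R2dbcSession,
      envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = {
    processedCount += 1
    if (processedCount % 1000 == 0)
      logger.info("Processed {} envelopes", processedCount)
    Future.successful(Done)
  }
}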

Note

It is important that the Handler instance is not shared between several Projection instances, because then it would be invoked concurrently, which is not how it is intended to be used. Each Projection instance should use a new Handler instance.

Async handler

A plain Handler can be used with R2dbcProjection.atLeastOnceAsync and R2dbcProjection.groupedWithinAsync if the handler is not storing the projection result in the database. The handler could, for example, send to a Kafka topic or integrate with some other external system.

There are several examples of such handlers in the documentation for Cassandra Projections. The same type of handlers can be used with R2dbcProjection instead of CassandraProjection.
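
A sketch of such an async handler; the publishToBroker method is hypothetical, standing in for a Kafka producer or other integration:

Scala
import org.apache.pekko
import pekko.Done
import pekko.persistence.query.typed.EventEnvelope
import pekko.projection.scaladsl.Handler

import scala.concurrent.Future

class PublishEventsHandler extends Handler[EventEnvelope[ShoppingCart.Event]] {
  override def process(envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] =
    publishToBroker(envelope.event)

  // hypothetical stand-in for real integration code, e.g. a Kafka producer
  private def publishToBroker(event: ShoppingCart.Event): Future[Done] =
    Future.successful(Done)
}

It would then be run with, for example, R2dbcProjection.atLeastOnceAsync(projectionId, settings = None, sourceProvider, handler = () => new PublishEventsHandler).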

Actor handler

A good alternative for advanced state management is to implement the handler as an actor which is described in Processing with Actor.

Flow handler

A Pekko Streams FlowWithContext can be used instead of a handler for processing the envelopes, which is described in Processing with Pekko Streams.

Handler lifecycle

You can override the start and stop methods of the R2dbcHandler to implement initialization before the first envelope is processed and resource cleanup when the projection is stopped. Those methods are also called when the Projection is restarted after failure.
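
A minimal sketch of overriding the lifecycle hooks, assuming the standard start/stop signatures of the handler lifecycle:

Scala
import org.apache.pekko
import pekko.Done
import pekko.persistence.query.typed.EventEnvelope
import pekko.projection.r2dbc.scaladsl.{ R2dbcHandler, R2dbcSession }

import scala.concurrent.Future

class LifecycleAwareHandler extends R2dbcHandler[EventEnvelope[ShoppingCart.Event]] {

  // invoked before the first envelope, and again after a restart
  override def start(): Future[Done] =
    Future.successful(Done) // e.g. set up caches or external resources

  // invoked when the projection is stopped
  override def stop(): Future[Done] =
    Future.successful(Done) // e.g. close external resources

  override def process(
      session: R2dbcSession,
      envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] =
    Future.successful(Done) // actual processing elided
}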

See also error handling.

Offset types

The supported offset types of the R2dbcProjection are the TimestampOffset emitted by eventsBySlices and changesBySlices, which is stored in the projection_timestamp_offset_store table, and other offset types such as sequence numbers or strings, which are stored in the projection_offset_store table.

Publish events for lower latency

To reduce the latency until the Projection finds and processes new events, you can enable the feature described in the eventsBySlices documentation.