Projection
The R2dbcProjection has support for storing the offset in a relational database using R2DBC.
The source of the envelopes is from a SourceProvider, which can be:

- events from Event Sourced entities via the SourceProvider for eventsBySlices with the eventsBySlices query
- state changes for Durable State entities via the SourceProvider for changesBySlices with the changesBySlices query
- any other SourceProvider with supported offset types
An R2dbcHandler receives an R2dbcSession instance and an envelope. The R2dbcSession provides the means to access an open R2DBC connection that can be used to process the envelope. The target database operations can be run in the same transaction as the storage of the offset, which means that exactly-once processing semantics is supported. It also offers at-least-once processing semantics.
Dependencies
To use the R2DBC module of Pekko Projections add the following dependency in your project:
- Maven

```xml
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-projection-r2dbc_${scala.binary.version}</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
```

- sbt

```scala
libraryDependencies += "org.apache.pekko" %% "pekko-projection-r2dbc" % "1.0.0"
```

- Gradle

```groovy
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-projection-r2dbc_${versions.ScalaBinary}:1.0.0"
}
```
Pekko Projections R2DBC depends on Pekko 1.0.2 or later. It is important that all pekko-* dependencies are of the same version, so it is recommended to depend on them explicitly to avoid problems with transitive dependencies causing an unlucky mix of versions.
| Project Info: Pekko Projections with R2DBC | |
|---|---|
| Artifact | org.apache.pekko pekko-projection-r2dbc 1.0.0 |
| JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17 |
| Scala versions | 2.12.19, 2.13.13, 3.3.3 |
| JPMS module name | pekko.projection.r2dbc |
| Release notes | GitHub releases |
| Issues | GitHub issues |
| Sources | https://github.com/apache/incubator-pekko-persistence-r2dbc |
Schema
The projection_offset_store, projection_timestamp_offset_store and projection_management tables need to be created in the configured database, see schema definition in Creating the schema.
Configuration
By default, pekko-projection-r2dbc uses the same connection pool and dialect as pekko-persistence-r2dbc, see Connection configuration.
Reference configuration
The following can be overridden in your application.conf for the Projection specific settings:

```
pekko.projection.r2dbc {
  # postgres or yugabyte
  dialect = ${pekko.persistence.r2dbc.dialect}

  offset-store {
    # set this to your database schema if applicable, empty by default
    schema = ""
    # the database table name for the offset store,
    # can be set to "" if only timestamp offsets are used and table is not created
    offset-table = "projection_offset_store"
    # the database table name for the offset store
    timestamp-offset-table = "projection_timestamp_offset_store"
    # the database table name for the projection management data
    management-table = "projection_management"
    # The offset store will keep track of persistence ids and sequence numbers
    # within this time window from latest offset.
    time-window = 5 minutes
    # Keep this number of entries. Don't evict old entries until this threshold
    # has been reached.
    keep-number-of-entries = 10000
    # Remove old entries outside the time-window from the offset store memory
    # with this frequency.
    evict-interval = 10 seconds
    # Remove old entries outside the time-window from the offset store database
    # with this frequency.
    delete-interval = 1 minute
  }

  # By default it shares connection-factory with pekko-persistence-r2dbc (write side),
  # i.e. same connection pool. To use a separate pool for projections this can be
  # set to another config path that defines the same kind of config as
  # pekko.persistence.r2dbc.connection-factory.
  use-connection-factory = "pekko.persistence.r2dbc.connection-factory"

  # Logs database calls that take longer than this duration at INFO level.
  # Set to "off" to disable this logging.
  # Set to 0 to log all calls.
  log-db-calls-exceeding = 300 ms
}
```
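For example, a separate connection pool for projections could be configured like this sketch. The config path my-app.projection-connection-factory is hypothetical, and the pool sizes shown assume the same keys (initial-size, max-size) as pekko.persistence.r2dbc.connection-factory:

```
# hypothetical dedicated pool for projections, inheriting the
# pekko.persistence.r2dbc.connection-factory structure
my-app.projection-connection-factory = ${pekko.persistence.r2dbc.connection-factory}
my-app.projection-connection-factory {
  # illustrative pool sizing, tune for your workload
  initial-size = 10
  max-size = 20
}

pekko.projection.r2dbc {
  use-connection-factory = "my-app.projection-connection-factory"
}
```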
Running with Sharded Daemon Process
The Sharded Daemon Process can be used to distribute n instances of a given Projection across the cluster. Therefore, it’s important that each Projection instance consumes a subset of the stream of envelopes.

When using eventsBySlices the initialization code looks like this:
- Scala

```scala
import org.apache.pekko
import pekko.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import pekko.projection.r2dbc.scaladsl.R2dbcProjection
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import pekko.projection.ProjectionId
import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
import pekko.projection.Projection
import pekko.projection.ProjectionBehavior
import pekko.projection.scaladsl.SourceProvider
import pekko.persistence.query.typed.EventEnvelope

def initProjections(): Unit = {
  def sourceProvider(sliceRange: Range): SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] =
    EventSourcedProvider
      .eventsBySlices[ShoppingCart.Event](
        system,
        readJournalPluginId = R2dbcReadJournal.Identifier,
        entityType,
        sliceRange.min,
        sliceRange.max)

  def projection(sliceRange: Range): Projection[EventEnvelope[ShoppingCart.Event]] = {
    val minSlice = sliceRange.min
    val maxSlice = sliceRange.max
    val projectionId = ProjectionId("ShoppingCarts", s"carts-$minSlice-$maxSlice")
    R2dbcProjection
      .exactlyOnce(
        projectionId,
        settings = None,
        sourceProvider(sliceRange),
        handler = () => new ShoppingCartHandler)
  }

  // Split the slices into 4 ranges
  val numberOfSliceRanges: Int = 4
  val sliceRanges = EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier, numberOfSliceRanges)

  ShardedDaemonProcess(system).init(
    name = "ShoppingCartProjection",
    numberOfInstances = sliceRanges.size,
    behaviorFactory = i => ProjectionBehavior(projection(sliceRanges(i))),
    stopMessage = ProjectionBehavior.Stop)
}
```
- Java

```java
import org.apache.pekko.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
import org.apache.pekko.projection.ProjectionBehavior;
import org.apache.pekko.persistence.query.typed.EventEnvelope;
import org.apache.pekko.projection.Projection;
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings;
import org.apache.pekko.projection.r2dbc.javadsl.R2dbcProjection;

void initProjections() {
  // Split the slices into 4 ranges
  int numberOfSliceRanges = 4;
  List<Pair<Integer, Integer>> sliceRanges =
      EventSourcedProvider.sliceRanges(
          system, R2dbcReadJournal.Identifier(), numberOfSliceRanges);

  ShardedDaemonProcess.get(system)
      .init(
          ProjectionBehavior.Command.class,
          "ShoppingCartProjection",
          sliceRanges.size(),
          i -> ProjectionBehavior.create(createProjection(sliceRanges.get(i))),
          ProjectionBehavior.stopMessage());
}

Projection<EventEnvelope<ShoppingCart.Event>> createProjection(
    Pair<Integer, Integer> sliceRange) {
  int minSlice = sliceRange.first();
  int maxSlice = sliceRange.second();

  String entityType = ShoppingCart.ENTITY_TYPE_KEY.name();

  SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
      EventSourcedProvider.eventsBySlices(
          system, R2dbcReadJournal.Identifier(), entityType, minSlice, maxSlice);

  ProjectionId projectionId =
      ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);
  Optional<R2dbcProjectionSettings> settings = Optional.empty();

  return R2dbcProjection.exactlyOnce(
      projectionId, settings, sourceProvider, ShoppingCartHandler::new, system);
}
```
The ShoppingCartHandler is shown below.

There are alternative ways of running the ProjectionBehavior as described in Running a Projection, but note that when using the R2DBC plugin as SourceProvider it is recommended to use eventsBySlices and not eventsByTag.
Slices
The SourceProvider for Event Sourced actors has historically been using eventsByTag, but the R2DBC plugin instead provides eventsBySlices as an improved solution.
The usage of eventsByTag for Projections has the drawback that the number of tags must be decided up-front and can’t easily be changed afterwards. Starting with too many tags means much overhead, since many projection instances would be running on each node in a small Pekko Cluster, each polling the database periodically. Starting with too few tags means that it can’t be scaled later to more Pekko nodes.
With eventsBySlices more Projection instances can be added when needed and still reuse the offsets for the previous slice distributions.
A slice is deterministically defined based on the persistence id. The purpose is to evenly distribute all persistence ids over the slices. The eventsBySlices query is for a range of the slices. For example, if using 1024 slices and running 4 Projection instances, the slice ranges would be 0-255, 256-511, 512-767, 768-1023. Changing to 8 slice ranges means that the ranges would be 0-127, 128-255, 256-383, …, 768-895, 896-1023.
However, when changing the number of slices the projections with the old slice distribution must be stopped before starting new projections. That can be done with a full shutdown before deploying the new slice distribution, or by pausing (stopping) the projections with the management API.
When using R2dbcProjection together with the EventSourcedProvider.eventsBySlices the events will be delivered in sequence number order without duplicates.

When using R2dbcProjection together with DurableStateSourceProvider.changesBySlices the changes will be delivered in revision number order without duplicates.
exactly-once
The offset is stored in the same transaction used for the user defined handler, which means exactly-once processing semantics if the projection is restarted from a previously stored offset.
- Scala

```scala
import org.apache.pekko
import pekko.projection.r2dbc.scaladsl.R2dbcProjection
import pekko.projection.ProjectionId

val projectionId = ProjectionId("ShoppingCarts", s"carts-$minSlice-$maxSlice")
val projection = R2dbcProjection
  .exactlyOnce(projectionId, settings = None, sourceProvider, handler = () => new ShoppingCartHandler)
```
- Java

```java
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings;
import org.apache.pekko.projection.r2dbc.javadsl.R2dbcProjection;

ProjectionId projectionId =
    ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);
Optional<R2dbcProjectionSettings> settings = Optional.empty();

Projection<EventEnvelope<ShoppingCart.Event>> projection =
    R2dbcProjection.exactlyOnce(
        projectionId, settings, sourceProvider, ShoppingCartHandler::new, system);
```
The ShoppingCartHandler is shown below.
at-least-once
The offset is stored after the envelope has been processed, which gives at-least-once processing semantics. This means that if the projection is restarted from a previously stored offset some elements may be processed more than once. Therefore, the Handler code must be idempotent.
- Scala

```scala
import org.apache.pekko
import pekko.projection.r2dbc.scaladsl.R2dbcProjection
import pekko.projection.ProjectionId

val projectionId = ProjectionId("ShoppingCarts", s"carts-$minSlice-$maxSlice")
val projection = R2dbcProjection
  .atLeastOnce(projectionId, settings = None, sourceProvider, handler = () => new ShoppingCartHandler)
  .withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
```
- Java

```java
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings;
import org.apache.pekko.projection.r2dbc.javadsl.R2dbcProjection;

ProjectionId projectionId =
    ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);
Optional<R2dbcProjectionSettings> settings = Optional.empty();

int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);

Projection<EventEnvelope<ShoppingCart.Event>> projection =
    R2dbcProjection.atLeastOnce(
            projectionId, settings, sourceProvider, ShoppingCartHandler::new, system)
        .withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
```
The offset is stored after a time window, or limited by a number of envelopes, whatever happens first. This window can be defined with withSaveOffset of the returned AtLeastOnceProjection. The default settings for the window are defined in the configuration section pekko.projection.at-least-once. There is a performance benefit of not storing the offset too often, but the drawback is that more envelopes may be processed again when the projection is restarted.
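As a sketch, assuming the defaults follow pekko-projection’s common reference configuration keys, the save-offset window could also be tuned in application.conf like this:

```
pekko.projection.at-least-once {
  save-offset-after-envelopes = 100
  save-offset-after-duration = 250 ms
}
```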
The ShoppingCartHandler is shown below.
groupedWithin
The envelopes can be grouped before processing, which can be useful for batch updates.
- Scala

```scala
import org.apache.pekko
import pekko.projection.r2dbc.scaladsl.R2dbcProjection
import pekko.projection.ProjectionId

val projectionId = ProjectionId("ShoppingCarts", s"carts-$minSlice-$maxSlice")
val projection = R2dbcProjection
  .groupedWithin(projectionId, settings = None, sourceProvider, handler = () => new GroupedShoppingCartHandler)
  .withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
```
- Java

```java
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings;
import org.apache.pekko.projection.r2dbc.javadsl.R2dbcProjection;

ProjectionId projectionId =
    ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);
Optional<R2dbcProjectionSettings> settings = Optional.empty();

int groupAfterEnvelopes = 20;
Duration groupAfterDuration = Duration.ofMillis(500);

Projection<EventEnvelope<ShoppingCart.Event>> projection =
    R2dbcProjection.groupedWithin(
            projectionId, settings, sourceProvider, GroupedShoppingCartHandler::new, system)
        .withGroup(groupAfterEnvelopes, groupAfterDuration);
```
The envelopes are grouped within a time window, or limited by a number of envelopes, whatever happens first. This window can be defined with withGroup of the returned GroupedProjection. The default settings for the window are defined in the configuration section pekko.projection.grouped.
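As an illustrative sketch, assuming pekko-projection’s common reference configuration keys, the group window could be overridden like this:

```
pekko.projection.grouped {
  group-after-envelopes = 20
  group-after-duration = 500 ms
}
```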
When using groupedWithin the handler is an R2dbcHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] (Scala) or R2dbcHandler&lt;List&lt;EventEnvelope&lt;ShoppingCart.Event&gt;&gt;&gt; (Java). The GroupedShoppingCartHandler is shown below.
The offset is stored in the same transaction used for the user defined handler, which means exactly-once processing semantics if the projection is restarted from a previously stored offset.
Handler
It’s in the R2dbcHandler that you implement the processing of each envelope. It’s essentially a consumer function from (R2dbcSession, Envelope) to Future[Done] (Scala) or CompletionStage&lt;Done&gt; (Java).
A handler that is consuming ShoppingCart.Event from eventsBySlices can look like this:
- Scala

```scala
import org.apache.pekko
import pekko.projection.r2dbc.scaladsl.R2dbcHandler
import pekko.projection.r2dbc.scaladsl.R2dbcSession
import pekko.persistence.query.typed.EventEnvelope

class ShoppingCartHandler()(implicit ec: ExecutionContext)
    extends R2dbcHandler[EventEnvelope[ShoppingCart.Event]] {
  private val logger = LoggerFactory.getLogger(getClass)

  override def process(
      session: R2dbcSession,
      envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = {
    envelope.event match {
      case ShoppingCart.CheckedOut(cartId, time) =>
        logger.info(s"Shopping cart $cartId was checked out at $time")

        val stmt = session
          .createStatement("INSERT into order (id, time) VALUES ($1, $2)")
          .bind(0, cartId)
          .bind(1, time)
        session
          .updateOne(stmt)
          .map(_ => Done)

      case otherEvent =>
        logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent")
        Future.successful(Done)
    }
  }
}
```
- Java

```java
import org.apache.pekko.projection.r2dbc.javadsl.R2dbcHandler;
import org.apache.pekko.projection.r2dbc.javadsl.R2dbcSession;
import io.r2dbc.spi.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ShoppingCartHandler extends R2dbcHandler<EventEnvelope<ShoppingCart.Event>> {
  private final Logger logger = LoggerFactory.getLogger(getClass());

  @Override
  public CompletionStage<Done> process(
      R2dbcSession session, EventEnvelope<ShoppingCart.Event> envelope) {
    ShoppingCart.Event event = envelope.event();
    if (event instanceof ShoppingCart.CheckedOut) {
      ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
      logger.info(
          "Shopping cart {} was checked out at {}", checkedOut.cartId, checkedOut.eventTime);

      Statement stmt =
          session
              .createStatement("INSERT into order (id, time) VALUES ($1, $2)")
              .bind(0, checkedOut.cartId)
              .bind(1, checkedOut.eventTime);
      return session.updateOne(stmt).thenApply(rowsUpdated -> Done.getInstance());
    } else {
      logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
      return CompletableFuture.completedFuture(Done.getInstance());
    }
  }
}
```
Such simple handlers can also be defined as plain functions via the helper R2dbcHandler.apply (Scala) or R2dbcHandler.fromFunction (Java) factory method.
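For example, a function-based handler could look like this sketch, assuming R2dbcHandler.apply accepts a (R2dbcSession, Envelope) => Future[Done] function:

```scala
import org.apache.pekko
import pekko.Done
import pekko.persistence.query.typed.EventEnvelope
import pekko.projection.r2dbc.scaladsl.{ R2dbcHandler, R2dbcSession }
import scala.concurrent.Future

// minimal sketch of a function-based handler
val handler = R2dbcHandler[EventEnvelope[ShoppingCart.Event]] {
  (session: R2dbcSession, envelope: EventEnvelope[ShoppingCart.Event]) =>
    // process the envelope using the session here
    Future.successful(Done)
}
```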
Grouped handler
When using R2dbcProjection.groupedWithin the handler is processing a Seq (Scala) or List (Java) of envelopes.
- Scala

```scala
import org.apache.pekko
import pekko.projection.r2dbc.scaladsl.R2dbcHandler
import pekko.projection.r2dbc.scaladsl.R2dbcSession
import pekko.persistence.query.typed.EventEnvelope
import scala.collection.immutable

class GroupedShoppingCartHandler()(implicit ec: ExecutionContext)
    extends R2dbcHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] {
  private val logger = LoggerFactory.getLogger(getClass)

  override def process(
      session: R2dbcSession,
      envelopes: immutable.Seq[EventEnvelope[ShoppingCart.Event]]): Future[Done] = {
    // save all events in DB
    val stmts = envelopes
      .map(_.event)
      .collect { case ShoppingCart.CheckedOut(cartId, time) =>
        logger.info(s"Shopping cart $cartId was checked out at $time")

        session
          .createStatement("INSERT into order (id, time) VALUES ($1, $2)")
          .bind(0, cartId)
          .bind(1, time)
      }
      .toVector

    session.update(stmts).map(_ => Done)
  }
}
```
- Java

```java
import org.apache.pekko.projection.r2dbc.javadsl.R2dbcHandler;
import org.apache.pekko.projection.r2dbc.javadsl.R2dbcSession;
import io.r2dbc.spi.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class GroupedShoppingCartHandler
    extends R2dbcHandler<List<EventEnvelope<ShoppingCart.Event>>> {
  private final Logger logger = LoggerFactory.getLogger(getClass());

  @Override
  public CompletionStage<Done> process(
      R2dbcSession session, List<EventEnvelope<ShoppingCart.Event>> envelopes) {
    List<Statement> stmts = new ArrayList<>();
    for (EventEnvelope<ShoppingCart.Event> envelope : envelopes) {
      ShoppingCart.Event event = envelope.event();
      if (event instanceof ShoppingCart.CheckedOut) {
        ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
        logger.info(
            "Shopping cart {} was checked out at {}", checkedOut.cartId, checkedOut.eventTime);

        Statement stmt =
            session
                .createStatement("INSERT into order (id, time) VALUES ($1, $2)")
                .bind(0, checkedOut.cartId)
                .bind(1, checkedOut.eventTime);
        stmts.add(stmt);
      } else {
        logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
      }
    }
    return session.update(stmts).thenApply(rowsUpdated -> Done.getInstance());
  }
}
```
Stateful handler
The R2dbcHandler can be stateful, with variables and mutable data structures. It is invoked by the Projection machinery one envelope at a time and visibility guarantees between the invocations are handled automatically, i.e. no volatile or other concurrency primitives are needed for managing the state as long as it’s not accessed by other threads than the one that called process.
It is important that the Handler instance is not shared between several Projection instances, because then it would be invoked concurrently, which is not how it is intended to be used. Each Projection instance should use a new Handler instance.
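A minimal sketch of a stateful handler; the processed counter is hypothetical, and a plain var suffices because process is invoked one envelope at a time per Projection instance:

```scala
import org.apache.pekko
import pekko.Done
import pekko.persistence.query.typed.EventEnvelope
import pekko.projection.r2dbc.scaladsl.{ R2dbcHandler, R2dbcSession }
import scala.concurrent.Future

class CountingHandler extends R2dbcHandler[EventEnvelope[ShoppingCart.Event]] {
  // plain mutable state; no volatile or locking needed because the
  // Projection machinery never invokes process concurrently
  private var processed = 0L

  override def process(
      session: R2dbcSession,
      envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = {
    processed += 1
    Future.successful(Done)
  }
}
```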
Async handler
The Handler can be used with R2dbcProjection.atLeastOnceAsync and R2dbcProjection.groupedWithinAsync if the handler is not storing the projection result in the database. The handler could send to a Kafka topic or integrate with something else.
There are several examples of such handlers in the documentation for Cassandra Projections. The same type of handlers can be used with R2dbcProjection instead of CassandraProjection.
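As an illustration, such an async handler might look like this sketch; publishToTopic is a hypothetical integration call standing in for e.g. a Kafka producer, not part of the API:

```scala
import org.apache.pekko
import pekko.Done
import pekko.persistence.query.typed.EventEnvelope
import pekko.projection.scaladsl.Handler
import scala.concurrent.Future

class PublishEventsHandler extends Handler[EventEnvelope[ShoppingCart.Event]] {
  override def process(envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] =
    // forward the event to an external system instead of the database
    publishToTopic(envelope.event)

  // hypothetical asynchronous integration, e.g. a Kafka producer send
  private def publishToTopic(event: ShoppingCart.Event): Future[Done] =
    Future.successful(Done) // stand-in for the real call
}
```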
Actor handler
A good alternative for advanced state management is to implement the handler as an actor which is described in Processing with Actor.
Flow handler
A Pekko Streams FlowWithContext can be used instead of a handler for processing the envelopes, which is described in Processing with Pekko Streams.
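A rough sketch of such a flow, assuming the FlowWithContext is parameterized with the envelope and ProjectionContext as in the Pekko Streams processing documentation; the body is illustrative:

```scala
import org.apache.pekko
import pekko.Done
import pekko.persistence.query.typed.EventEnvelope
import pekko.projection.ProjectionContext
import pekko.stream.scaladsl.FlowWithContext

// each envelope is paired with a ProjectionContext that must stay
// attached so the offset can be stored after processing
val flow =
  FlowWithContext[EventEnvelope[ShoppingCart.Event], ProjectionContext]
    .map(envelope => envelope.event)
    .map { event =>
      // per-event side effect or transformation goes here
      Done
    }
```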
Handler lifecycle
You can override the start and stop methods of the R2dbcHandler to implement initialization before the first envelope is processed and resource cleanup when the projection is stopped. Those methods are also called when the Projection is restarted after failure.
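A sketch of overriding the lifecycle hooks, assuming start and stop return Future[Done]:

```scala
import org.apache.pekko
import pekko.Done
import pekko.persistence.query.typed.EventEnvelope
import pekko.projection.r2dbc.scaladsl.{ R2dbcHandler, R2dbcSession }
import scala.concurrent.Future

class LifecycleAwareHandler extends R2dbcHandler[EventEnvelope[ShoppingCart.Event]] {
  override def start(): Future[Done] = {
    // acquire resources before the first envelope, e.g. warm up a cache
    Future.successful(Done)
  }

  override def stop(): Future[Done] = {
    // release resources when the projection is stopped or restarted
    Future.successful(Done)
  }

  override def process(
      session: R2dbcSession,
      envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] =
    Future.successful(Done)
}
```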
See also error handling.
Offset types
The supported offset types of the R2dbcProjection are:

- TimestampOffset that is used for SourceProvider for eventsBySlices and SourceProvider for changesBySlices
- other Offset types:
  - MergeableOffset that is used for messages from Kafka
  - String
  - Int
  - Long
  - Any other type that has a configured Pekko Serializer is stored with base64 encoding of the serialized bytes.
Publish events for lower latency
To reduce the latency until the Projection finds and processes new events, you can enable the feature described in the eventsBySlices documentation.
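Assuming the feature is enabled with the journal’s publish-events flag (verify the exact key in the eventsBySlices documentation), the setting could look like:

```
# illustrative; confirm the key in the eventsBySlices documentation
pekko.persistence.r2dbc.journal.publish-events = on
```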