Query Plugin
Event sourced queries
R2dbcReadJournal implements the following Persistence Queries:
- eventsByPersistenceId, currentEventsByPersistenceId
- eventsBySlices, currentEventsBySlices
- currentPersistenceIds
Accessing the R2dbcReadJournal:
- Java
    import org.apache.pekko.persistence.query.PersistenceQuery;
    import org.apache.pekko.persistence.r2dbc.query.javadsl.R2dbcReadJournal;

    R2dbcReadJournal eventQueries =
        PersistenceQuery.get(system)
            .getReadJournalFor(R2dbcReadJournal.class, R2dbcReadJournal.Identifier());
- Scala
    import org.apache.pekko
    import pekko.persistence.query.PersistenceQuery
    import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal

    val eventQueries = PersistenceQuery(system)
      .readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
eventsByPersistenceId
The eventsByPersistenceId and currentEventsByPersistenceId queries are useful for retrieving events for a single entity with a given persistence id.
Example of currentEventsByPersistenceId:
- Java
    PersistenceId persistenceId = PersistenceId.of("MyEntity", "id1");
    eventQueries
        .currentEventsByPersistenceId(persistenceId.id(), 1, 101)
        .map(envelope -> "event with seqNr " + envelope.sequenceNr() + ": " + envelope.event())
        .runWith(Sink.foreach(System.out::println), system);
- Scala
    val persistenceId = PersistenceId("MyEntity", "id1")
    eventQueries
      .currentEventsByPersistenceId(persistenceId.id, 1, 101)
      .map(envelope => s"event with seqNr ${envelope.sequenceNr}: ${envelope.event}")
      .runWith(Sink.foreach(println))
eventsBySlices
The eventsBySlices and currentEventsBySlices queries are useful for retrieving all events for a given entity type. eventsBySlices should be used via Pekko Projection.
This has historically been done with eventsByTag, but the R2DBC plugin instead provides eventsBySlices as an improved solution.
The usage of eventsByTag for Projections has the drawback that the number of tags must be decided up-front and can’t easily be changed afterwards. Starting with too many tags means a lot of overhead, since many projection instances would be running on each node in a small Pekko Cluster, each of them polling the database periodically. Starting with too few tags means that it can’t be scaled later to more Pekko nodes.
With eventsBySlices more Projection instances can be added when needed and still reuse the offsets for the previous slice distributions.
A slice is deterministically defined based on the persistence id. The purpose is to evenly distribute all persistence ids over the slices. The eventsBySlices query is for a range of the slices. For example, if using 1024 slices and running 4 Projection instances, the slice ranges would be 0-255, 256-511, 512-767, 768-1023. Changing to 8 slice ranges means that the ranges would be 0-127, 128-255, 256-383, …, 768-895, 896-1023.
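The slice arithmetic above can be sketched in plain Java. This is only an illustration, not the plugin's code: the class `SliceRangeSketch` and its methods are hypothetical names, and the hash-based `sliceFor` mapping is an assumption about how a persistence id could be mapped deterministically to a slice. In real code use `sliceRanges` on the read journal, as in the examples on this page.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; names are hypothetical and not part of the plugin API. */
final class SliceRangeSketch {

  /** Total number of slices, as in the example in the text. */
  static final int NUMBER_OF_SLICES = 1024;

  /**
   * Splits the slices 0..1023 into contiguous, equally sized ranges.
   * Each element is {minSlice, maxSlice}, both inclusive.
   */
  static List<int[]> sliceRanges(int numberOfRanges) {
    if (numberOfRanges <= 0 || NUMBER_OF_SLICES % numberOfRanges != 0) {
      throw new IllegalArgumentException(
          "numberOfRanges must evenly divide " + NUMBER_OF_SLICES);
    }
    int rangeSize = NUMBER_OF_SLICES / numberOfRanges;
    List<int[]> ranges = new ArrayList<>();
    for (int i = 0; i < numberOfRanges; i++) {
      int min = i * rangeSize;
      ranges.add(new int[] {min, min + rangeSize - 1});
    }
    return ranges;
  }

  /**
   * Assumption: the slice is derived from the hash code of the persistence id.
   * The actual mapping is defined by Pekko Persistence and may differ.
   */
  static int sliceFor(String persistenceId) {
    return Math.abs(persistenceId.hashCode() % NUMBER_OF_SLICES);
  }

  public static void main(String[] args) {
    for (int[] range : sliceRanges(4)) {
      System.out.println(range[0] + "-" + range[1]);
    }
    System.out.println("slice of MyEntity|id1: " + sliceFor("MyEntity|id1"));
  }
}
```

Note that each of the 4 ranges splits cleanly into two of the 8 ranges (0-255 becomes 0-127 and 128-255), which is consistent with offsets being reusable when more Projection instances are added.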
Example of currentEventsBySlices:
- Java
    import org.apache.pekko.stream.javadsl.Source;
    import org.apache.pekko.persistence.query.typed.EventEnvelope;

    // Split the slices into 4 ranges
    int numberOfSliceRanges = 4;
    List<Pair<Integer, Integer>> sliceRanges = eventQueries.sliceRanges(numberOfSliceRanges);

    // Example of using the first slice range
    int minSlice = sliceRanges.get(0).first();
    int maxSlice = sliceRanges.get(0).second();
    String entityType = "MyEntity";
    Source<EventEnvelope<MyEvent>, NotUsed> source =
        eventQueries.currentEventsBySlices(entityType, minSlice, maxSlice, NoOffset.getInstance());
    source
        .map(
            envelope ->
                "event from persistenceId "
                    + envelope.persistenceId()
                    + " with seqNr "
                    + envelope.sequenceNr()
                    + ": "
                    + envelope.event())
        .runWith(Sink.foreach(System.out::println), system);
- Scala
    import org.apache.pekko.persistence.query.typed.EventEnvelope

    // Split the slices into 4 ranges
    val numberOfSliceRanges: Int = 4
    val sliceRanges = eventQueries.sliceRanges(numberOfSliceRanges)

    // Example of using the first slice range
    val minSlice: Int = sliceRanges.head.min
    val maxSlice: Int = sliceRanges.head.max
    val entityType: String = "MyEntity"
    eventQueries
      .currentEventsBySlices[MyEvent](entityType, minSlice, maxSlice, NoOffset.getInstance)
      .map(envelope =>
        s"event from persistenceId ${envelope.persistenceId} with " +
        s"seqNr ${envelope.sequenceNr}: ${envelope.event}")
      .runWith(Sink.foreach(println))
eventsBySlices should be used via R2dbcProjection, which will automatically handle the following difficulties. When using R2dbcProjection the events will be delivered in sequence number order without duplicates.
The consumer can keep track of its current position in the event stream by storing the offset and restart the query from a given offset after a crash/restart.
The offset is a TimestampOffset and it is based on the database transaction_timestamp() when the event was stored. transaction_timestamp() is the time when the transaction started, not when it was committed. This means that a “later” event may be visible first, and when retrieving events after the previously seen timestamp we may miss some events and emit an event with a later sequence number for a persistence id without emitting all preceding events. In distributed SQL databases there can also be clock skew for the database timestamps. For that reason eventsBySlices will perform additional backtracking queries to catch missed events. Events from backtracking will typically be duplicates of previously emitted events. It’s the responsibility of the consumer to filter duplicates and make sure that events are processed in exact sequence number order for each persistence id. Such deduplication is provided by the R2DBC Projection.
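To make the consumer's responsibility concrete, here is a minimal sketch of a per-persistence-id sequence number check. It is not how R2dbcProjection is implemented; `SeqNrFilterSketch` and `Decision` are hypothetical names, and the sketch assumes that sequence numbers start at 1, increase by 1, and that the consumer starts from the beginning of the stream.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch only; not the deduplication used by R2dbcProjection. */
final class SeqNrFilterSketch {

  enum Decision {
    ACCEPT, // next expected sequence number, process the event
    DUPLICATE, // already processed, for example re-emitted by backtracking
    GAP // a preceding event has not been seen yet, do not process
  }

  // Highest sequence number processed so far, per persistence id.
  private final Map<String, Long> lastSeqNr = new HashMap<>();

  Decision validate(String persistenceId, long seqNr) {
    long previous = lastSeqNr.getOrDefault(persistenceId, 0L);
    if (seqNr <= previous) {
      return Decision.DUPLICATE;
    }
    if (seqNr != previous + 1) {
      return Decision.GAP;
    }
    lastSeqNr.put(persistenceId, seqNr);
    return Decision.ACCEPT;
  }

  public static void main(String[] args) {
    SeqNrFilterSketch filter = new SeqNrFilterSketch();
    System.out.println(filter.validate("MyEntity|id1", 1));
    System.out.println(filter.validate("MyEntity|id1", 3));
    System.out.println(filter.validate("MyEntity|id1", 2));
    System.out.println(filter.validate("MyEntity|id1", 2));
  }
}
```

An event rejected as a gap is expected to show up again later, after the missing event has been delivered by a backtracking query.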
Events emitted by the backtracking don’t contain the event payload (EventBySliceEnvelope.event is None) and the consumer can load the full EventBySliceEnvelope with R2dbcReadJournal.loadEnvelope.
The events will be emitted in the timestamp order with the caveat of duplicate events as described above. Events with the same timestamp are ordered by sequence number.
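The ordering rule can be expressed as a comparator. The following is a small sketch under stated assumptions: `EmitOrderSketch` and `Row` are hypothetical stand-ins for rows read from the journal, not plugin classes.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Illustrative sketch only; Row is a hypothetical stand-in for a journal row. */
final class EmitOrderSketch {

  static final class Row {
    final String persistenceId;
    final long seqNr;
    final Instant timestamp;

    Row(String persistenceId, long seqNr, Instant timestamp) {
      this.persistenceId = persistenceId;
      this.seqNr = seqNr;
      this.timestamp = timestamp;
    }
  }

  // Timestamp first; rows with the same timestamp are ordered by sequence number.
  static final Comparator<Row> EMIT_ORDER =
      Comparator.comparing((Row row) -> row.timestamp).thenComparingLong(row -> row.seqNr);

  /** Returns a new list sorted in emit order, leaving the input untouched. */
  static List<Row> sortForEmit(List<Row> rows) {
    List<Row> sorted = new ArrayList<>(rows);
    sorted.sort(EMIT_ORDER);
    return sorted;
  }

  public static void main(String[] args) {
    Instant t1 = Instant.ofEpochMilli(1000);
    Instant t2 = Instant.ofEpochMilli(2000);
    List<Row> rows = new ArrayList<>();
    rows.add(new Row("MyEntity|id1", 2, t2));
    rows.add(new Row("MyEntity|id2", 5, t1));
    rows.add(new Row("MyEntity|id1", 1, t2));
    for (Row row : sortForEmit(rows)) {
      System.out.println(row.timestamp + " " + row.persistenceId + " seqNr " + row.seqNr);
    }
  }
}
```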
currentEventsBySlices doesn’t perform these backtracking queries and will not emit duplicates, and the event payload is always fully loaded.
Publish events for lower latency of eventsBySlices
The eventsBySlices query polls the database periodically to find new events. By default this interval is 3 seconds, see pekko.persistence.r2dbc.query.refresh-interval in the Configuration. This interval can be reduced for lower latency, with the drawback of querying the database more frequently.
If you need latency below a few hundred milliseconds, you can enable a feature that publishes the events within the Pekko Cluster instead of reducing refresh-interval. Running eventsBySlices will then subscribe to the events and emit them directly without waiting for the next query poll. The tradeoff is that more CPU and network resources are used. The events must still be retrieved from the database, but at a lower polling frequency, because delivery of published messages is not guaranteed.
Enable publishing of events with configuration:
pekko.persistence.r2dbc.journal.publish-events = on
Durable state queries
R2dbcDurableStateStore implements the following Persistence Queries:
- getObject
- changesBySlices, currentChangesBySlices
- currentPersistenceIds
Accessing the R2dbcDurableStateStore:
- Java
    import org.apache.pekko.persistence.r2dbc.state.javadsl.R2dbcDurableStateStore;
    import org.apache.pekko.persistence.state.DurableStateStoreRegistry;

    R2dbcDurableStateStore<MyState> stateQueries =
        DurableStateStoreRegistry.get(system)
            .getDurableStateStoreFor(
                R2dbcDurableStateStore.class, R2dbcDurableStateStore.Identifier());
- Scala
    import org.apache.pekko
    import pekko.persistence.state.DurableStateStoreRegistry
    import pekko.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore

    val stateQueries = DurableStateStoreRegistry(system)
      .durableStateStoreFor[R2dbcDurableStateStore[MyState]](R2dbcDurableStateStore.Identifier)
changesBySlices
The changesBySlices and currentChangesBySlices queries are useful for retrieving updates of the latest durable state for a given entity type.
Example of currentChangesBySlices:
- Java
    import org.apache.pekko.persistence.query.DurableStateChange;
    import org.apache.pekko.persistence.query.UpdatedDurableState;

    // Split the slices into 4 ranges
    int numberOfSliceRanges = 4;
    List<Pair<Integer, Integer>> sliceRanges = stateQueries.sliceRanges(numberOfSliceRanges);

    // Example of using the first slice range
    int minSlice = sliceRanges.get(0).first();
    int maxSlice = sliceRanges.get(0).second();
    String entityType = "MyEntity";
    Source<DurableStateChange<MyState>, NotUsed> source =
        stateQueries.currentChangesBySlices(entityType, minSlice, maxSlice, NoOffset.getInstance());
    source
        .collectType(UpdatedDurableState.class)
        .map(
            change ->
                "state change from persistenceId "
                    + change.persistenceId()
                    + " with revision "
                    + change.revision()
                    + ": "
                    + change.value())
        .runWith(Sink.foreach(System.out::println), system);
- Scala
    import org.apache.pekko.persistence.query.UpdatedDurableState

    // Split the slices into 4 ranges
    val numberOfSliceRanges: Int = 4
    val sliceRanges = stateQueries.sliceRanges(numberOfSliceRanges)

    // Example of using the first slice range
    val minSlice: Int = sliceRanges.head.min
    val maxSlice: Int = sliceRanges.head.max
    val entityType: String = "MyEntity"
    stateQueries
      .currentChangesBySlices(entityType, minSlice, maxSlice, NoOffset.getInstance)
      .collect { case change: UpdatedDurableState[MyState] => change }
      .map(change =>
        s"state change from persistenceId ${change.persistenceId} with " +
        s"revision ${change.revision}: ${change.value}")
      .runWith(Sink.foreach(println))
The emitted DurableStateChange can be an UpdatedDurableState or a DeletedDurableState, but DeletedDurableState is not implemented yet.
The query will emit an UpdatedDurableState when the durable state is updated. When the state is updated again, another UpdatedDurableState is emitted. It will always emit an UpdatedDurableState for the latest revision of the state, but there is no guarantee that all intermediate changes are emitted if the state is updated several times. Note that UpdatedDurableState contains the full current state; it is not a delta from the previous revision of the state.
changesBySlices should be used via R2dbcProjection, which will automatically handle difficulties with duplicates similar to those described for eventsBySlices. When using R2dbcProjection the changes will be delivered in revision number order without duplicates.
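Unlike events, state changes may skip intermediate revisions, so a consumer only needs to ignore revisions that are not newer than what it has already seen. A minimal sketch of that check follows. `LatestRevisionSketch` is a hypothetical name, and this is not how R2dbcProjection is implemented.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch only; not the filtering used by R2dbcProjection. */
final class LatestRevisionSketch {

  // Highest revision processed so far, per persistence id.
  private final Map<String, Long> latestRevision = new HashMap<>();

  /**
   * Returns true if the revision is newer than anything seen for this persistence id.
   * Gaps between revisions are fine because each change carries the full state.
   */
  boolean accept(String persistenceId, long revision) {
    long seen = latestRevision.getOrDefault(persistenceId, 0L);
    if (revision <= seen) {
      return false; // duplicate or stale change
    }
    latestRevision.put(persistenceId, revision);
    return true;
  }

  public static void main(String[] args) {
    LatestRevisionSketch filter = new LatestRevisionSketch();
    System.out.println(filter.accept("MyEntity|id1", 1));
    System.out.println(filter.accept("MyEntity|id1", 4));
    System.out.println(filter.accept("MyEntity|id1", 3));
  }
}
```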
Configuration
Query configuration is under pekko.persistence.r2dbc.query. Here are the default configuration values for the query plugin:
pekko.persistence.r2dbc {
  query {
    class = "org.apache.pekko.persistence.r2dbc.query.R2dbcReadJournalProvider"

    # When live queries return no results or <= 10% of buffer-size, the next query
    # to db will be delayed for this duration.
    # When the number of rows from previous query is >= 90% of buffer-size, the next
    # query will be emitted immediately.
    # Otherwise, between 10% - 90% of buffer-size, the next query will be delayed
    # for half of this duration.
    refresh-interval = 3s

    # Live queries read events up to this duration from the current database time.
    behind-current-time = 100 millis

    backtracking {
      enabled = on
      # Backtracking queries will look back for this amount of time. It should
      # not be larger than the pekko.projection.r2dbc.offset-store.time-window.
      window = 2 minutes
      # Backtracking queries read events up to this duration from the current database time.
      behind-current-time = 10 seconds
    }

    # In-memory buffer holding events when reading from database.
    buffer-size = 1000

    persistence-ids {
      buffer-size = 1000
    }

    # When journal publish-events is enabled a best effort deduplication can be enabled by setting
    # this property to the size of the deduplication buffer in the `eventsBySlices` query.
    # It keeps track of this number of entries and 5000 is recommended capacity. The drawback
    # of enabling this is that when the sequence numbers received via publish-events are out of sync
    # after some error scenarios it will take longer to receive those events, since it will rely on
    # the backtracking queries.
    deduplicate-capacity = 0
  }
}
The query plugin shares the same connection pool as the rest of the plugin; see Connection configuration.
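The comments on refresh-interval in the default configuration describe how the delay before the next live query depends on how full the previous result was. The rule can be sketched as follows. `RefreshDelaySketch` and `nextQueryDelay` are hypothetical names used to illustrate the documented behaviour, not the plugin's code.

```java
import java.time.Duration;

/** Illustrative sketch of the delay rule described in the refresh-interval comments. */
final class RefreshDelaySketch {

  /**
   * @param rowCount number of rows returned by the previous query
   * @param bufferSize the configured buffer-size
   * @param refreshInterval the configured refresh-interval
   */
  static Duration nextQueryDelay(int rowCount, int bufferSize, Duration refreshInterval) {
    if (bufferSize <= 0 || rowCount < 0) {
      throw new IllegalArgumentException("bufferSize must be > 0 and rowCount >= 0");
    }
    if (rowCount * 10L >= bufferSize * 9L) {
      return Duration.ZERO; // >= 90% of buffer-size: query again immediately
    }
    if (rowCount * 10L <= bufferSize) {
      return refreshInterval; // no results or <= 10% of buffer-size: full delay
    }
    return refreshInterval.dividedBy(2); // in between: half of the delay
  }

  public static void main(String[] args) {
    Duration refreshInterval = Duration.ofSeconds(3);
    int bufferSize = 1000;
    for (int rowCount : new int[] {0, 100, 500, 900, 1000}) {
      System.out.println(
          rowCount + " rows -> " + nextQueryDelay(rowCount, bufferSize, refreshInterval));
    }
  }
}
```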