Query Plugin

Event sourced queries

R2dbcReadJournal implements the following Persistence Queries:

  • eventsByPersistenceId, currentEventsByPersistenceId
  • eventsBySlices, currentEventsBySlices
  • currentPersistenceIds

Accessing the R2dbcReadJournal:

Java
import org.apache.pekko.persistence.query.PersistenceQuery;
import org.apache.pekko.persistence.r2dbc.query.javadsl.R2dbcReadJournal;

R2dbcReadJournal eventQueries =
    PersistenceQuery.get(system)
        .getReadJournalFor(R2dbcReadJournal.class, R2dbcReadJournal.Identifier());
Scala
import org.apache.pekko
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal

val eventQueries = PersistenceQuery(system)
  .readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)

eventsByPersistenceId

The eventsByPersistenceId and currentEventsByPersistenceId queries are useful for retrieving events for a single entity with a given persistence id.

Example of currentEventsByPersistenceId:

Java
PersistenceId persistenceId = PersistenceId.of("MyEntity", "id1");
eventQueries
    .currentEventsByPersistenceId(persistenceId.id(), 1, 101)
    .map(envelope -> "event with seqNr " + envelope.sequenceNr() + ": " + envelope.event())
    .runWith(Sink.foreach(System.out::println), system);
Scala
val persistenceId = PersistenceId("MyEntity", "id1")
eventQueries
  .currentEventsByPersistenceId(persistenceId.id, 1, 101)
  .map(envelope => s"event with seqNr ${envelope.sequenceNr}: ${envelope.event}")
  .runWith(Sink.foreach(println))

eventsBySlices

The eventsBySlices and currentEventsBySlices queries are useful for retrieving all events for a given entity type. eventsBySlices should be used via Pekko Projection.

Note

This has historically been done with eventsByTag, but the R2DBC plugin instead provides eventsBySlices as an improved solution.

The usage of eventsByTag for Projections has the drawback that the number of tags must be decided up-front and can’t easily be changed afterwards. Starting with too many tags means significant overhead, since many projection instances would be running on each node in a small Pekko Cluster, each of them polling the database periodically. Starting with too few tags means that it can’t be scaled later to more Pekko nodes.

With eventsBySlices more Projection instances can be added when needed and still reuse the offsets for the previous slice distributions.

A slice is deterministically defined based on the persistence id. The purpose is to evenly distribute all persistence ids over the slices. The eventsBySlices query is for a range of the slices. For example if using 1024 slices and running 4 Projection instances the slice ranges would be 0-255, 256-511, 512-767, 768-1023. Changing to 8 slice ranges means that the ranges would be 0-127, 128-255, 256-383, …, 768-895, 896-1023.
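
The slice for a given persistence id can also be looked up from the read journal, which is useful for checking how persistence ids are distributed over the slice ranges. A minimal sketch, assuming the eventQueries instance from above and that sliceForPersistenceId is available on the read journal as part of the EventsBySliceQuery API:

Scala
import org.apache.pekko.persistence.typed.PersistenceId

// Deterministic slice (0-1023) for a persistence id
val pid = PersistenceId("MyEntity", "id1")
val slice: Int = eventQueries.sliceForPersistenceId(pid.id)

// With 4 Projection instances, this persistence id is handled by the
// range that contains its slice, e.g. 0-255, 256-511, 512-767 or 768-1023
val ranges = eventQueries.sliceRanges(4)
val myRange = ranges.find(_.contains(slice))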

Example of currentEventsBySlices:

Java
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.persistence.query.typed.EventEnvelope;

// Split the slices into 4 ranges
int numberOfSliceRanges = 4;
List<Pair<Integer, Integer>> sliceRanges = eventQueries.sliceRanges(numberOfSliceRanges);

// Example of using the first slice range
int minSlice = sliceRanges.get(0).first();
int maxSlice = sliceRanges.get(0).second();
String entityType = "MyEntity";
Source<EventEnvelope<MyEvent>, NotUsed> source =
    eventQueries.currentEventsBySlices(entityType, minSlice, maxSlice, NoOffset.getInstance());
source
    .map(
        envelope ->
            "event from persistenceId "
                + envelope.persistenceId()
                + " with seqNr "
                + envelope.sequenceNr()
                + ": "
                + envelope.event())
    .runWith(Sink.foreach(System.out::println), system);
Scala
import org.apache.pekko.persistence.query.typed.EventEnvelope

// Split the slices into 4 ranges
val numberOfSliceRanges: Int = 4
val sliceRanges = eventQueries.sliceRanges(numberOfSliceRanges)

// Example of using the first slice range
val minSlice: Int = sliceRanges.head.min
val maxSlice: Int = sliceRanges.head.max
val entityType: String = "MyEntity"
eventQueries
  .currentEventsBySlices[MyEvent](entityType, minSlice, maxSlice, NoOffset.getInstance)
  .map(envelope =>
    s"event from persistenceId ${envelope.persistenceId} with " +
    s"seqNr ${envelope.sequenceNr}: ${envelope.event}")
  .runWith(Sink.foreach(println))

eventsBySlices should be used via R2dbcProjection, which will automatically handle the difficulties described below. When using R2dbcProjection the events will be delivered in sequence number order without duplicates.
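
A hedged sketch of what that wiring can look like, assuming the pekko-projection-eventsourced and pekko-projection-r2dbc modules with their EventSourcedProvider, R2dbcProjection and R2dbcHandler APIs, and a typed ActorSystem in scope as system (see the Pekko Projection documentation for the authoritative setup); entityType, minSlice and maxSlice are as in the example above:

Scala
import scala.concurrent.Future
import org.apache.pekko.Done
import org.apache.pekko.persistence.query.typed.EventEnvelope
import org.apache.pekko.projection.ProjectionId
import org.apache.pekko.projection.eventsourced.scaladsl.EventSourcedProvider
import org.apache.pekko.projection.r2dbc.scaladsl.{ R2dbcHandler, R2dbcProjection, R2dbcSession }

// Source of events for one slice range, read via the R2DBC read journal
val sourceProvider =
  EventSourcedProvider.eventsBySlices[MyEvent](
    system,
    readJournalPluginId = R2dbcReadJournal.Identifier,
    entityType,
    minSlice,
    maxSlice)

// Offset storage and deduplication are handled by the projection
val projection =
  R2dbcProjection.exactlyOnce(
    ProjectionId("MyProjection", s"$minSlice-$maxSlice"),
    settings = None,
    sourceProvider,
    handler = () =>
      new R2dbcHandler[EventEnvelope[MyEvent]] {
        override def process(session: R2dbcSession, envelope: EventEnvelope[MyEvent]): Future[Done] = {
          // update the read model here using `session`
          Future.successful(Done)
        }
      })(system)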

The consumer can keep track of its current position in the event stream by storing the offset and restart the query from a given offset after a crash/restart.
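
For illustration, a hedged sketch of manual offset handling with the plain query API (a Projection normally does this for you); loadStoredOffset and storeOffset are hypothetical application functions, and entityType, minSlice and maxSlice are as in the example above:

Scala
import org.apache.pekko.persistence.query.{ NoOffset, Offset }

// Resume from the last stored offset, or from the beginning if none is stored.
// loadStoredOffset() and storeOffset() are hypothetical, e.g. backed by an application table.
val startOffset: Offset = loadStoredOffset().getOrElse(NoOffset)

eventQueries
  .eventsBySlices[MyEvent](entityType, minSlice, maxSlice, startOffset)
  .map { envelope =>
    // envelope.offset is a TimestampOffset; store it so the query can be resumed here after a restart
    storeOffset(envelope.offset)
    envelope
  }
  .runWith(Sink.foreach(env => println(s"${env.persistenceId} seqNr ${env.sequenceNr}")))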

The offset is a TimestampOffset and it is based on the database transaction_timestamp() when the event was stored. transaction_timestamp() is the time when the transaction started, not when it was committed. This means that a “later” event may become visible first, and when retrieving events after the previously seen timestamp we may miss some events and emit an event with a later sequence number for a persistence id without emitting all preceding events. In distributed SQL databases there can also be clock skews for the database timestamps. For that reason eventsBySlices will perform additional backtracking queries to catch missed events. Events from backtracking will typically be duplicates of previously emitted events. It’s the responsibility of the consumer to filter duplicates and make sure that events are processed in exact sequence number order for each persistence id. Such deduplication is provided by the R2DBC Projection.
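
If the query is consumed without a Projection, duplicates can be dropped by remembering the highest processed sequence number per persistence id. A minimal sketch with an unbounded in-memory map (a real consumer would bound and persist this state; R2dbcProjection provides proper deduplication):

Scala
import org.apache.pekko.persistence.query.NoOffset

eventQueries
  .eventsBySlices[MyEvent](entityType, minSlice, maxSlice, NoOffset)
  .statefulMapConcat { () =>
    // highest processed sequence number per persistence id
    var processed = Map.empty[String, Long]
    envelope => {
      val seen = processed.getOrElse(envelope.persistenceId, 0L)
      if (envelope.sequenceNr <= seen)
        Nil // duplicate, e.g. re-emitted by a backtracking query
      else {
        processed = processed.updated(envelope.persistenceId, envelope.sequenceNr)
        envelope :: Nil
      }
    }
  }
  .runWith(Sink.foreach(env => println(s"${env.persistenceId} seqNr ${env.sequenceNr}")))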

Events emitted by the backtracking don’t contain the event payload (EventBySliceEnvelope.event is None) and the consumer can load the full EventBySliceEnvelope with R2dbcReadJournal.loadEnvelope.
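
A hedged sketch of loading the payload for such an envelope, assuming loadEnvelope takes the persistence id and sequence number and returns a Future of the full envelope, and that the envelope exposes its optional payload as eventOption:

Scala
import scala.concurrent.Future
import org.apache.pekko.persistence.query.NoOffset

eventQueries
  .eventsBySlices[MyEvent](entityType, minSlice, maxSlice, NoOffset)
  .mapAsync(1) { envelope =>
    if (envelope.eventOption.isDefined)
      Future.successful(envelope)
    else
      // backtracking envelope without payload: load the full envelope on demand
      eventQueries.loadEnvelope[MyEvent](envelope.persistenceId, envelope.sequenceNr)
  }
  .runWith(Sink.foreach(env => println(s"${env.persistenceId} seqNr ${env.sequenceNr}")))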

The events will be emitted in timestamp order, with the caveat of duplicate events as described above. Events with the same timestamp are ordered by sequence number.

currentEventsBySlices doesn’t perform these backtracking queries, will not emit duplicates, and the event payload is always fully loaded.

Publish events for lower latency of eventsBySlices

The eventsBySlices query polls the database periodically to find new events. By default, this interval is a few seconds, see pekko.persistence.r2dbc.query.refresh-interval in the Configuration. This interval can be reduced for lower latency, with the drawback of querying the database more frequently.
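
For example, a shorter poll interval (the value here is only illustrative and should be weighed against the extra database load) can be configured with:

pekko.persistence.r2dbc.query.refresh-interval = 1s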

If you need latency below a few hundred milliseconds you can, instead of reducing refresh-interval, enable a feature that publishes the events within the Pekko Cluster. A running eventsBySlices query will subscribe to the published events and emit them directly, without waiting for the next query poll. The tradeoff is that more CPU and network resources are used. The events must still be retrieved from the database, but that can be done at a lower polling frequency, because delivery of published messages is not guaranteed.

Enable publishing of events with configuration:

pekko.persistence.r2dbc.journal.publish-events = on

Durable state queries

R2dbcDurableStateStore implements the following Persistence Queries:

  • getObject
  • changesBySlices, currentChangesBySlices
  • currentPersistenceIds

Accessing the R2dbcDurableStateStore:

Java
import org.apache.pekko.persistence.r2dbc.state.javadsl.R2dbcDurableStateStore;
import org.apache.pekko.persistence.state.DurableStateStoreRegistry;

R2dbcDurableStateStore<MyState> stateQueries =
    DurableStateStoreRegistry.get(system)
        .getDurableStateStoreFor(
            R2dbcDurableStateStore.class, R2dbcDurableStateStore.Identifier());
Scala
import org.apache.pekko
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore

val stateQueries = DurableStateStoreRegistry(system)
  .durableStateStoreFor[R2dbcDurableStateStore[MyState]](R2dbcDurableStateStore.Identifier)
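
getObject (listed above) returns the current state and revision for a single persistence id. A minimal sketch, assuming the stateQueries instance from above and a typed ActorSystem in scope as system:

Scala
import scala.concurrent.ExecutionContext
import org.apache.pekko.persistence.typed.PersistenceId

implicit val ec: ExecutionContext = system.executionContext

val persistenceId = PersistenceId("MyEntity", "id1")

// getObject returns a Future of the latest state (if any) and its revision
stateQueries
  .getObject(persistenceId.id)
  .foreach { result =>
    println(s"current state (revision ${result.revision}): ${result.value}")
  }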

changesBySlices

The changesBySlices and currentChangesBySlices queries are useful for retrieving updates of the latest durable state for a given entity type.

Example of currentChangesBySlices:

Java
import org.apache.pekko.persistence.query.DurableStateChange;
import org.apache.pekko.persistence.query.UpdatedDurableState;

// Split the slices into 4 ranges
int numberOfSliceRanges = 4;
List<Pair<Integer, Integer>> sliceRanges = stateQueries.sliceRanges(numberOfSliceRanges);

// Example of using the first slice range
int minSlice = sliceRanges.get(0).first();
int maxSlice = sliceRanges.get(0).second();
String entityType = "MyEntity";
Source<DurableStateChange<MyState>, NotUsed> source =
    stateQueries.currentChangesBySlices(entityType, minSlice, maxSlice, NoOffset.getInstance());
source
    .collectType(UpdatedDurableState.class)
    .map(
        change ->
            "state change from persistenceId "
                + change.persistenceId()
                + " with revision "
                + change.revision()
                + ": "
                + change.value())
    .runWith(Sink.foreach(System.out::println), system);
Scala
import org.apache.pekko.persistence.query.UpdatedDurableState

// Split the slices into 4 ranges
val numberOfSliceRanges: Int = 4
val sliceRanges = stateQueries.sliceRanges(numberOfSliceRanges)

// Example of using the first slice range
val minSlice: Int = sliceRanges.head.min
val maxSlice: Int = sliceRanges.head.max
val entityType: String = "MyEntity"
stateQueries
  .currentChangesBySlices(entityType, minSlice, maxSlice, NoOffset.getInstance)
  .collect { case change: UpdatedDurableState[MyState] => change }
  .map(change =>
    s"state change from persistenceId ${change.persistenceId} with " +
    s"revision ${change.revision}: ${change.value}")
  .runWith(Sink.foreach(println))

The emitted DurableStateChange can be an UpdatedDurableState or a DeletedDurableState, but DeletedDurableState is not implemented yet.

It will emit an UpdatedDurableState when the durable state is updated. When the state is updated again another UpdatedDurableState is emitted. It will always emit an UpdatedDurableState for the latest revision of the state, but there is no guarantee that all intermediate changes are emitted if the state is updated several times. Note that UpdatedDurableState contains the full current state; it is not a delta from the previous revision of the state.

changesBySlices should be used via R2dbcProjection, which will automatically handle difficulties with duplicates similar to those described for eventsBySlices. When using R2dbcProjection the changes will be delivered in revision number order without duplicates.

Configuration

Query configuration is under pekko.persistence.r2dbc.query. Here are the default configuration values for the query plugin:

pekko.persistence.r2dbc {
  query {
    class = "org.apache.pekko.persistence.r2dbc.query.R2dbcReadJournalProvider"

    # When live queries return no results or <= 10% of buffer-size, the next query
    # to db will be delayed for this duration.
    # When the number of rows from previous query is >= 90% of buffer-size, the next
    # query will be emitted immediately.
    # Otherwise, between 10% - 90% of buffer-size, the next query will be delayed
    # for half of this duration.
    refresh-interval = 3s

    # Live queries read events up to this duration from the current database time.
    behind-current-time = 100 millis

    backtracking {
      enabled = on
      # Backtracking queries will look back for this amount of time. It should
      # not be larger than the pekko.projection.r2dbc.offset-store.time-window.
      window = 2 minutes
      # Backtracking queries read events up to this duration from the current database time.
      behind-current-time = 10 seconds
    }

    # In-memory buffer holding events when reading from database.
    buffer-size = 1000

    persistence-ids {
      buffer-size = 1000
    }

    # When journal publish-events is enabled a best effort deduplication can be enabled by setting
    # this property to the size of the deduplication buffer in the `eventsBySlices` query.
    # It keeps track of this number of entries and 5000 is recommended capacity. The drawback
    # of enabling this is that when the sequence numbers received via publish-events are out of sync
    # after some error scenarios it will take longer to receive those events, since it will rely on
    # the backtracking queries.
    deduplicate-capacity = 0

  }
}

The query plugin shares the connection pool with the rest of the plugin, see Connection configuration.