Changes from Durable State

A typical source for Projections is the set of state changes stored with DurableStateBehavior in Apache Pekko Persistence. Durable state changes can be tagged and then consumed with the changes query.

Apache Pekko Projections integrates with the changes query; that integration is described here.

Dependencies

To use the Durable State module of Apache Pekko Projections, add the following dependency to your project:

sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-durable-state" % "1.0.0"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-projection-durable-state_${scala.binary.version}</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-projection-durable-state_${versions.ScalaBinary}:1.0.0"
}

Apache Pekko Projections requires Pekko 1.0.2 or later; see Pekko version.

Project Info: Apache Pekko Projections Durable State
Artifact: org.apache.pekko : pekko-projection-durable-state : 1.0.0
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17
Scala versions: 3.3.3, 2.13.13, 2.12.19
JPMS module name: pekko.projection.durable-state
License
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-projection

Transitive dependencies

The table below shows the pekko-projection-durable-state direct dependencies. The second tab shows all libraries it depends on transitively.

SourceProvider for changesByTag

A SourceProvider defines the source of the envelopes that the Projection will process. A SourceProvider for the changes query can be defined with the DurableStateSourceProvider like this:

Scala
import org.apache.pekko
import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore
import pekko.persistence.query.DurableStateChange
import pekko.persistence.query.Offset
import pekko.projection.state.scaladsl.DurableStateSourceProvider
import pekko.projection.scaladsl.SourceProvider

val sourceProvider: SourceProvider[Offset, DurableStateChange[AccountEntity.Account]] =
  DurableStateSourceProvider
    .changesByTag[AccountEntity.Account](system, JdbcDurableStateStore.Identifier, "bank-accounts-1")
Java
import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
import org.apache.pekko.persistence.query.DurableStateChange;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.projection.state.javadsl.DurableStateSourceProvider;
import org.apache.pekko.projection.javadsl.SourceProvider;

SourceProvider<Offset, DurableStateChange<AccountEntity.Account>> sourceProvider =
    DurableStateSourceProvider.changesByTag(
        system, JdbcDurableStateStore.Identifier(), "bank-accounts-1");

This example uses the DurableStateStore JDBC plugin for Apache Pekko Persistence. Use the same plugin that you configured for the write side, that is, the one used by the DurableStateBehavior.

This source consumes all the changes from the Account DurableStateBehavior that are tagged with "bank-accounts-1". In a production application, you would need to start one Projection instance for each tag you used, so that the changes from all entities are consumed.
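The text above does not say how entities are assigned to tags. As a minimal sketch, assuming a fixed set of five tags with the hypothetical prefix "bank-accounts-" and an assignment based on the hash of the entity id, the tag selection and the resulting "one instance per tag" rule could look like this. The class and method names are illustrative only and are not part of the Pekko API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Apache Pekko.
final class TagSelectionSketch {

  // Builds the fixed tag set, e.g. bank-accounts-0 .. bank-accounts-4.
  static List<String> tags(String prefix, int count) {
    List<String> result = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      result.add(prefix + i);
    }
    return result;
  }

  // Assumption: each entity always gets the same tag, derived from its id.
  // floorMod keeps the index non-negative even for negative hash codes.
  static String tagFor(String entityId, List<String> tags) {
    int index = Math.floorMod(entityId.hashCode(), tags.size());
    return tags.get(index);
  }

  public static void main(String[] args) {
    List<String> tags = tags("bank-accounts-", 5);
    // One Projection instance (and one SourceProvider) would be started per tag.
    for (String tag : tags) {
      System.out.println("start a projection instance for tag " + tag);
    }
    System.out.println("account-42 is tagged with " + tagFor("account-42", tags));
  }
}
```

Because every entity maps to exactly one tag, running one Projection instance per tag covers all entities without processing any change twice.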

The DurableStateChange[AccountEntity.Account] (Scala) or DurableStateChange<AccountEntity.Account> (Java) is what the Projection handler will process. It contains the State and additional metadata, such as the offset that will be stored by the Projection. See DurableStateChange for full details of what it contains.

SourceProvider for changesBySlices

A SourceProvider defines the source of the envelopes that the Projection will process. A SourceProvider for the changesBySlices query can be defined with the DurableStateSourceProvider like this:

Scala
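import org.apache.pekko
import pekko.persistence.query.DurableStateChange
import pekko.persistence.query.Offset
// R2dbcDurableStateStore is provided by the Pekko Persistence R2DBC plugin
import pekko.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore
import pekko.projection.state.scaladsl.DurableStateSourceProvider
import pekko.projection.scaladsl.SourceProvider

// Split the slices into 4 ranges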
val numberOfSliceRanges: Int = 4
val sliceRanges =
  DurableStateSourceProvider.sliceRanges(system, R2dbcDurableStateStore.Identifier, numberOfSliceRanges)

// Example of using the first slice range
val minSlice: Int = sliceRanges.head.min
val maxSlice: Int = sliceRanges.head.max
val entityType: String = "Account"

val sourceProvider: SourceProvider[Offset, DurableStateChange[AccountEntity.Account]] =
  DurableStateSourceProvider
    .changesBySlices[AccountEntity.Account](
      system,
      R2dbcDurableStateStore.Identifier,
      entityType,
      minSlice,
      maxSlice)
Java
import java.util.List;

import org.apache.pekko.japi.Pair;
import org.apache.pekko.persistence.query.DurableStateChange;
import org.apache.pekko.persistence.query.Offset;
// R2dbcDurableStateStore is provided by the Pekko Persistence R2DBC plugin
import org.apache.pekko.persistence.r2dbc.state.javadsl.R2dbcDurableStateStore;
import org.apache.pekko.projection.javadsl.SourceProvider;
import org.apache.pekko.projection.state.javadsl.DurableStateSourceProvider;

// Split the slices into 4 ranges
int numberOfSliceRanges = 4;
List<Pair<Integer, Integer>> sliceRanges =
    DurableStateSourceProvider.sliceRanges(
        system, R2dbcDurableStateStore.Identifier(), numberOfSliceRanges);

// Example of using the first slice range
int minSlice = sliceRanges.get(0).first();
int maxSlice = sliceRanges.get(0).second();
String entityType = "Account";

SourceProvider<Offset, DurableStateChange<AccountEntity.Account>> sourceProvider =
    DurableStateSourceProvider.changesBySlices(
        system, R2dbcDurableStateStore.Identifier(), entityType, minSlice, maxSlice);

This example uses the R2DBC plugin for Apache Pekko Persistence. Use the same plugin that you configured for the write side, that is, the one used by the DurableStateBehavior.

This source consumes all the changes from the Account DurableStateBehavior for the given slice range. In a production application, you would need to start one Projection instance for each slice range, so that the changes from all entities are consumed.
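To make the slice ranges concrete, here is a minimal standalone sketch of how a fixed slice space can be divided into equally sized ranges. It assumes the 1024 slices (numbered 0 to 1023) that Pekko Persistence uses and requires the number of ranges to divide 1024 evenly. The class below is a hypothetical illustration and does not call the Pekko API; in an application you would use DurableStateSourceProvider.sliceRanges as shown above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Apache Pekko.
final class SliceRangeSketch {

  // Assumption: the slice space is fixed at 1024 slices (0..1023).
  static final int NUMBER_OF_SLICES = 1024;

  // Returns {min, max} pairs (both inclusive) that cover every slice exactly once.
  static List<int[]> split(int numberOfRanges) {
    if (numberOfRanges <= 0 || NUMBER_OF_SLICES % numberOfRanges != 0) {
      throw new IllegalArgumentException(
          "numberOfRanges must divide " + NUMBER_OF_SLICES + " evenly");
    }
    int rangeSize = NUMBER_OF_SLICES / numberOfRanges;
    List<int[]> ranges = new ArrayList<>();
    for (int i = 0; i < numberOfRanges; i++) {
      int min = i * rangeSize;
      ranges.add(new int[] {min, min + rangeSize - 1});
    }
    return ranges;
  }

  public static void main(String[] args) {
    // With 4 ranges, one Projection instance would be started per printed range.
    for (int[] range : split(4)) {
      System.out.println("slices " + range[0] + " to " + range[1]);
    }
  }
}
```

With four ranges this yields 0 to 255, 256 to 511, 512 to 767 and 768 to 1023, so four Projection instances together cover every entity.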

The DurableStateChange[AccountEntity.Account] (Scala) or DurableStateChange<AccountEntity.Account> (Java) is what the Projection handler will process. It contains the State and additional metadata, such as the offset that will be stored by the Projection. See DurableStateChange for full details of what it contains.