Persistence Query

Dependency

To use Pekko Persistence Query, add the following dependency to your project:

sbt
val PekkoVersion = "1.0.0"
libraryDependencies += "org.apache.pekko" %% "pekko-persistence-query" % PekkoVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.0.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-persistence-query_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.0.0")

  implementation "org.apache.pekko:pekko-persistence-query_${versions.ScalaBinary}"
}

This also adds a dependency on the Pekko Persistence module.

Introduction

Pekko Persistence Query provides a query interface to Durable State Behaviors. These queries are based on asynchronous streams, similar to the ones offered in the Event Sourcing based implementation. Various state store plugins can implement these interfaces to expose their query capabilities.

One of the rationales behind having a separate query module for Pekko Persistence is to implement the so-called query side, or read side, of the popular CQRS architecture pattern, in which the write side of the application, implemented using Pekko Persistence, is completely separated from the query side.

Using query with Pekko Projections

Pekko Persistence and Pekko Projections can be used together to develop a CQRS application. In such an application, the durable state is stored in a database and delivered to the user as an asynchronous stream. Currently, queries on durable state, provided by the DurableStateStoreQuery interface, are used to implement tag-based searches in Pekko Projections.

At present, the query is based on tags, so it cannot be used for objects that have not been tagged.
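To make the consequence of tag-based querying concrete, here is a minimal sketch that does not use Pekko at all. `TagQuerySketch`, `StoredState`, and `idsForTag` are hypothetical names invented for illustration, an empty string is assumed to mean "untagged", and offsets are left out entirely. It only shows that an untagged object is never matched by a tag lookup.

```java
import java.util.ArrayList;
import java.util.List;

class TagQuerySketch {
  // Hypothetical stand-in for a stored durable state object; this is not a Pekko type.
  static final class StoredState {
    final String persistenceId;
    final String tag; // assumption for this sketch: "" means the object was not tagged
    final String value;

    StoredState(String persistenceId, String tag, String value) {
      this.persistenceId = persistenceId;
      this.tag = tag;
      this.value = value;
    }
  }

  // Returns the ids of all objects carrying exactly the requested tag.
  static List<String> idsForTag(List<StoredState> store, String tag) {
    List<String> ids = new ArrayList<>();
    for (StoredState state : store) {
      if (tag.equals(state.tag)) {
        ids.add(state.persistenceId);
      }
    }
    return ids;
  }
}
```

In this model, looking up the tag "orders" in a store that holds one object tagged "orders" and one untagged object finds only the tagged one.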

The example below shows how to get the DurableStateStoreQuery from the DurableStateStoreRegistry extension.

Scala
import org.apache.pekko
import pekko.NotUsed
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.query.scaladsl.DurableStateStoreQuery
import pekko.persistence.query.DurableStateChange
import pekko.persistence.query.UpdatedDurableState
import pekko.persistence.query.DeletedDurableState
import pekko.stream.scaladsl.Source

val durableStateStoreQuery =
  DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateStoreQuery[Record]](pluginId)
val source: Source[DurableStateChange[Record], NotUsed] = durableStateStoreQuery.changes("tag", offset)
source.map {
  case UpdatedDurableState(persistenceId, revision, value, offset, timestamp) => Some(value)
  case _: DeletedDurableState[_]                                              => None
}
Java
import org.apache.pekko.NotUsed;
import org.apache.pekko.persistence.state.DurableStateStoreRegistry;
import org.apache.pekko.persistence.query.javadsl.DurableStateStoreQuery;
import org.apache.pekko.persistence.query.DurableStateChange;
import org.apache.pekko.persistence.query.UpdatedDurableState;
import org.apache.pekko.stream.javadsl.Source;

DurableStateStoreQuery<Record> durableStateStoreQuery =
    DurableStateStoreRegistry.get(system)
        .getDurableStateStoreFor(DurableStateStoreQuery.class, pluginId);
Source<DurableStateChange<Record>, NotUsed> source =
    durableStateStoreQuery.changes("tag", offset);
source.map(
    chg -> {
      if (chg instanceof UpdatedDurableState) {
        UpdatedDurableState<Record> upd = (UpdatedDurableState<Record>) chg;
        return upd.value();
      } else {
        throw new IllegalArgumentException("Unexpected DurableStateChange " + chg.getClass());
      }
    });

The DurableStateChange elements can be UpdatedDurableState or DeletedDurableState.
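Because a change can be either an update or a deletion, a consumer that maintains a read-side view generally needs to handle both cases. The following is a minimal, Pekko-free sketch of that idea. `Change`, `Updated`, `Deleted`, and `replay` are hypothetical stand-ins invented for illustration, not the Pekko types, and a plain `List` stands in for the stream of changes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ChangeHandlingSketch {
  // Hypothetical stand-ins for the two kinds of change; these are not Pekko classes.
  interface Change {}

  static final class Updated implements Change {
    final String persistenceId;
    final String value;

    Updated(String persistenceId, String value) {
      this.persistenceId = persistenceId;
      this.value = value;
    }
  }

  static final class Deleted implements Change {
    final String persistenceId;

    Deleted(String persistenceId) {
      this.persistenceId = persistenceId;
    }
  }

  // Applies the changes in order: an update stores the latest value, a deletion removes the entry.
  static Map<String, String> replay(List<Change> changes) {
    Map<String, String> view = new LinkedHashMap<>();
    for (Change change : changes) {
      if (change instanceof Updated) {
        Updated upd = (Updated) change;
        view.put(upd.persistenceId, upd.value);
      } else if (change instanceof Deleted) {
        view.remove(((Deleted) change).persistenceId);
      } else {
        throw new IllegalArgumentException("Unexpected change " + change.getClass());
      }
    }
    return view;
  }
}
```

With the real API, the same two-branch structure would apply to UpdatedDurableState and DeletedDurableState inside the stream's `map` stage, so that a deletion is handled explicitly rather than treated as an unexpected element.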