Persistence Query
Dependency
To use Apache Persistence Query, you must add the following dependency in your project:
- sbt
val PekkoVersion = "1.1.2" libraryDependencies += "org.apache.pekko" %% "pekko-persistence-query" % PekkoVersion
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-bom_${scala.binary.version}</artifactId> <version>1.1.2</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-persistence-query_${scala.binary.version}</artifactId> </dependency> </dependencies>
- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2") implementation "org.apache.pekko:pekko-persistence-query_${versions.ScalaBinary}" }
This will also add dependency on the Pekko Persistence module.
Introduction
Pekko persistence query provides a query interface to Durable State Behaviors. These queries are based on asynchronous streams. These streams are similar to the ones offered in the Event Sourcing based implementation. Various state store plugins can implement these interfaces to expose their query capabilities.
One of the rationales behind having a separate query module for Pekko Persistence is for implementing the so-called query side or read side in the popular CQRS architecture pattern - in which the writing side of the application implemented using Pekko persistence, is completely separated from the query side.
Using query with Pekko Projections
Pekko Persistence and Pekko Projections together can be used to develop a CQRS application. In the application the durable state is stored in a database and fetched as an asynchronous stream to the user. Currently queries on durable state, provided by the DurableStateStoreQuery
interface, is used to implement tag based searches in Pekko Projections.
At present the query is based on tags. So if you have not tagged your objects, this query cannot be used.
The example below shows how to get the DurableStateStoreQuery
from the DurableStateStoreRegistry
extension.
- Scala
-
source
import org.apache.pekko import pekko.persistence.state.DurableStateStoreRegistry import pekko.persistence.query.scaladsl.DurableStateStoreQuery import pekko.persistence.query.DurableStateChange import pekko.persistence.query.UpdatedDurableState val durableStateStoreQuery = DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateStoreQuery[Record]](pluginId) val source: Source[DurableStateChange[Record], NotUsed] = durableStateStoreQuery.changes("tag", offset) source.map { case UpdatedDurableState(persistenceId, revision, value, offset, timestamp) => Some(value) case _: DeletedDurableState[_] => None }
- Java
-
source
import org.apache.pekko.persistence.state.DurableStateStoreRegistry; import org.apache.pekko.persistence.query.javadsl.DurableStateStoreQuery; import org.apache.pekko.persistence.query.DurableStateChange; import org.apache.pekko.persistence.query.UpdatedDurableState; DurableStateStoreQuery<Record> durableStateStoreQuery = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor(DurableStateStoreQuery.class, pluginId); Source<DurableStateChange<Record>, NotUsed> source = durableStateStoreQuery.changes("tag", offset); source.map( chg -> { if (chg instanceof UpdatedDurableState) { UpdatedDurableState<Record> upd = (UpdatedDurableState<Record>) chg; return upd.value(); } else { throw new IllegalArgumentException("Unexpected DurableStateChange " + chg.getClass()); } });
The DurableStateChange
DurableStateChange
elements can be UpdatedDurableState
or DeletedDurableState
.