DurableStateStore

How to get the DurableStateStore

The DurableStateStore for the JDBC plugin is obtained through the DurableStateStoreRegistry extension.

Scala
import org.apache.pekko
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore
val store = DurableStateStoreRegistry
  .get(system)
  .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)
Java
import org.apache.pekko.persistence.state.DurableStateStoreRegistry;
import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore;

@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
    DurableStateStoreRegistry.get(system)
        .getDurableStateStoreFor(
            JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier());

APIs supported by DurableStateStore

The plugin supports the following APIs:

getObject

getObject(persistenceId) returns GetObjectResult(value, revision), where value is an Option (Optional in Java) that holds the object's value if an object with the given persistenceId exists, and is empty otherwise.

Scala
import org.apache.pekko
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore
import pekko.persistence.state.scaladsl.GetObjectResult

val store = DurableStateStoreRegistry
  .get(system)
  .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)

val futureResult: Future[GetObjectResult[String]] = store.getObject("InvalidPersistenceId")
futureResult.futureValue.value shouldBe None
Java
import org.apache.pekko.persistence.state.DurableStateStoreRegistry;
import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
import org.apache.pekko.persistence.state.javadsl.GetObjectResult;

@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
    DurableStateStoreRegistry.get(system)
        .getDurableStateStoreFor(
            JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier());

CompletionStage<GetObjectResult<String>> futureResult = store.getObject("InvalidPersistenceId");
try {
  GetObjectResult<String> result = futureResult.toCompletableFuture().get();
  assert !result.value().isPresent();
} catch (Exception e) {
  // handle exceptions
}

upsertObject

upsertObject(persistenceId, revision, value, tag) inserts the record if the persistenceId does not exist in the database; otherwise it updates the existing record to the revision passed as revision. The update succeeds only if the incoming revision is exactly 1 more than the existing one. This snippet is an example of a sequence of upsertObject and getObject calls.

Scala
import org.apache.pekko
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore
import pekko.persistence.state.scaladsl.GetObjectResult

val store = DurableStateStoreRegistry
  .get(system)
  .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)

val v: Future[GetObjectResult[String]] =
  for {
    n <- store.upsertObject("p234", 1, "a valid string", "t123")
    _ = n shouldBe pekko.Done
    g <- store.getObject("p234")
    _ = g.value shouldBe Some("a valid string")
    u <- store.upsertObject("p234", 2, "updated valid string", "t123")
    _ = u shouldBe pekko.Done
    h <- store.getObject("p234")
  } yield h

v.futureValue.value shouldBe Some("updated valid string")
Java
import org.apache.pekko.persistence.state.DurableStateStoreRegistry;
import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
import org.apache.pekko.persistence.state.javadsl.GetObjectResult;

@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
    DurableStateStoreRegistry.get(system)
        .getDurableStateStoreFor(
            JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier());

CompletionStage<GetObjectResult<String>> r =
    store
        .upsertObject("p234", 1, "a valid string", "t123")
        .thenCompose(d -> store.getObject("p234"))
        .thenCompose(o -> store.upsertObject("p234", 2, "updated valid string", "t123"))
        .thenCompose(d -> store.getObject("p234"));

try {
  assert r.toCompletableFuture().get().value().get().equals("updated valid string");
} catch (Exception e) {
  // handle exceptions
}

deleteObject

deleteObject(persistenceId) deletes the record with the input persistenceId.

Scala
import org.apache.pekko
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore

val store = DurableStateStoreRegistry
  .get(system)
  .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)

store.deleteObject("p123").futureValue shouldBe Done
store.getObject("p123").futureValue.value shouldBe None
Java
import org.apache.pekko.persistence.state.DurableStateStoreRegistry;
import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore;

@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
    DurableStateStoreRegistry.get(system)
        .getDurableStateStoreFor(
            JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier());

CompletionStage<Done> futureResult = store.deleteObject("p123");
try {
  assert futureResult.toCompletableFuture().get().equals(Done.getInstance());
} catch (Exception e) {
  // handle exceptions
}

currentChanges

currentChanges(tag, offset) gets a source of the most recent changes made to objects with the given tag since the passed-in offset. This API returns changes that occurred up to the point when the Source returned by this call is materialized.

Scala
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Source
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore
import pekko.persistence.query.{ DurableStateChange, NoOffset }

val store = DurableStateStoreRegistry
  .get(system)
  .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)

val willCompleteTheStream: Source[DurableStateChange[String], NotUsed] =
  store.currentChanges("tag-1", NoOffset)
Java
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.persistence.state.DurableStateStoreRegistry;
import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
import org.apache.pekko.persistence.query.DurableStateChange;
import org.apache.pekko.persistence.query.NoOffset;

@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
    DurableStateStoreRegistry.get(system)
        .getDurableStateStoreFor(
            JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier());

Source<DurableStateChange<String>, NotUsed> willCompleteTheStream =
    store.currentChanges("tag-1", NoOffset.getInstance());

changes

changes(tag, offset) gets a source of the most recent changes made to objects with the given tag since the passed-in offset. The returned source will never terminate; it effectively watches for changes to the objects and emits them as they happen.

Scala
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Source
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore
import pekko.persistence.query.{ DurableStateChange, NoOffset }

val store = DurableStateStoreRegistry
  .get(system)
  .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)

val willNotCompleteTheStream: Source[DurableStateChange[String], NotUsed] =
  store.changes("tag-1", NoOffset)
Java
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.persistence.state.DurableStateStoreRegistry;
import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
import org.apache.pekko.persistence.query.DurableStateChange;
import org.apache.pekko.persistence.query.NoOffset;

@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
    DurableStateStoreRegistry.get(system)
        .getDurableStateStoreFor(
            JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier());

Source<DurableStateChange<String>, NotUsed> willNotCompleteTheStream =
    store.changes("tag-1", NoOffset.getInstance());