DurableStateStore
How to get the DurableStateStore¶
The DurableStateStore
for JDBC plugin is obtained through the DurableStateStoreRegistry
extension.
sourceimport org.apache.pekko
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore
val store = DurableStateStoreRegistry
.get(system)
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)
sourceimport org.apache.pekko.persistence.state.DurableStateStoreRegistry;
import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
DurableStateStoreRegistry.get(system)
.getDurableStateStoreFor(
JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier());
APIs supported by DurableStateStore¶
The plugin supports the following APIs:
getObject¶
getObject(persistenceId)
returns GetObjectResult(value, revision)
, where value
is an Option
(Optional
in Java) and is set to the value of the object if it exists with the passed in persistenceId
. Otherwise value
is empty.
sourceimport org.apache.pekko
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore
import pekko.persistence.state.scaladsl.GetObjectResult
val store = DurableStateStoreRegistry
.get(system)
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)
val futureResult: Future[GetObjectResult[String]] = store.getObject("InvalidPersistenceId")
futureResult.futureValue.value shouldBe None
sourceimport org.apache.pekko.persistence.state.DurableStateStoreRegistry;
import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
import org.apache.pekko.persistence.state.javadsl.GetObjectResult;
@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
DurableStateStoreRegistry.get(system)
.getDurableStateStoreFor(
JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier());
CompletionStage<GetObjectResult<String>> futureResult = store.getObject("InvalidPersistenceId");
try {
GetObjectResult<String> result = futureResult.toCompletableFuture().get();
assert !result.value().isPresent();
} catch (Exception e) {
// handle exceptions
}
upsertObject¶
upsertObject(persistenceId, revision, value, tag)
inserts the record if the persistenceId
does not exist in the database. Or else it updates the record with the latest revision passed as revision
. The update succeeds only if the incoming revision
is 1 more than the already existing one. This snippet is an example of a sequnece of upsertObject
and getObject
.
sourceimport org.apache.pekko
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore
import pekko.persistence.state.scaladsl.GetObjectResult
val store = DurableStateStoreRegistry
.get(system)
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)
val v: Future[GetObjectResult[String]] =
for {
n <- store.upsertObject("p234", 1, "a valid string", "t123")
_ = n shouldBe pekko.Done
g <- store.getObject("p234")
_ = g.value shouldBe Some("a valid string")
u <- store.upsertObject("p234", 2, "updated valid string", "t123")
_ = u shouldBe pekko.Done
h <- store.getObject("p234")
} yield h
v.futureValue.value shouldBe Some("updated valid string")
sourceimport org.apache.pekko.persistence.state.DurableStateStoreRegistry;
import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
import org.apache.pekko.persistence.state.javadsl.GetObjectResult;
@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
DurableStateStoreRegistry.get(system)
.getDurableStateStoreFor(
JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier());
CompletionStage<GetObjectResult<String>> r =
store
.upsertObject("p234", 1, "a valid string", "t123")
.thenCompose(d -> store.getObject("p234"))
.thenCompose(o -> store.upsertObject("p234", 2, "updated valid string", "t123"))
.thenCompose(d -> store.getObject("p234"));
try {
assert r.toCompletableFuture().get().value().get().equals("updated valid string");
} catch (Exception e) {
// handle exceptions
}
deleteObject¶
deleteObject(persistenceId)
deletes the record with the input persistenceId
.
sourceimport org.apache.pekko
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore
val store = DurableStateStoreRegistry
.get(system)
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)
store.deleteObject("p123").futureValue shouldBe Done
store.getObject("p123").futureValue.value shouldBe None
sourceimport org.apache.pekko.persistence.state.DurableStateStoreRegistry;
import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
DurableStateStoreRegistry.get(system)
.getDurableStateStoreFor(
JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier());
CompletionStage<Done> futureResult = store.deleteObject("p123");
try {
assert futureResult.toCompletableFuture().get().equals(Done.getInstance());
} catch (Exception e) {
// handle exceptions
}
currentChanges¶
currentChanges(tag, offset)
gets a source of the most recent changes made to objects with the given tag
since the passed in offset
. This api returns changes that occurred up to when the Source
returned by this call is materialized.
sourceimport org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Source
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore
import pekko.persistence.query.{ DurableStateChange, NoOffset }
val store = DurableStateStoreRegistry
.get(system)
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)
val willCompleteTheStream: Source[DurableStateChange[String], NotUsed] =
store.currentChanges("tag-1", NoOffset)
sourceimport org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.persistence.state.DurableStateStoreRegistry;
import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
import org.apache.pekko.persistence.query.DurableStateChange;
import org.apache.pekko.persistence.query.NoOffset;
@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
DurableStateStoreRegistry.get(system)
.getDurableStateStoreFor(
JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier());
Source<DurableStateChange<String>, NotUsed> willCompleteTheStream =
store.currentChanges("tag-1", NoOffset.getInstance());
changes¶
changes(tag, offset)
gets a source of the most recent changes made to objects with the given tag
since the passed in offset
. The returned source will never terminate, it effectively watches for changes to the objects and emits changes as they happen.
sourceimport org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Source
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore
import pekko.persistence.query.{ DurableStateChange, NoOffset }
val store = DurableStateStoreRegistry
.get(system)
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)
val willNotCompleteTheStream: Source[DurableStateChange[String], NotUsed] =
store.changes("tag-1", NoOffset)
sourceimport org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.persistence.state.DurableStateStoreRegistry;
import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
import org.apache.pekko.persistence.query.DurableStateChange;
import org.apache.pekko.persistence.query.NoOffset;
@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
DurableStateStoreRegistry.get(system)
.getDurableStateStoreFor(
JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier());
Source<DurableStateChange<String>, NotUsed> willNotCompleteTheStream =
store.changes("tag-1", NoOffset.getInstance());