DurableStateStore
How to get the DurableStateStore
The DurableStateStore
for JDBC plugin is obtained through the DurableStateStoreRegistry
extension.
- Scala
-
source
import org.apache.pekko import pekko.persistence.state.DurableStateStoreRegistry import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)
- Java
-
source
import org.apache.pekko.persistence.state.DurableStateStoreRegistry; import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier());
APIs supported by DurableStateStore
The plugin supports the following APIs:
getObject
getObject(persistenceId)
returns GetObjectResult(value, revision)
, where value
is an Option
(Optional
in Java) and is set to the value of the object if it exists with the passed in persistenceId
. Otherwise value
is empty.
- Scala
-
source
import org.apache.pekko import pekko.persistence.state.DurableStateStoreRegistry import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore import pekko.persistence.state.scaladsl.GetObjectResult val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) val futureResult: Future[GetObjectResult[String]] = store.getObject("InvalidPersistenceId") futureResult.futureValue.value shouldBe None
- Java
-
source
import org.apache.pekko.persistence.state.DurableStateStoreRegistry; import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore; import org.apache.pekko.persistence.state.javadsl.GetObjectResult; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier()); CompletionStage<GetObjectResult<String>> futureResult = store.getObject("InvalidPersistenceId"); try { GetObjectResult<String> result = futureResult.toCompletableFuture().get(); assert !result.value().isPresent(); } catch (Exception e) { // handle exceptions }
upsertObject
upsertObject(persistenceId, revision, value, tag)
inserts the record if the persistenceId
does not exist in the database. Or else it updates the record with the latest revision passed as revision
. The update succeeds only if the incoming revision
is 1 more than the already existing one. This snippet is an example of a sequnece of upsertObject
and getObject
.
- Scala
-
source
import org.apache.pekko import pekko.persistence.state.DurableStateStoreRegistry import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore import pekko.persistence.state.scaladsl.GetObjectResult val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) val v: Future[GetObjectResult[String]] = for { n <- store.upsertObject("p234", 1, "a valid string", "t123") _ = n shouldBe pekko.Done g <- store.getObject("p234") _ = g.value shouldBe Some("a valid string") u <- store.upsertObject("p234", 2, "updated valid string", "t123") _ = u shouldBe pekko.Done h <- store.getObject("p234") } yield h v.futureValue.value shouldBe Some("updated valid string")
- Java
-
source
import org.apache.pekko.persistence.state.DurableStateStoreRegistry; import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore; import org.apache.pekko.persistence.state.javadsl.GetObjectResult; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier()); CompletionStage<GetObjectResult<String>> r = store .upsertObject("p234", 1, "a valid string", "t123") .thenCompose(d -> store.getObject("p234")) .thenCompose(o -> store.upsertObject("p234", 2, "updated valid string", "t123")) .thenCompose(d -> store.getObject("p234")); try { assert r.toCompletableFuture().get().value().get().equals("updated valid string"); } catch (Exception e) { // handle exceptions }
deleteObject
deleteObject(persistenceId)
deletes the record with the input persistenceId
.
- Scala
-
source
import org.apache.pekko import pekko.persistence.state.DurableStateStoreRegistry import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) store.deleteObject("p123").futureValue shouldBe Done store.getObject("p123").futureValue.value shouldBe None
- Java
-
source
import org.apache.pekko.persistence.state.DurableStateStoreRegistry; import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier()); CompletionStage<Done> futureResult = store.deleteObject("p123"); try { assert futureResult.toCompletableFuture().get().equals(Done.getInstance()); } catch (Exception e) { // handle exceptions }
currentChanges
currentChanges(tag, offset)
gets a source of the most recent changes made to objects with the given tag
since the passed in offset
. This api returns changes that occurred up to when the Source
returned by this call is materialized.
- Scala
-
source
import org.apache.pekko import pekko.NotUsed import pekko.stream.scaladsl.Source import pekko.persistence.state.DurableStateStoreRegistry import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore import pekko.persistence.query.{ DurableStateChange, NoOffset } val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) val willCompleteTheStream: Source[DurableStateChange[String], NotUsed] = store.currentChanges("tag-1", NoOffset)
- Java
-
source
import org.apache.pekko.NotUsed; import org.apache.pekko.stream.javadsl.Source; import org.apache.pekko.persistence.state.DurableStateStoreRegistry; import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore; import org.apache.pekko.persistence.query.DurableStateChange; import org.apache.pekko.persistence.query.NoOffset; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier()); Source<DurableStateChange<String>, NotUsed> willCompleteTheStream = store.currentChanges("tag-1", NoOffset.getInstance());
changes
changes(tag, offset)
gets a source of the most recent changes made to objects with the given tag
since the passed in offset
. The returned source will never terminate, it effectively watches for changes to the objects and emits changes as they happen.
- Scala
-
source
import org.apache.pekko import pekko.NotUsed import pekko.stream.scaladsl.Source import pekko.persistence.state.DurableStateStoreRegistry import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore import pekko.persistence.query.{ DurableStateChange, NoOffset } val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) val willNotCompleteTheStream: Source[DurableStateChange[String], NotUsed] = store.changes("tag-1", NoOffset)
- Java
-
source
import org.apache.pekko.NotUsed; import org.apache.pekko.stream.javadsl.Source; import org.apache.pekko.persistence.state.DurableStateStoreRegistry; import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore; import org.apache.pekko.persistence.query.DurableStateChange; import org.apache.pekko.persistence.query.NoOffset; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier()); Source<DurableStateChange<String>, NotUsed> willNotCompleteTheStream = store.changes("tag-1", NoOffset.getInstance());