DurableStateStore
How to get the DurableStateStore
The DurableStateStore for the JDBC plugin is obtained through the DurableStateStoreRegistry extension.
- Scala
-
source
import org.apache.pekko import pekko.persistence.state.DurableStateStoreRegistry import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) - Java
-
source
import org.apache.pekko.persistence.state.DurableStateStoreRegistry; import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier());
APIs supported by DurableStateStore
The plugin supports the following APIs:
getObject
getObject(persistenceId) returns GetObjectResult(value, revision), where value is an Option (Optional in Java). If an object with the given persistenceId exists, value is set to that object's value; otherwise value is empty.
- Scala
-
source
import org.apache.pekko import pekko.persistence.state.DurableStateStoreRegistry import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore import pekko.persistence.state.scaladsl.GetObjectResult val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) val futureResult: Future[GetObjectResult[String]] = store.getObject("InvalidPersistenceId") futureResult.futureValue.value shouldBe None - Java
-
source
import org.apache.pekko.persistence.state.DurableStateStoreRegistry; import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore; import org.apache.pekko.persistence.state.javadsl.GetObjectResult; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier()); CompletionStage<GetObjectResult<String>> futureResult = store.getObject("InvalidPersistenceId"); try { GetObjectResult<String> result = futureResult.toCompletableFuture().get(); assert !result.value().isPresent(); } catch (Exception e) { // handle exceptions }
upsertObject
upsertObject(persistenceId, revision, value, tag) inserts the record if the persistenceId does not exist in the database. Otherwise, it updates the record with the latest revision passed as revision. The update succeeds only if the incoming revision is exactly 1 more than the already existing one. This snippet is an example of a sequence of upsertObject and getObject calls.
- Scala
-
source
import org.apache.pekko import pekko.persistence.state.DurableStateStoreRegistry import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore import pekko.persistence.state.scaladsl.GetObjectResult val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) val v: Future[GetObjectResult[String]] = for { n <- store.upsertObject("p234", 1, "a valid string", "t123") _ = n shouldBe pekko.Done g <- store.getObject("p234") _ = g.value shouldBe Some("a valid string") u <- store.upsertObject("p234", 2, "updated valid string", "t123") _ = u shouldBe pekko.Done h <- store.getObject("p234") } yield h v.futureValue.value shouldBe Some("updated valid string") - Java
-
source
import org.apache.pekko.persistence.state.DurableStateStoreRegistry; import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore; import org.apache.pekko.persistence.state.javadsl.GetObjectResult; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier()); CompletionStage<GetObjectResult<String>> r = store .upsertObject("p234", 1, "a valid string", "t123") .thenCompose(d -> store.getObject("p234")) .thenCompose(o -> store.upsertObject("p234", 2, "updated valid string", "t123")) .thenCompose(d -> store.getObject("p234")); try { assert r.toCompletableFuture().get().value().get().equals("updated valid string"); } catch (Exception e) { // handle exceptions }
deleteObject
deleteObject(persistenceId) deletes the record with the input persistenceId.
- Scala
-
source
import org.apache.pekko import pekko.persistence.state.DurableStateStoreRegistry import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) store.deleteObject("p123").futureValue shouldBe Done store.getObject("p123").futureValue.value shouldBe None - Java
-
source
import org.apache.pekko.persistence.state.DurableStateStoreRegistry; import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier()); CompletionStage<Done> futureResult = store.deleteObject("p123"); try { assert futureResult.toCompletableFuture().get().equals(Done.getInstance()); } catch (Exception e) { // handle exceptions }
currentChanges
currentChanges(tag, offset) gets a source of the most recent changes made to objects with the given tag since the passed-in offset. This API returns only the changes that occurred up to the point at which the Source returned by this call is materialized.
- Scala
-
source
import org.apache.pekko import pekko.NotUsed import pekko.stream.scaladsl.Source import pekko.persistence.state.DurableStateStoreRegistry import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore import pekko.persistence.query.{ DurableStateChange, NoOffset } val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) val willCompleteTheStream: Source[DurableStateChange[String], NotUsed] = store.currentChanges("tag-1", NoOffset) - Java
-
source
import org.apache.pekko.NotUsed; import org.apache.pekko.stream.javadsl.Source; import org.apache.pekko.persistence.state.DurableStateStoreRegistry; import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore; import org.apache.pekko.persistence.query.DurableStateChange; import org.apache.pekko.persistence.query.NoOffset; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier()); Source<DurableStateChange<String>, NotUsed> willCompleteTheStream = store.currentChanges("tag-1", NoOffset.getInstance());
changes
changes(tag, offset) gets a source of the most recent changes made to objects with the given tag since the passed-in offset. The returned source never terminates: it effectively watches for changes to the objects and emits them as they happen.
- Scala
-
source
import org.apache.pekko import pekko.NotUsed import pekko.stream.scaladsl.Source import pekko.persistence.state.DurableStateStoreRegistry import pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStore import pekko.persistence.query.{ DurableStateChange, NoOffset } val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) val willNotCompleteTheStream: Source[DurableStateChange[String], NotUsed] = store.changes("tag-1", NoOffset) - Java
-
source
import org.apache.pekko.NotUsed; import org.apache.pekko.stream.javadsl.Source; import org.apache.pekko.persistence.state.DurableStateStoreRegistry; import org.apache.pekko.persistence.jdbc.state.javadsl.JdbcDurableStateStore; import org.apache.pekko.persistence.query.DurableStateChange; import org.apache.pekko.persistence.query.NoOffset; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier()); Source<DurableStateChange<String>, NotUsed> willNotCompleteTheStream = store.changes("tag-1", NoOffset.getInstance());