Durable State
Module info
To use Pekko Persistence, add the module to your project:
- sbt
val PekkoVersion = "1.0.1" libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-persistence-typed" % PekkoVersion, "org.apache.pekko" %% "pekko-persistence-testkit" % PekkoVersion % Test )
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-bom_${scala.binary.version}</artifactId> <version>1.0.1</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-persistence-typed_${scala.binary.version}</artifactId> </dependency> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-persistence-testkit_${scala.binary.version}</artifactId> <scope>test</scope> </dependency> </dependencies>
- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.0.1") implementation "org.apache.pekko:pekko-persistence-typed_${versions.ScalaBinary}" testImplementation "org.apache.pekko:pekko-persistence-testkit_${versions.ScalaBinary}" }
You also have to select durable state store plugin, see Persistence Plugins.
Project Info: Pekko Event Sourcing (typed) | |
---|---|
Artifact | org.apache.pekko
pekko-persistence-typed
1.0.1
|
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 |
Scala versions | 2.13.11, 2.12.18, 3.3.1 |
JPMS module name | pekko.persistence.typed |
License | |
Home page | https://pekko.apache.org/ |
API documentation | |
Forums | |
Release notes | Release Notes |
Issues | Github issues |
Sources | https://github.com/apache/incubator-pekko |
Introduction
This model of Pekko Persistence enables a stateful actor / entity to store the full state after processing each command instead of using event sourcing. This reduces the conceptual complexity and can be a handy tool for simple use cases. Very much like a CRUD based operation, the API is conceptually simple - a function from current state and incoming command to the next state which replaces the current state in the database.
(State, Command) => State
The current state is always stored in the database. Since only the latest state is stored, we don’t have access to any of the history of changes, unlike event sourced storage. Pekko Persistence would read that state and store it in memory. After processing of the command is finished, the new state will be stored in the database. The processing of the next command will not start until the state has been successfully stored in the database.
Pekko Persistence also supports Event Sourcing based implementation, where only the events that are persisted by the actor are stored, but not the actual state of the actor. By storing all events, using this model, a stateful actor can be recovered by replaying the stored events to the actor, which allows it to rebuild its state.
Since each entity lives on one node, consistency is guaranteed and reads can be served directly from memory. For details on how this guarantee is ensured, have a look at the Cluster Sharding and DurableStateBehavior section below.
Example and core API
Let’s start with a simple example that models a counter using a Pekko persistent actor. The minimum required for a DurableStateBehavior
DurableStateBehavior
is:
- Scala
-
source
import org.apache.pekko import pekko.persistence.typed.state.scaladsl.DurableStateBehavior import pekko.persistence.typed.PersistenceId object MyPersistentCounter { sealed trait Command[ReplyMessage] extends CborSerializable final case class State(value: Int) extends CborSerializable def counter(persistenceId: PersistenceId): DurableStateBehavior[Command[_], State] = { DurableStateBehavior.apply[Command[_], State]( persistenceId, emptyState = State(0), commandHandler = (state, command) => throw new NotImplementedError("TODO: process the command & return an Effect")) } }
- Java
-
source
public class MyPersistentCounter extends DurableStateBehavior<MyPersistentCounter.Command<?>, MyPersistentCounter.State> { interface Command<ReplyMessage> {} public static class State { private final int value; public State(int value) { this.value = value; } public int get() { return value; } } public static Behavior<Command<?>> create(PersistenceId persistenceId) { return new MyPersistentCounter(persistenceId); } private MyPersistentCounter(PersistenceId persistenceId) { super(persistenceId); } @Override public State emptyState() { return new State(0); } @Override public CommandHandler<Command<?>, State> commandHandler() { return (state, command) -> { throw new RuntimeException("TODO: process the command & return an Effect"); }; } }
The first important thing to notice is the Behavior
of a persistent actor is typed to the type of the Command
because this is the type of message a persistent actor should receive. In Pekko, this is now enforced by the type system.
The components that make up a DurableStateBehavior
are:
persistenceId
is the stable unique identifier for the persistent actor.emptyState
defines theState
when the entity is first created e.g. a Counter would start with 0 as state.commandHandler
defines how to handle commands and map to appropriate effects e.g. persisting state and replying to actors.
Next we’ll discuss each of these in detail.
PersistenceId
The PersistenceId
PersistenceId
is the stable unique identifier for the persistent actor in the backend durabe state store.
Cluster Sharding is typically used together with DurableStateBehavior
to ensure that there is only one active entity for each PersistenceId
(entityId
). There are techniques to ensure this uniqueness, an example of which can be found in the Persistence example in the Cluster Sharding documentation. This illustrates how to construct the PersistenceId
from the entityTypeKey
and entityId
provided by the EntityContext
.
The entityId
in Cluster Sharding is the business domain identifier which uniquely identifies the instance of that specific EntityType
. This means that across the cluster we have a unique combination of (EntityType
, EntityId
). Hence the entityId
might not be unique enough to be used as the PersistenceId
by itself. For example two different types of entities may have the same entityId
. To create a unique PersistenceId
the entityId
should be prefixed with a stable name of the entity type, which typically is the same as the EntityTypeKey.name
that is used in Cluster Sharding. There are PersistenceId.apply
PersistenceId.of
factory methods to help with constructing such PersistenceId
from an entityTypeHint
and entityId
.
The default separator when concatenating the entityTypeHint
and entityId
is |
, but a custom separator is supported.
A custom identifier can be created with PersistenceId.ofUniqueId
.
Command handler
The command handler is a function with 2 parameters, the current State
and the incoming Command
.
A command handler returns an Effect
directive that defines what state, if any, to persist. Effects are created using a factory that is returned via the Effect()
method the Effect
factory.
The two most commonly used effects are:
persist
will persist the latest value of the state. No history of state changes will be storednone
no state to be persisted, for example a read-only command
More effects are explained in Effects and Side Effects.
In addition to returning the primary Effect
for the command, DurableStateBehavior
s can also chain side effects that are to be performed after successful persist which is achieved with the thenRun
function e.g. Effect.persist(..).thenRun
Effect().persist(..).thenRun
.
Completing the example
Let’s fill in the details of the example.
Commands:
- Scala
-
source
sealed trait Command[ReplyMessage] extends CborSerializable final case object Increment extends Command[Nothing] final case class IncrementBy(value: Int) extends Command[Nothing] final case class GetValue(replyTo: ActorRef[State]) extends Command[State] final case object Delete extends Command[Nothing]
- Java
-
source
interface Command<ReplyMessage> {} public enum Increment implements Command<Void> { INSTANCE } public static class IncrementBy implements Command<Void> { public final int value; public IncrementBy(int value) { this.value = value; } } public static class GetValue implements Command<State> { private final ActorRef<Integer> replyTo; public GetValue(ActorRef<Integer> replyTo) { this.replyTo = replyTo; } }
State is a storage for the latest value of the counter.
- Scala
-
source
final case class State(value: Int) extends CborSerializable
- Java
-
source
public static class State { private final int value; public State(int value) { this.value = value; } public int get() { return value; } }
The command handler handles the commands Increment
, IncrementBy
and GetValue
.
Increment
increments the counter by1
and persists the updated value as an effect in the StateIncrementBy
increments the counter by the value passed to it and persists the updated value as an effect in the StateGetValue
retrieves the value of the counter from the State and replies with it to the actor passed in
- Scala
-
source
import pekko.persistence.typed.state.scaladsl.Effect val commandHandler: (State, Command[_]) => Effect[State] = (state, command) => command match { case Increment => Effect.persist(state.copy(value = state.value + 1)) case IncrementBy(by) => Effect.persist(state.copy(value = state.value + by)) case GetValue(replyTo) => Effect.reply(replyTo)(state) case Delete => Effect.delete[State]() }
- Java
-
source
@Override public CommandHandler<Command<?>, State> commandHandler() { return newCommandHandlerBuilder() .forAnyState() .onCommand( Increment.class, (state, command) -> Effect().persist(new State(state.get() + 1))) .onCommand( IncrementBy.class, (state, command) -> Effect().persist(new State(state.get() + command.value))) .onCommand( GetValue.class, (state, command) -> Effect().reply(command.replyTo, state.get())) .build(); }
These are used to create a DurableStateBehavior
: These are defined in an DurableStateBehavior
:
- Scala
-
source
import org.apache.pekko import pekko.persistence.typed.state.scaladsl.DurableStateBehavior import pekko.persistence.typed.PersistenceId def counter(id: String): DurableStateBehavior[Command[_], State] = { DurableStateBehavior.apply[Command[_], State]( persistenceId = PersistenceId.ofUniqueId(id), emptyState = State(0), commandHandler = commandHandler) }
- Java
-
source
import org.apache.pekko.persistence.typed.state.javadsl.DurableStateBehavior; import org.apache.pekko.persistence.typed.PersistenceId; public class MyPersistentCounter extends DurableStateBehavior<MyPersistentCounter.Command<?>, MyPersistentCounter.State> { // commands, events and state defined here public static Behavior<Command<?>> create(PersistenceId persistenceId) { return new MyPersistentCounter(persistenceId); } private MyPersistentCounter(PersistenceId persistenceId) { super(persistenceId); } @Override public State emptyState() { return new State(0); } @Override public CommandHandler<Command<?>, State> commandHandler() { return newCommandHandlerBuilder() .forAnyState() .onCommand( Increment.class, (state, command) -> Effect().persist(new State(state.get() + 1))) .onCommand( IncrementBy.class, (state, command) -> Effect().persist(new State(state.get() + command.value))) .onCommand( GetValue.class, (state, command) -> Effect().reply(command.replyTo, state.get())) .build(); } }
Effects and Side Effects
A command handler returns an Effect
directive that defines what state, if any, to persist. Effects are created using a factory that is returned via the Effect()
method the Effect
factory and can be one of:
persist
will persist the latest state. If it’s a new persistence id, the record will be inserted. In case of an existing persistence id, the record will be updated only if the revision number of the incoming record is 1 more than the already existing record. Otherwisepersist
will fail.delete
will delete the state by setting it to the empty state and the revision number will be incremented by 1.none
no state to be persisted, for example a read-only commandunhandled
the command is unhandled (not supported) in current statestop
stop this actorstash
the current command is stashedunstashAll
process the commands that were stashed withEffect.stash
Effect().stash
reply
send a reply message to the givenActorRef
Note that only one of those can be chosen per incoming command. It is not possible to both persist and say none/unhandled.
In addition to returning the primary Effect
for the command DurableStateBehavior
s can also chain side effects that are to be performed after successful persist which is achieved with the thenRun
function that runs the callback passed to it e.g. Effect.persist(..).thenRun
Effect().persist(..).thenRun
.
All thenRun
registered callbacks are executed sequentially after successful execution of the persist statement (or immediately, in case of none
and unhandled
).
In addition to thenRun
the following actions can also be performed after successful persist:
thenStop
the actor will be stoppedthenUnstashAll
process the commands that were stashed withEffect.stash
Effect().stash
thenReply
send a reply message to the givenActorRef
In the example below, we use a different constructor of DurableStateBehavior.withEnforcedReplies
, which creates a Behavior
for a persistent actor that ensures that every command sends a reply back. Hence it will be a compilation error if the returned effect from a CommandHandler
isn’t a ReplyEffect
.
Instead of Increment
we will have a new command IncrementWithConfirmation
that, along with persistence will also send an acknowledgement as a reply to the ActorRef
passed in the command.
Example of effects and side-effects:
- Scala
-
source
sealed trait Command[ReplyMessage] extends CborSerializable final case class IncrementWithConfirmation(replyTo: ActorRef[Done]) extends Command[Done] final case class GetValue(replyTo: ActorRef[State]) extends Command[State] final case class State(value: Int) extends CborSerializable def counter(persistenceId: PersistenceId): DurableStateBehavior[Command[_], State] = { DurableStateBehavior.withEnforcedReplies[Command[_], State]( persistenceId, emptyState = State(0), commandHandler = (state, command) => command match { case IncrementWithConfirmation(replyTo) => Effect.persist(state.copy(value = state.value + 1)).thenReply(replyTo)(_ => Done) case GetValue(replyTo) => Effect.reply(replyTo)(state) }) }
- Java
-
source
import org.apache.pekko.Done; interface Command<ReplyMessage> {} public static class IncrementWithConfirmation implements Command<Void> { public final ActorRef<Done> replyTo; public IncrementWithConfirmation(ActorRef<Done> replyTo) { this.replyTo = replyTo; } } public static class GetValue implements Command<State> { private final ActorRef<Integer> replyTo; public GetValue(ActorRef<Integer> replyTo) { this.replyTo = replyTo; } } public static class State { private final int value; public State(int value) { this.value = value; } public int get() { return value; } } public static Behavior<Command<?>> create(PersistenceId persistenceId) { return new MyPersistentCounterWithReplies(persistenceId); } private MyPersistentCounterWithReplies(PersistenceId persistenceId) { super(persistenceId); } @Override public State emptyState() { return new State(0); } @Override public CommandHandler<Command<?>, State> commandHandler() { return newCommandHandlerBuilder() .forAnyState() .onCommand( IncrementWithConfirmation.class, (state, command) -> Effect() .persist(new State(state.get() + 1)) .thenReply(command.replyTo, (st) -> Done.getInstance())) .onCommand( GetValue.class, (state, command) -> Effect().reply(command.replyTo, state.get())) .build(); }
The most common way to have a side-effect is to use the thenRun
method on Effect
. In case you have multiple side-effects that needs to be run for several commands, you can factor them out into functions and reuse for all the commands. For example:
- Scala
-
source
// Example factoring out a chained effect to use in several places with `thenRun` val commonChainedEffects: Mood => Unit = _ => println("Command processed") // Then in a command handler: Effect .persist(Remembered("Yep")) // persist event .thenRun(commonChainedEffects) // add on common chained effect
- Java
-
source
// Example factoring out a chained effect to use in several places with `thenRun` static final Procedure<ExampleState> commonChainedEffect = state -> System.out.println("Command handled!"); @Override public CommandHandler<MyCommand, MyEvent, ExampleState> commandHandler() { return newCommandHandlerBuilder() .forStateType(ExampleState.class) .onCommand( Cmd.class, (state, cmd) -> Effect() .persist(new Evt(cmd.data)) .thenRun(() -> cmd.replyTo.tell(new Ack())) .thenRun(commonChainedEffect)) .build(); }
Side effects ordering and guarantees
Any side effects are executed on an at-most-once basis and will not be executed if the persist fails.
Side effects are not run when the actor is restarted or started again after being stopped.
The side effects are executed sequentially, it is not possible to execute side effects in parallel, unless they call out to something that is running concurrently (for example sending a message to another actor).
It’s possible to execute a side effect before persisting the state, but that can result in that the side effect is performed but that the state is not stored if the persist fails.
Cluster Sharding and DurableStateBehavior
Cluster Sharding is an excellent fit to spread persistent actors over a cluster, addressing them by id. It makes it possible to have more persistent actors exist in the cluster than what would fit in the memory of one node. Cluster sharding improves the resilience of the cluster. If a node crashes, the persistent actors are quickly started on a new node and can resume operations.
The DurableStateBehavior
can then be run as any plain actor as described in actors documentation, but since Pekko Persistence is based on the single-writer principle, the persistent actors are typically used together with Cluster Sharding. For a particular persistenceId
only one persistent actor instance should be active at one time. Cluster Sharding ensures that there is only one active entity (or actor instance) for each id.
Accessing the ActorContext
If the DurableStateBehavior
DurableStateBehavior
needs to use the ActorContext
ActorContext
, for example to spawn child actors, it can be obtained by wrapping construction with Behaviors.setup
:
- Scala
-
source
import org.apache.pekko import pekko.persistence.typed.state.scaladsl.Effect import pekko.persistence.typed.state.scaladsl.DurableStateBehavior.CommandHandler def apply(): Behavior[String] = Behaviors.setup { context => DurableStateBehavior[String, State]( persistenceId = PersistenceId.ofUniqueId("myPersistenceId"), emptyState = State(0), commandHandler = CommandHandler.command { cmd => context.log.info("Got command {}", cmd) Effect.none }) }
- Java
-
source
public class MyPersistentBehavior extends DurableStateBehavior<MyPersistentBehavior.Command, MyPersistentBehavior.State> { public static Behavior<Command> create(PersistenceId persistenceId) { return Behaviors.setup(ctx -> new MyPersistentBehavior(persistenceId, ctx)); } // this makes the context available to the command handler etc. private final ActorContext<Command> context; // optionally if you only need `ActorContext.getSelf()` private final ActorRef<Command> self; public MyPersistentBehavior(PersistenceId persistenceId, ActorContext<Command> ctx) { super(persistenceId); this.context = ctx; this.self = ctx.getSelf(); } }
Changing Behavior
After processing a message, actors are able to return the Behavior
that is used for the next message.
As you can see in the above examples this is not supported by persistent actors. Instead, the state is persisted as an Effect
by the commandHandler
.
The reason a new behavior can’t be returned is that behavior is part of the actor’s state and must also carefully be reconstructed during recovery from the persisted state. This would imply that the state needs to be encoded such that the behavior can also be restored from it. That would be very prone to mistakes which is why it is not allowed in Pekko Persistence.
For basic actors you can use the same set of command handlers independent of what state the entity is in. For more complex actors it’s useful to be able to change the behavior in the sense that different functions for processing commands may be defined depending on what state the actor is in. This is useful when implementing finite state machine (FSM) like entities.
The next example demonstrates how to define different behavior based on the current State
. It shows an actor that represents the state of a blog post. Before a post is started the only command it can process is to AddPost
. Once it is started then one can look it up with GetPost
, modify it with ChangeBody
or publish it with Publish
.
The state is captured by:
- Scala
-
source
sealed trait State case object BlankState extends State final case class DraftState(content: PostContent) extends State { def withBody(newBody: String): DraftState = copy(content = content.copy(body = newBody)) def postId: String = content.postId } final case class PublishedState(content: PostContent) extends State { def postId: String = content.postId }
- Java
-
source
interface State {} enum BlankState implements State { INSTANCE } static class DraftState implements State { final PostContent content; DraftState(PostContent content) { this.content = content; } DraftState withContent(PostContent newContent) { return new DraftState(newContent); } DraftState withBody(String newBody) { return withContent(new PostContent(postId(), content.title, newBody)); } String postId() { return content.postId; } } static class PublishedState implements State { final PostContent content; PublishedState(PostContent content) { this.content = content; } PublishedState withContent(PostContent newContent) { return new PublishedState(newContent); } PublishedState withBody(String newBody) { return withContent(new PostContent(postId(), content.title, newBody)); } String postId() { return content.postId; } }
The commands, of which only a subset are valid depending on the state:
- Scala
-
source
sealed trait Command final case class AddPost(content: PostContent, replyTo: ActorRef[StatusReply[AddPostDone]]) extends Command final case class AddPostDone(postId: String) final case class GetPost(replyTo: ActorRef[PostContent]) extends Command final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends Command final case class Publish(replyTo: ActorRef[Done]) extends Command final case class PostContent(postId: String, title: String, body: String)
- Java
-
source
public interface Command {} public static class AddPost implements Command { final PostContent content; final ActorRef<AddPostDone> replyTo; public AddPost(PostContent content, ActorRef<AddPostDone> replyTo) { this.content = content; this.replyTo = replyTo; } } public static class AddPostDone implements Command { final String postId; public AddPostDone(String postId) { this.postId = postId; } } public static class GetPost implements Command { final ActorRef<PostContent> replyTo; public GetPost(ActorRef<PostContent> replyTo) { this.replyTo = replyTo; } } public static class ChangeBody implements Command { final String newBody; final ActorRef<Done> replyTo; public ChangeBody(String newBody, ActorRef<Done> replyTo) { this.newBody = newBody; this.replyTo = replyTo; } } public static class Publish implements Command { final ActorRef<Done> replyTo; public Publish(ActorRef<Done> replyTo) { this.replyTo = replyTo; } } public static class PostContent implements Command { final String postId; final String title; final String body; public PostContent(String postId, String title, String body) { this.postId = postId; this.title = title; this.body = body; } }
The command handler to process each command is decided by the state class (or state predicate) that is given to the forStateType
of the CommandHandlerBuilder
and the match cases in the builders. The command handler to process each command is decided by first looking at the state and then the command. It typically becomes two levels of pattern matching, first on the state and then on the command. Delegating to methods like addPost
, changeBody
, publish
etc. is a good practice because the one-line cases give a nice overview of the message dispatch.
- Scala
-
source
private val commandHandler: (State, Command) => Effect[State] = { (state, command) => state match { case BlankState => command match { case cmd: AddPost => addPost(cmd) case _ => Effect.unhandled } case draftState: DraftState => command match { case cmd: ChangeBody => changeBody(draftState, cmd) case Publish(replyTo) => publish(draftState, replyTo) case GetPost(replyTo) => getPost(draftState, replyTo) case AddPost(_, replyTo) => Effect.unhandled[State].thenRun(_ => replyTo ! StatusReply.Error("Cannot add post while in draft state")) } case publishedState: PublishedState => command match { case GetPost(replyTo) => getPost(publishedState, replyTo) case AddPost(_, replyTo) => Effect.unhandled[State].thenRun(_ => replyTo ! StatusReply.Error("Cannot add post, already published")) case _ => Effect.unhandled } } } private def addPost(cmd: AddPost): Effect[State] = { Effect.persist(DraftState(cmd.content)).thenRun { _ => // After persist is done additional side effects can be performed cmd.replyTo ! StatusReply.Success(AddPostDone(cmd.content.postId)) } } private def changeBody(state: DraftState, cmd: ChangeBody): Effect[State] = { Effect.persist(state.withBody(cmd.newBody)).thenRun { _ => cmd.replyTo ! Done } } private def publish(state: DraftState, replyTo: ActorRef[Done]): Effect[State] = { Effect.persist(PublishedState(state.content)).thenRun { _ => println(s"Blog post ${state.postId} was published") replyTo ! Done } } private def getPost(state: DraftState, replyTo: ActorRef[PostContent]): Effect[State] = { replyTo ! state.content Effect.none } private def getPost(state: PublishedState, replyTo: ActorRef[PostContent]): Effect[State] = { replyTo ! state.content Effect.none }
- Java
-
source
@Override public CommandHandler<Command, State> commandHandler() { CommandHandlerBuilder<Command, State> builder = newCommandHandlerBuilder(); builder.forStateType(BlankState.class).onCommand(AddPost.class, this::onAddPost); builder .forStateType(DraftState.class) .onCommand(ChangeBody.class, this::onChangeBody) .onCommand(Publish.class, this::onPublish) .onCommand(GetPost.class, this::onGetPost); builder .forStateType(PublishedState.class) .onCommand(ChangeBody.class, this::onChangeBody) .onCommand(GetPost.class, this::onGetPost); builder.forAnyState().onCommand(AddPost.class, (state, cmd) -> Effect().unhandled()); return builder.build(); } private Effect<State> onAddPost(AddPost cmd) { return Effect() .persist(new DraftState(cmd.content)) .thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId))); } private Effect<State> onChangeBody(DraftState state, ChangeBody cmd) { return Effect() .persist(state.withBody(cmd.newBody)) .thenRun(() -> cmd.replyTo.tell(Done.getInstance())); } private Effect<State> onChangeBody(PublishedState state, ChangeBody cmd) { return Effect() .persist(state.withBody(cmd.newBody)) .thenRun(() -> cmd.replyTo.tell(Done.getInstance())); } private Effect<State> onPublish(DraftState state, Publish cmd) { return Effect() .persist(new PublishedState(state.content)) .thenRun( () -> { System.out.println("Blog post published: " + state.postId()); cmd.replyTo.tell(Done.getInstance()); }); } private Effect<State> onGetPost(DraftState state, GetPost cmd) { cmd.replyTo.tell(state.content); return Effect().none(); } private Effect<State> onGetPost(PublishedState state, GetPost cmd) { cmd.replyTo.tell(state.content); return Effect().none(); }
And finally the behavior is created from the DurableStateBehavior.apply
:
- Scala
-
source
object BlogPostEntityDurableState { // commands, state defined here def apply(entityId: String, persistenceId: PersistenceId): Behavior[Command] = { Behaviors.setup { context => context.log.info("Starting BlogPostEntityDurableState {}", entityId) DurableStateBehavior[Command, State](persistenceId, emptyState = BlankState, commandHandler) } } // commandHandler defined here }
- Java
-
source
public class BlogPostEntityDurableState extends DurableStateBehavior< BlogPostEntityDurableState.Command, BlogPostEntityDurableState.State> { // commands and state as in above snippets public static Behavior<Command> create(String entityId, PersistenceId persistenceId) { return Behaviors.setup( context -> { context.getLog().info("Starting BlogPostEntityDurableState {}", entityId); return new BlogPostEntityDurableState(persistenceId); }); } private BlogPostEntityDurableState(PersistenceId persistenceId) { super(persistenceId); } @Override public State emptyState() { return BlankState.INSTANCE; } // commandHandler, eventHandler as in above snippets }
This can be refactored one or two steps further by defining the command handlers in the state class as illustrated in command handlers in the state.
There is also an example illustrating an optional initial state.
Replies
The Request-Response interaction pattern is very common for persistent actors, because you typically want to know if the command was rejected due to validation errors and when accepted you want a confirmation when the events have been successfully stored.
Therefore you typically include a ActorRef[ReplyMessageType]
ActorRef<ReplyMessageType>
. If the command can either have a successful response or a validation error returned, the generic response type StatusReply[ReplyType]]
StatusReply<ReplyType>
can be used. If the successful reply does not contain a value but is more of an acknowledgement a pre defined StatusReply.Ack
StatusReply.ack()
of type StatusReply[Done]
StatusReply<Done>
can be used.
After validation errors or after persisting events, using a thenRun
side effect, the reply message can be sent to the ActorRef
.
- Scala
-
source
final case class AddPost(content: PostContent, replyTo: ActorRef[StatusReply[AddPostDone]]) extends Command final case class AddPostDone(postId: String)
- Java
-
source
public static class AddPost implements Command { final PostContent content; final ActorRef<AddPostDone> replyTo; public AddPost(PostContent content, ActorRef<AddPostDone> replyTo) { this.content = content; this.replyTo = replyTo; } } public static class AddPostDone implements Command { final String postId; public AddPostDone(String postId) { this.postId = postId; } }
- Scala
-
source
Effect.persist(DraftState(cmd.content)).thenRun { _ => // After persist is done additional side effects can be performed cmd.replyTo ! StatusReply.Success(AddPostDone(cmd.content.postId)) }
- Java
-
source
return Effect() .persist(new DraftState(cmd.content)) .thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
Since this is such a common pattern there is a reply effect for this purpose. It has the nice property that it can be used to enforce that you do not forget to specify replies when implementing the DurableStateBehavior
. If it’s defined with DurableStateBehavior.withEnforcedReplies
DurableStateBehaviorWithEnforcedReplies
there will be compilation errors if the returned effect isn’t a ReplyEffect
, which can be created with Effect.reply
Effect().reply
, Effect.noReply
Effect().noReply
, Effect.thenReply
Effect().thenReply
, or Effect.thenNoReply
Effect().thenNoReply
.
- Scala
-
source
def apply(persistenceId: PersistenceId): Behavior[Command] = { DurableStateBehavior .withEnforcedReplies[Command, Account](persistenceId, EmptyAccount, (state, cmd) => state.applyCommand(cmd)) }
- Java
-
source
public class AccountEntity extends DurableStateBehaviorWithEnforcedReplies< AccountEntity.Command, AccountEntity.Account> {
The commands must have a field of ActorRef[ReplyMessageType]
ActorRef<ReplyMessageType>
that can then be used to send a reply.
- Scala
-
source
sealed trait Command extends CborSerializable final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
- Java
-
source
interface Command extends CborSerializable {}
The ReplyEffect
is created with Effect.reply
Effect().reply
, Effect.noReply
Effect().noReply
, Effect.thenReply
Effect().thenReply
, or Effect.thenNoReply
Effect().thenNoReply
.
Note that command handlers are defined with newCommandHandlerWithReplyBuilder
when using EventSourcedBehaviorWithEnforcedReplies
, as opposed to newCommandHandlerBuilder when using EventSourcedBehavior
.
- Scala
-
source
private def deposit(cmd: Deposit) = { Effect.persist(copy(balance = balance + cmd.amount)).thenReply(cmd.replyTo)(_ => StatusReply.Ack) } private def withdraw(cmd: Withdraw) = { if (canWithdraw(cmd.amount)) Effect.persist(copy(balance = balance - cmd.amount)).thenReply(cmd.replyTo)(_ => StatusReply.Ack) else Effect.reply(cmd.replyTo)( StatusReply.Error(s"Insufficient balance $balance to be able to withdraw ${cmd.amount}")) }
- Java
-
source
private ReplyEffect<Account> withdraw(OpenedAccount account, Withdraw command) { if (!account.canWithdraw(command.amount)) { return Effect() .reply( command.replyTo, StatusReply.error("not enough funds to withdraw " + command.amount)); } else { return Effect() .persist(account.makeWithdraw(command.amount)) .thenReply(command.replyTo, account2 -> StatusReply.ack()); } }
These effects will send the reply message even when DurableStateBehavior.withEnforcedReplies
DurableStateBehaviorWithEnforcedReplies
is not used, but then there will be no compilation errors if the reply decision is left out.
Note that the noReply
is a way of making a conscious decision that a reply shouldn’t be sent for a specific command or that a reply will be sent later, perhaps after some asynchronous interaction with other actors or services.
Serialization
The same serialization mechanism as for actor messages is also used for persistent actors.
You need to enable serialization for your commands (messages) and state. Serialization with Jackson is a good choice in many cases and our recommendation if you don’t have other preference.
Tagging
Persistence allows you to use tags in persistence query. Tagging allows you to identify a subset of states in the durable store and separately consume them as a stream through the DurableStateStoreQuery
interface.
- Scala
-
source
DurableStateBehavior[Command[_], State]( persistenceId = PersistenceId.ofUniqueId("abc"), emptyState = State(0), commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect")) .withTag("tag1")
- Java
-
source
public class MyPersistentBehavior extends DurableStateBehavior<MyPersistentBehavior.Command, MyPersistentBehavior.State> { @Override public String tag() { return "tag1"; }
Wrapping DurableStateBehavior
When creating a DurableStateBehavior
, it is possible to wrap DurableStateBehavior
in other behaviors such as Behaviors.setup
in order to access the ActorContext
object. For instance to access the logger from within the ActorContext
to log for debugging the commandHandler
.
- Scala
-
source
Behaviors.setup[Command[_]] { context => DurableStateBehavior[Command[_], State]( persistenceId = PersistenceId.ofUniqueId("abc"), emptyState = State(0), commandHandler = CommandHandler.command { cmd => context.log.info("Got command {}", cmd) Effect.none }) }
- Java
-
source
public class MyPersistentBehavior extends DurableStateBehavior<MyPersistentBehavior.Command, MyPersistentBehavior.State> { public static Behavior<Command> create(PersistenceId persistenceId) { return Behaviors.setup(context -> new MyPersistentBehavior(persistenceId, context)); } private final ActorContext<Command> context; private MyPersistentBehavior(PersistenceId persistenceId, ActorContext<Command> context) { super( persistenceId, SupervisorStrategy.restartWithBackoff( Duration.ofSeconds(10), Duration.ofSeconds(30), 0.2)); this.context = context; } @Override public CommandHandler<Command, State> commandHandler() { return (state, command) -> { context.getLog().info("In command handler"); return Effect().none(); }; }