Style Guide
Command handlers in the state
We can take the previous bank account example one step further by handling the commands within the state as well.
- Scala
-
source
/**
 * Bank account entity backed by durable state, where each state handles its own commands.
 *
 * The command handler given to `DurableStateBehavior` only delegates to
 * `Account.applyCommand`, which is implemented by `EmptyAccount`, `OpenedAccount`
 * and `ClosedAccount`.
 */
object AccountEntity {
  // Command
  sealed trait Command extends CborSerializable
  final case class CreateAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command
  final case class Deposit(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
  final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
  final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command
  final case class CloseAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command

  // Reply
  // Carries the same serialization marker as the commands and the state: the reply is
  // sent to `replyTo`, which is not necessarily on the same node (e.g. with sharding).
  final case class CurrentBalance(balance: BigDecimal) extends CborSerializable

  val Zero = BigDecimal(0)

  // type alias to reduce boilerplate
  type ReplyEffect = pekko.persistence.typed.state.scaladsl.ReplyEffect[Account]

  // State
  sealed trait Account extends CborSerializable {

    /** Handles `cmd` according to the current state and returns the resulting effect. */
    def applyCommand(cmd: Command): ReplyEffect
  }

  /** Initial state: the account has not been created yet. */
  case object EmptyAccount extends Account {
    override def applyCommand(cmd: Command): ReplyEffect =
      cmd match {
        case CreateAccount(replyTo) =>
          Effect.persist(OpenedAccount(Zero)).thenReply(replyTo)(_ => StatusReply.Ack)
        case _ =>
          // CreateAccount before handling any other commands
          Effect.unhandled.thenNoReply()
      }
  }

  /** An open account; the constructor rejects a negative balance. */
  case class OpenedAccount(balance: BigDecimal) extends Account {
    require(balance >= Zero, "Account balance can't be negative")

    override def applyCommand(cmd: Command): ReplyEffect =
      cmd match {
        case cmd @ Deposit(_, _)  => deposit(cmd)
        case cmd @ Withdraw(_, _) => withdraw(cmd)
        case GetBalance(replyTo) =>
          Effect.reply(replyTo)(CurrentBalance(balance))
        case CloseAccount(replyTo) =>
          // Only an account with a zero balance may be closed.
          if (balance == Zero)
            Effect.persist(ClosedAccount).thenReply(replyTo)(_ => StatusReply.Ack)
          else
            Effect.reply(replyTo)(StatusReply.Error("Can't close account with non-zero balance"))
        case CreateAccount(replyTo) =>
          Effect.reply(replyTo)(StatusReply.Error("Account is already created"))
      }

    /** True when withdrawing `amount` would not make the balance negative. */
    private def canWithdraw(amount: BigDecimal): Boolean =
      balance - amount >= Zero

    /** Persists the increased balance and acknowledges the command. */
    private def deposit(cmd: Deposit): ReplyEffect =
      Effect.persist(copy(balance = balance + cmd.amount)).thenReply(cmd.replyTo)(_ => StatusReply.Ack)

    /** Persists the reduced balance, or replies with an error when funds are insufficient. */
    private def withdraw(cmd: Withdraw): ReplyEffect =
      if (canWithdraw(cmd.amount))
        Effect.persist(copy(balance = balance - cmd.amount)).thenReply(cmd.replyTo)(_ => StatusReply.Ack)
      else
        Effect.reply(cmd.replyTo)(
          StatusReply.Error(s"Insufficient balance $balance to be able to withdraw ${cmd.amount}"))
  }

  /** Terminal state: every command except GetBalance is answered with an error. */
  case object ClosedAccount extends Account {
    override def applyCommand(cmd: Command): ReplyEffect =
      cmd match {
        case c: Deposit             => replyClosed(c.replyTo)
        case c: Withdraw            => replyClosed(c.replyTo)
        case GetBalance(replyTo)    => Effect.reply(replyTo)(CurrentBalance(Zero))
        case CloseAccount(replyTo)  => replyClosed(replyTo)
        case CreateAccount(replyTo) => replyClosed(replyTo)
      }

    private def replyClosed(replyTo: ActorRef[StatusReply[Done]]): ReplyEffect =
      Effect.reply(replyTo)(StatusReply.Error("Account is closed"))
  }

  // when used with sharding, this TypeKey can be used in `sharding.init` and `sharding.entityRefFor`:
  val TypeKey: EntityTypeKey[Command] =
    EntityTypeKey[Command]("Account")

  /** Creates the behavior, starting from `EmptyAccount` and delegating each command to the state. */
  def apply(persistenceId: PersistenceId): Behavior[Command] = {
    DurableStateBehavior
      .withEnforcedReplies[Command, Account](persistenceId, EmptyAccount, (state, cmd) => state.applyCommand(cmd))
  }
}
Take note of how the command handler is delegating to applyCommand
in the Account
(state), which is implemented in the concrete EmptyAccount
, OpenedAccount
, and ClosedAccount
.
Optional initial state
Sometimes, it’s not desirable to use a separate state class for the empty initial state, but rather act as if there is no state yet. You can use null
as the emptyState
, but be aware that the state
parameter will be null
until the first non-null state has been persisted. It’s possible to use Optional
instead of null
, but that requires extra boilerplate to unwrap the Optional
state parameter. Therefore use of null
is simpler. The following example illustrates using null
as the emptyState
. Option[State]
can be used as the state type and None
as the emptyState
. Then pattern matching is used in command handlers at the outer layer before delegating to the state or other methods.
- Scala
-
source
/**
 * Bank account entity whose durable state is an `Option[Account]`.
 *
 * `None` is the empty initial state. The command handler matches on the option first and
 * only then delegates to the account state or to `onFirstCommand`.
 */
object AccountEntity {
  // Command
  sealed trait Command extends CborSerializable
  final case class CreateAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command
  final case class Deposit(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
  final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
  final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command
  final case class CloseAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command

  // Reply
  final case class CurrentBalance(balance: BigDecimal) extends CborSerializable

  val Zero = BigDecimal(0)

  // type alias to reduce boilerplate
  type ReplyEffect = pekko.persistence.typed.state.scaladsl.ReplyEffect[Option[Account]]

  // State
  sealed trait Account extends CborSerializable {

    /** Handles `cmd` for this account state and returns the resulting effect. */
    def applyCommand(cmd: Command): ReplyEffect
  }

  /** An open account; the constructor rejects a negative balance. */
  case class OpenedAccount(balance: BigDecimal) extends Account {
    require(balance >= Zero, "Account balance can't be negative")

    override def applyCommand(cmd: Command): ReplyEffect =
      cmd match {
        case d: Deposit     => onDeposit(d.amount, d.replyTo)
        case w: Withdraw    => onWithdraw(w.amount, w.replyTo)
        case g: GetBalance  => Effect.reply(g.replyTo)(CurrentBalance(balance))
        case c: CloseAccount => onClose(c.replyTo)
        case c: CreateAccount =>
          Effect.reply(c.replyTo)(StatusReply.Error("Account is already created"))
      }

    def canWithdraw(amount: BigDecimal): Boolean = {
      val remaining = balance - amount
      remaining >= Zero
    }

    /** Stores the increased balance and acknowledges. */
    private def onDeposit(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]): ReplyEffect = {
      val updated = copy(balance = balance + amount)
      Effect.persist(Some(updated)).thenReply(replyTo)(_ => StatusReply.Ack)
    }

    /** Stores the reduced balance, or replies with an error when funds are insufficient. */
    private def onWithdraw(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]): ReplyEffect =
      if (canWithdraw(amount)) {
        val updated = copy(balance = balance - amount)
        Effect.persist(Some(updated)).thenReply(replyTo)(_ => StatusReply.Ack)
      } else {
        Effect.reply(replyTo)(
          StatusReply.Error(s"Insufficient balance $balance to be able to withdraw $amount"))
      }

    /** Closes the account when the balance is zero, otherwise replies with an error. */
    private def onClose(replyTo: ActorRef[StatusReply[Done]]): ReplyEffect =
      if (balance == Zero)
        Effect.persist(Some(ClosedAccount)).thenReply(replyTo)(_ => StatusReply.Ack)
      else
        Effect.reply(replyTo)(StatusReply.Error("Can't close account with non-zero balance"))
  }

  /** Terminal state: every command except GetBalance is answered with an error. */
  case object ClosedAccount extends Account {
    override def applyCommand(cmd: Command): ReplyEffect =
      cmd match {
        case GetBalance(replyTo)    => Effect.reply(replyTo)(CurrentBalance(Zero))
        case c: Deposit             => replyClosed(c.replyTo)
        case c: Withdraw            => replyClosed(c.replyTo)
        case CloseAccount(replyTo)  => replyClosed(replyTo)
        case CreateAccount(replyTo) => replyClosed(replyTo)
      }

    private def replyClosed(replyTo: ActorRef[StatusReply[Done]]): ReplyEffect = {
      val closedError = StatusReply.Error("Account is closed")
      Effect.reply(replyTo)(closedError)
    }
  }

  // when used with sharding, this TypeKey can be used in `sharding.init` and `sharding.entityRefFor`:
  val TypeKey: EntityTypeKey[Command] =
    EntityTypeKey[Command]("Account")

  /** Creates the behavior with `None` as the empty state. */
  def apply(persistenceId: PersistenceId): Behavior[Command] = {
    DurableStateBehavior.withEnforcedReplies[Command, Option[Account]](
      persistenceId,
      None,
      (maybeAccount, command) =>
        maybeAccount match {
          case Some(existing) => existing.applyCommand(command)
          case None           => onFirstCommand(command)
        })
  }

  /** Handles a command received while there is no account state yet. */
  def onFirstCommand(cmd: Command): ReplyEffect =
    cmd match {
      case CreateAccount(replyTo) =>
        val opened = OpenedAccount(Zero)
        Effect.persist(Some(opened)).thenReply(replyTo)(_ => StatusReply.Ack)
      case _ =>
        // CreateAccount before handling any other commands
        Effect.unhandled.thenNoReply()
    }
}
- Java
-
source
public class AccountEntity extends DurableStateBehaviorWithEnforcedReplies< AccountEntity.Command, AccountEntity.Account> { public static final EntityTypeKey<Command> ENTITY_TYPE_KEY = EntityTypeKey.create(Command.class, "Account"); // Command interface Command extends CborSerializable {} public static class CreateAccount implements Command { public final ActorRef<StatusReply<Done>> replyTo; @JsonCreator public CreateAccount(ActorRef<StatusReply<Done>> replyTo) { this.replyTo = replyTo; } } public static class Deposit implements Command { public final BigDecimal amount; public final ActorRef<StatusReply<Done>> replyTo; public Deposit(BigDecimal amount, ActorRef<StatusReply<Done>> replyTo) { this.replyTo = replyTo; this.amount = amount; } } public static class Withdraw implements Command { public final BigDecimal amount; public final ActorRef<StatusReply<Done>> replyTo; public Withdraw(BigDecimal amount, ActorRef<StatusReply<Done>> replyTo) { this.amount = amount; this.replyTo = replyTo; } } public static class GetBalance implements Command { public final ActorRef<CurrentBalance> replyTo; @JsonCreator public GetBalance(ActorRef<CurrentBalance> replyTo) { this.replyTo = replyTo; } } public static class CloseAccount implements Command { public final ActorRef<StatusReply<Done>> replyTo; @JsonCreator public CloseAccount(ActorRef<StatusReply<Done>> replyTo) { this.replyTo = replyTo; } } // Reply public static class CurrentBalance implements CborSerializable { public final BigDecimal balance; @JsonCreator public CurrentBalance(BigDecimal balance) { this.balance = balance; } } // State interface Account extends CborSerializable {} public static class OpenedAccount implements Account { public final BigDecimal balance; public OpenedAccount() { this.balance = BigDecimal.ZERO; } @JsonCreator public OpenedAccount(BigDecimal balance) { this.balance = balance; } OpenedAccount makeDeposit(BigDecimal amount) { return new OpenedAccount(balance.add(amount)); } boolean 
canWithdraw(BigDecimal amount) { return (balance.subtract(amount).compareTo(BigDecimal.ZERO) >= 0); } OpenedAccount makeWithdraw(BigDecimal amount) { if (!canWithdraw(amount)) throw new IllegalStateException("Account balance can't be negative"); return new OpenedAccount(balance.subtract(amount)); } ClosedAccount closedAccount() { return new ClosedAccount(); } } public static class ClosedAccount implements Account {} public static AccountEntity create(String accountNumber, PersistenceId persistenceId) { return new AccountEntity(accountNumber, persistenceId); } private final String accountNumber; private AccountEntity(String accountNumber, PersistenceId persistenceId) { super(persistenceId); this.accountNumber = accountNumber; } @Override public Account emptyState() { return null; } @Override public CommandHandlerWithReply<Command, Account> commandHandler() { CommandHandlerWithReplyBuilder<Command, Account> builder = newCommandHandlerWithReplyBuilder(); builder.forNullState().onCommand(CreateAccount.class, this::createAccount); builder .forStateType(OpenedAccount.class) .onCommand(Deposit.class, this::deposit) .onCommand(Withdraw.class, this::withdraw) .onCommand(GetBalance.class, this::getBalance) .onCommand(CloseAccount.class, this::closeAccount); builder .forStateType(ClosedAccount.class) .onAnyCommand(() -> Effect().unhandled().thenNoReply()); return builder.build(); } private ReplyEffect<Account> createAccount(CreateAccount command) { return Effect() .persist(new OpenedAccount()) .thenReply(command.replyTo, account2 -> StatusReply.ack()); } private ReplyEffect<Account> deposit(OpenedAccount account, Deposit command) { return Effect() .persist(account.makeDeposit(command.amount)) .thenReply(command.replyTo, account2 -> StatusReply.ack()); } private ReplyEffect<Account> withdraw(OpenedAccount account, Withdraw command) { if (!account.canWithdraw(command.amount)) { return Effect() .reply( command.replyTo, StatusReply.error("not enough funds to withdraw " + 
command.amount)); } else { return Effect() .persist(account.makeWithdraw(command.amount)) .thenReply(command.replyTo, account2 -> StatusReply.ack()); } } private ReplyEffect<Account> getBalance(OpenedAccount account, GetBalance command) { return Effect().reply(command.replyTo, new CurrentBalance(account.balance)); } private ReplyEffect<Account> closeAccount(OpenedAccount account, CloseAccount command) { if (account.balance.equals(BigDecimal.ZERO)) { return Effect() .persist(account.closedAccount()) .thenReply(command.replyTo, account2 -> StatusReply.ack()); } else { return Effect() .reply(command.replyTo, StatusReply.error("balance must be zero for closing account")); } } }