Style Guide
Command handlers in the state¶
We can take the previous bank account example one step further by handling the commands within the state as well.
sourceobject AccountEntity {
// Command
sealed trait Command extends CborSerializable
final case class CreateAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command
final case class Deposit(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command
final case class CloseAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command
// Reply
final case class CurrentBalance(balance: BigDecimal)
val Zero = BigDecimal(0)
// type alias to reduce boilerplate
type ReplyEffect = pekko.persistence.typed.state.scaladsl.ReplyEffect[Account]
// State
sealed trait Account extends CborSerializable {
def applyCommand(cmd: Command): ReplyEffect
}
case object EmptyAccount extends Account {
override def applyCommand(cmd: Command): ReplyEffect =
cmd match {
case CreateAccount(replyTo) =>
Effect.persist(OpenedAccount(Zero)).thenReply(replyTo)(_ => StatusReply.Ack)
case _ =>
// CreateAccount before handling any other commands
Effect.unhandled.thenNoReply()
}
}
case class OpenedAccount(balance: BigDecimal) extends Account {
require(balance >= Zero, "Account balance can't be negative")
override def applyCommand(cmd: Command): ReplyEffect =
cmd match {
case cmd @ Deposit(_, _) => deposit(cmd)
case cmd @ Withdraw(_, _) => withdraw(cmd)
case GetBalance(replyTo) =>
Effect.reply(replyTo)(CurrentBalance(balance))
case CloseAccount(replyTo) =>
if (balance == Zero)
Effect.persist(ClosedAccount).thenReply(replyTo)(_ => StatusReply.Ack)
else
Effect.reply(replyTo)(StatusReply.Error("Can't close account with non-zero balance"))
case CreateAccount(replyTo) =>
Effect.reply(replyTo)(StatusReply.Error("Account is already created"))
}
private def canWithdraw(amount: BigDecimal): Boolean = {
balance - amount >= Zero
}
private def deposit(cmd: Deposit) = {
Effect.persist(copy(balance = balance + cmd.amount)).thenReply(cmd.replyTo)(_ => StatusReply.Ack)
}
private def withdraw(cmd: Withdraw) = {
if (canWithdraw(cmd.amount))
Effect.persist(copy(balance = balance - cmd.amount)).thenReply(cmd.replyTo)(_ => StatusReply.Ack)
else
Effect.reply(cmd.replyTo)(
StatusReply.Error(s"Insufficient balance $balance to be able to withdraw ${cmd.amount}"))
}
}
case object ClosedAccount extends Account {
override def applyCommand(cmd: Command): ReplyEffect =
cmd match {
case c: Deposit =>
replyClosed(c.replyTo)
case c: Withdraw =>
replyClosed(c.replyTo)
case GetBalance(replyTo) =>
Effect.reply(replyTo)(CurrentBalance(Zero))
case CloseAccount(replyTo) =>
replyClosed(replyTo)
case CreateAccount(replyTo) =>
replyClosed(replyTo)
}
private def replyClosed(replyTo: ActorRef[StatusReply[Done]]): ReplyEffect =
Effect.reply(replyTo)(StatusReply.Error(s"Account is closed"))
}
// when used with sharding, this TypeKey can be used in `sharding.init` and `sharding.entityRefFor`:
val TypeKey: EntityTypeKey[Command] =
EntityTypeKey[Command]("Account")
def apply(persistenceId: PersistenceId): Behavior[Command] = {
DurableStateBehavior
.withEnforcedReplies[Command, Account](persistenceId, EmptyAccount, (state, cmd) => state.applyCommand(cmd))
}
}
Take note of how the command handler is delegating to applyCommand
in the Account
(state), which is implemented in the concrete EmptyAccount
, OpenedAccount
, and ClosedAccount
.
Optional initial state¶
Sometimes, it’s not desirable to use a separate state class for the empty initial state, but rather act as if there is no state yet. Option[State]
can be used as the state type and None
as the emptyState
. Then pattern matching is used in command handlers at the outer layer before delegating to the state or other methods.
sourceobject AccountEntity {
// Command
sealed trait Command extends CborSerializable
final case class CreateAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command
final case class Deposit(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command
final case class CloseAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command
// Reply
final case class CurrentBalance(balance: BigDecimal) extends CborSerializable
val Zero = BigDecimal(0)
// type alias to reduce boilerplate
type ReplyEffect = pekko.persistence.typed.state.scaladsl.ReplyEffect[Option[Account]]
// State
sealed trait Account extends CborSerializable {
def applyCommand(cmd: Command): ReplyEffect
}
case class OpenedAccount(balance: BigDecimal) extends Account {
require(balance >= Zero, "Account balance can't be negative")
override def applyCommand(cmd: Command): ReplyEffect =
cmd match {
case Deposit(amount, replyTo) =>
Effect.persist(Some(copy(balance = balance + amount))).thenReply(replyTo)(_ => StatusReply.Ack)
case Withdraw(amount, replyTo) =>
if (canWithdraw(amount))
Effect.persist(Some(copy(balance = balance - amount))).thenReply(replyTo)(_ => StatusReply.Ack)
else
Effect.reply(replyTo)(StatusReply.Error(s"Insufficient balance $balance to be able to withdraw $amount"))
case GetBalance(replyTo) =>
Effect.reply(replyTo)(CurrentBalance(balance))
case CloseAccount(replyTo) =>
if (balance == Zero)
Effect.persist(Some(ClosedAccount)).thenReply(replyTo)(_ => StatusReply.Ack)
else
Effect.reply(replyTo)(StatusReply.Error("Can't close account with non-zero balance"))
case CreateAccount(replyTo) =>
Effect.reply(replyTo)(StatusReply.Error("Account is already created"))
}
def canWithdraw(amount: BigDecimal): Boolean = {
balance - amount >= Zero
}
}
case object ClosedAccount extends Account {
override def applyCommand(cmd: Command): ReplyEffect =
cmd match {
case c: Deposit =>
replyClosed(c.replyTo)
case c: Withdraw =>
replyClosed(c.replyTo)
case GetBalance(replyTo) =>
Effect.reply(replyTo)(CurrentBalance(Zero))
case CloseAccount(replyTo) =>
replyClosed(replyTo)
case CreateAccount(replyTo) =>
replyClosed(replyTo)
}
private def replyClosed(replyTo: ActorRef[StatusReply[Done]]): ReplyEffect =
Effect.reply(replyTo)(StatusReply.Error(s"Account is closed"))
}
// when used with sharding, this TypeKey can be used in `sharding.init` and `sharding.entityRefFor`:
val TypeKey: EntityTypeKey[Command] =
EntityTypeKey[Command]("Account")
def apply(persistenceId: PersistenceId): Behavior[Command] = {
DurableStateBehavior.withEnforcedReplies[Command, Option[Account]](
persistenceId,
None,
(state, cmd) =>
state match {
case None => onFirstCommand(cmd)
case Some(account) => account.applyCommand(cmd)
})
}
def onFirstCommand(cmd: Command): ReplyEffect = {
cmd match {
case CreateAccount(replyTo) =>
Effect.persist(Some(OpenedAccount(Zero))).thenReply(replyTo)(_ => StatusReply.Ack)
case _ =>
// CreateAccount before handling any other commands
Effect.unhandled.thenNoReply()
}
}
}
sourcepublic class AccountEntity
extends DurableStateBehaviorWithEnforcedReplies<
AccountEntity.Command, AccountEntity.Account> {
public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
EntityTypeKey.create(Command.class, "Account");
// Command
interface Command extends CborSerializable {}
public static class CreateAccount implements Command {
public final ActorRef<StatusReply<Done>> replyTo;
@JsonCreator
public CreateAccount(ActorRef<StatusReply<Done>> replyTo) {
this.replyTo = replyTo;
}
}
public static class Deposit implements Command {
public final BigDecimal amount;
public final ActorRef<StatusReply<Done>> replyTo;
public Deposit(BigDecimal amount, ActorRef<StatusReply<Done>> replyTo) {
this.replyTo = replyTo;
this.amount = amount;
}
}
public static class Withdraw implements Command {
public final BigDecimal amount;
public final ActorRef<StatusReply<Done>> replyTo;
public Withdraw(BigDecimal amount, ActorRef<StatusReply<Done>> replyTo) {
this.amount = amount;
this.replyTo = replyTo;
}
}
public static class GetBalance implements Command {
public final ActorRef<CurrentBalance> replyTo;
@JsonCreator
public GetBalance(ActorRef<CurrentBalance> replyTo) {
this.replyTo = replyTo;
}
}
public static class CloseAccount implements Command {
public final ActorRef<StatusReply<Done>> replyTo;
@JsonCreator
public CloseAccount(ActorRef<StatusReply<Done>> replyTo) {
this.replyTo = replyTo;
}
}
// Reply
public static class CurrentBalance implements CborSerializable {
public final BigDecimal balance;
@JsonCreator
public CurrentBalance(BigDecimal balance) {
this.balance = balance;
}
}
// State
interface Account extends CborSerializable {}
public static class OpenedAccount implements Account {
public final BigDecimal balance;
public OpenedAccount() {
this.balance = BigDecimal.ZERO;
}
@JsonCreator
public OpenedAccount(BigDecimal balance) {
this.balance = balance;
}
OpenedAccount makeDeposit(BigDecimal amount) {
return new OpenedAccount(balance.add(amount));
}
boolean canWithdraw(BigDecimal amount) {
return (balance.subtract(amount).compareTo(BigDecimal.ZERO) >= 0);
}
OpenedAccount makeWithdraw(BigDecimal amount) {
if (!canWithdraw(amount))
throw new IllegalStateException("Account balance can't be negative");
return new OpenedAccount(balance.subtract(amount));
}
ClosedAccount closedAccount() {
return new ClosedAccount();
}
}
public static class ClosedAccount implements Account {}
public static AccountEntity create(String accountNumber, PersistenceId persistenceId) {
return new AccountEntity(accountNumber, persistenceId);
}
private final String accountNumber;
private AccountEntity(String accountNumber, PersistenceId persistenceId) {
super(persistenceId);
this.accountNumber = accountNumber;
}
@Override
public Account emptyState() {
return null;
}
@Override
public CommandHandlerWithReply<Command, Account> commandHandler() {
CommandHandlerWithReplyBuilder<Command, Account> builder =
newCommandHandlerWithReplyBuilder();
builder.forNullState().onCommand(CreateAccount.class, this::createAccount);
builder
.forStateType(OpenedAccount.class)
.onCommand(Deposit.class, this::deposit)
.onCommand(Withdraw.class, this::withdraw)
.onCommand(GetBalance.class, this::getBalance)
.onCommand(CloseAccount.class, this::closeAccount);
builder
.forStateType(ClosedAccount.class)
.onAnyCommand(() -> Effect().unhandled().thenNoReply());
return builder.build();
}
private ReplyEffect<Account> createAccount(CreateAccount command) {
return Effect()
.persist(new OpenedAccount())
.thenReply(command.replyTo, account2 -> StatusReply.ack());
}
private ReplyEffect<Account> deposit(OpenedAccount account, Deposit command) {
return Effect()
.persist(account.makeDeposit(command.amount))
.thenReply(command.replyTo, account2 -> StatusReply.ack());
}
private ReplyEffect<Account> withdraw(OpenedAccount account, Withdraw command) {
if (!account.canWithdraw(command.amount)) {
return Effect()
.reply(
command.replyTo,
StatusReply.error("not enough funds to withdraw " + command.amount));
} else {
return Effect()
.persist(account.makeWithdraw(command.amount))
.thenReply(command.replyTo, account2 -> StatusReply.ack());
}
}
private ReplyEffect<Account> getBalance(OpenedAccount account, GetBalance command) {
return Effect().reply(command.replyTo, new CurrentBalance(account.balance));
}
private ReplyEffect<Account> closeAccount(OpenedAccount account, CloseAccount command) {
if (account.balance.equals(BigDecimal.ZERO)) {
return Effect()
.persist(account.closedAccount())
.thenReply(command.replyTo, account2 -> StatusReply.ack());
} else {
return Effect()
.reply(command.replyTo, StatusReply.error("balance must be zero for closing account"));
}
}
}
1.0.3