Stash
You are viewing the documentation for the new actor APIs. To view the Pekko Classic documentation, see Classic Actors.
Dependency
To use Pekko Actor Typed, you must add the following dependency in your project:
- sbt
val PekkoVersion = "1.0.3"
libraryDependencies += "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.0.3</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-actor-typed_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
- Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.0.3")

  implementation "org.apache.pekko:pekko-actor-typed_${versions.ScalaBinary}"
}
Introduction
Stashing enables an actor to temporarily buffer all or some messages that cannot or should not be handled using the actor’s current behavior.
A typical use case is an actor that has to load some initial state or initialize some resources before it can accept the first real message. Another is an actor that is waiting for something to complete before processing the next message.
Let’s illustrate these two cases with an example. The DataAccess actor below is used as a single access point to a value stored in a database. When it is started it loads the current state from the database, and while it waits for that initial value all incoming messages are stashed.
While a new state is being saved to the database it also stashes incoming messages. This makes the processing sequential, one message after the other, without multiple pending writes.
- Scala
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success

import org.apache.pekko
import pekko.Done
import pekko.actor.typed.ActorRef
import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.ActorContext
import pekko.actor.typed.scaladsl.Behaviors
import pekko.actor.typed.scaladsl.StashBuffer

trait DB {
  def save(id: String, value: String): Future[Done]
  def load(id: String): Future[String]
}

object DataAccess {
  sealed trait Command
  final case class Save(value: String, replyTo: ActorRef[Done]) extends Command
  final case class Get(replyTo: ActorRef[String]) extends Command
  private final case class InitialState(value: String) extends Command
  private case object SaveSuccess extends Command
  private final case class DBError(cause: Throwable) extends Command

  def apply(id: String, db: DB): Behavior[Command] = {
    Behaviors.withStash(100) { buffer =>
      Behaviors.setup[Command] { context =>
        new DataAccess(context, buffer, id, db).start()
      }
    }
  }
}

class DataAccess(
    context: ActorContext[DataAccess.Command],
    buffer: StashBuffer[DataAccess.Command],
    id: String,
    db: DB) {
  import DataAccess._

  private def start(): Behavior[Command] = {
    context.pipeToSelf(db.load(id)) {
      case Success(value) => InitialState(value)
      case Failure(cause) => DBError(cause)
    }

    Behaviors.receiveMessage {
      case InitialState(value) =>
        // now we are ready to handle stashed messages if any
        buffer.unstashAll(active(value))
      case DBError(cause) =>
        throw cause
      case other =>
        // stash all other messages for later processing
        buffer.stash(other)
        Behaviors.same
    }
  }

  private def active(state: String): Behavior[Command] = {
    Behaviors.receiveMessagePartial {
      case Get(replyTo) =>
        replyTo ! state
        Behaviors.same
      case Save(value, replyTo) =>
        context.pipeToSelf(db.save(id, value)) {
          case Success(_)     => SaveSuccess
          case Failure(cause) => DBError(cause)
        }
        saving(value, replyTo)
    }
  }

  private def saving(state: String, replyTo: ActorRef[Done]): Behavior[Command] = {
    Behaviors.receiveMessage {
      case SaveSuccess =>
        replyTo ! Done
        buffer.unstashAll(active(state))
      case DBError(cause) =>
        throw cause
      case other =>
        buffer.stash(other)
        Behaviors.same
    }
  }
}
- Java
import org.apache.pekko.Done;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.StashBuffer;

import java.util.concurrent.CompletionStage;

interface DB {
  CompletionStage<Done> save(String id, String value);

  CompletionStage<String> load(String id);
}

public class DataAccess {

  public interface Command {}

  public static class Save implements Command {
    public final String payload;
    public final ActorRef<Done> replyTo;

    public Save(String payload, ActorRef<Done> replyTo) {
      this.payload = payload;
      this.replyTo = replyTo;
    }
  }

  public static class Get implements Command {
    public final ActorRef<String> replyTo;

    public Get(ActorRef<String> replyTo) {
      this.replyTo = replyTo;
    }
  }

  private static class InitialState implements Command {
    public final String value;

    InitialState(String value) {
      this.value = value;
    }
  }

  private enum SaveSuccess implements Command {
    INSTANCE
  }

  private static class DBError implements Command {
    public final RuntimeException cause;

    DBError(RuntimeException cause) {
      this.cause = cause;
    }
  }

  private final ActorContext<Command> context;
  private final StashBuffer<Command> buffer;
  private final String id;
  private final DB db;

  private DataAccess(
      ActorContext<Command> context, StashBuffer<Command> buffer, String id, DB db) {
    this.context = context;
    this.buffer = buffer;
    this.id = id;
    this.db = db;
  }

  public static Behavior<Command> create(String id, DB db) {
    return Behaviors.withStash(
        100,
        stash ->
            Behaviors.setup(
                ctx -> {
                  ctx.pipeToSelf(
                      db.load(id),
                      (value, cause) -> {
                        if (cause == null) return new InitialState(value);
                        else return new DBError(asRuntimeException(cause));
                      });

                  return new DataAccess(ctx, stash, id, db).start();
                }));
  }

  private Behavior<Command> start() {
    return Behaviors.receive(Command.class)
        .onMessage(InitialState.class, this::onInitialState)
        .onMessage(DBError.class, this::onDBError)
        .onMessage(Command.class, this::stashOtherCommand)
        .build();
  }

  private Behavior<Command> onInitialState(InitialState message) {
    // now we are ready to handle stashed messages if any
    return buffer.unstashAll(active(message.value));
  }

  private Behavior<Command> onDBError(DBError message) {
    throw message.cause;
  }

  private Behavior<Command> stashOtherCommand(Command message) {
    // stash all other messages for later processing
    buffer.stash(message);
    return Behaviors.same();
  }

  private Behavior<Command> active(String state) {
    return Behaviors.receive(Command.class)
        .onMessage(Get.class, message -> onGet(state, message))
        .onMessage(Save.class, this::onSave)
        .build();
  }

  private Behavior<Command> onGet(String state, Get message) {
    message.replyTo.tell(state);
    return Behaviors.same();
  }

  private Behavior<Command> onSave(Save message) {
    context.pipeToSelf(
        db.save(id, message.payload),
        (value, cause) -> {
          if (cause == null) return SaveSuccess.INSTANCE;
          else return new DBError(asRuntimeException(cause));
        });
    return saving(message.payload, message.replyTo);
  }

  private Behavior<Command> saving(String state, ActorRef<Done> replyTo) {
    return Behaviors.receive(Command.class)
        .onMessage(SaveSuccess.class, message -> onSaveSuccess(state, replyTo))
        .onMessage(DBError.class, this::onDBError)
        .onMessage(Command.class, this::stashOtherCommand)
        .build();
  }

  private Behavior<Command> onSaveSuccess(String state, ActorRef<Done> replyTo) {
    replyTo.tell(Done.getInstance());
    return buffer.unstashAll(active(state));
  }

  private static RuntimeException asRuntimeException(Throwable t) {
    // can't throw Throwable in lambdas
    if (t instanceof RuntimeException) {
      return (RuntimeException) t;
    } else {
      return new RuntimeException(t);
    }
  }
}
One important thing to be aware of is that the StashBuffer is a buffer, and stashed messages are kept in memory until they are unstashed (or the actor is stopped and garbage collected). Avoid stashing too many messages: it increases memory usage and, if many actors are stashing many messages, even risks an OutOfMemoryError. For this reason the StashBuffer is bounded, and its capacity (the number of messages it can hold) must be specified when it is created.
If you try to stash more messages than the capacity allows, a StashOverflowException is thrown. To avoid that, you can check StashBuffer.isFull before stashing a message and take another action instead, such as dropping the message.
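The isFull check described above might look like the following minimal sketch. The BoundedStasher actor, its Work and Ready commands, and the Processed and Dropped replies are hypothetical names invented for this illustration; they are not part of the DataAccess example. Replying with Dropped is only one possible reaction to a full buffer.

```scala
import org.apache.pekko.actor.typed.{ ActorRef, Behavior }
import org.apache.pekko.actor.typed.scaladsl.Behaviors

// Hypothetical protocol for illustration only.
object BoundedStasher {
  sealed trait Command
  final case class Work(id: Int, replyTo: ActorRef[Reply]) extends Command
  case object Ready extends Command

  sealed trait Reply
  final case class Processed(id: Int) extends Reply
  final case class Dropped(id: Int) extends Reply

  def apply(capacity: Int): Behavior[Command] =
    Behaviors.withStash[Command](capacity) { buffer =>
      // Not yet initialized: stash work while there is room, reject it otherwise.
      def initializing: Behavior[Command] =
        Behaviors.receiveMessage {
          case Ready =>
            buffer.unstashAll(active)
          case work: Work if buffer.isFull =>
            // Stashing now would throw StashOverflowException, so drop and notify instead.
            work.replyTo ! Dropped(work.id)
            Behaviors.same
          case work: Work =>
            buffer.stash(work)
            Behaviors.same
        }

      // Initialized: handle work directly.
      def active: Behavior[Command] =
        Behaviors.receiveMessage {
          case Work(id, replyTo) =>
            replyTo ! Processed(id)
            Behaviors.same
          case Ready =>
            Behaviors.same
        }

      initializing
    }
}
```

Whether to drop silently, reply with a rejection, or log depends on the protocol. The point of the sketch is only that the decision is made before stash is called.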
When the buffered messages are unstashed by calling unstashAll, they are processed sequentially in the order they were added, and all of them are processed unless an exception is thrown. The actor is unresponsive to new messages until unstashAll has completed. That is another reason to keep the number of stashed messages low: actors that hog the message processing thread for too long can starve other actors.
That can be mitigated by calling StashBuffer.unstash with the numberOfMessages parameter and then sending a message to context.self (Scala) or context.getSelf (Java) before continuing to unstash more. Other new messages may then arrive in between, and those must be stashed to keep the original order of messages. This is more complicated, so it is better to keep the number of stashed messages low.
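One way to structure this chunked unstashing is sketched below. It is an illustration under assumptions, not a recommended implementation: the ChunkedUnstash actor and its Work, Ready, Unstashed and ContinueUnstash messages are hypothetical names. The sketch uses the wrap function of unstash to mark messages that come out of the stash, so that the draining behavior can tell them apart from new arrivals, which are stashed again behind the older messages. Overflow handling is left out for brevity.

```scala
import org.apache.pekko.actor.typed.{ ActorRef, Behavior }
import org.apache.pekko.actor.typed.scaladsl.Behaviors

// Hypothetical protocol for illustration only.
object ChunkedUnstash {
  sealed trait Command
  final case class Work(id: Int, replyTo: ActorRef[Int]) extends Command
  case object Ready extends Command
  // Marks a message that came out of the stash rather than the mailbox.
  private final case class Unstashed(message: Command) extends Command
  // Sent to self to trigger processing of the next chunk.
  private case object ContinueUnstash extends Command

  def apply(capacity: Int, chunkSize: Int): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      Behaviors.withStash[Command](capacity) { buffer =>
        def initializing: Behavior[Command] =
          Behaviors.receiveMessage {
            case Ready => drainNext()
            case work: Work =>
              buffer.stash(work) // isFull check omitted for brevity
              Behaviors.same
            case _ => Behaviors.unhandled
          }

        def drainNext(): Behavior[Command] =
          if (buffer.isEmpty) active
          else {
            // Queue the trigger, then process at most chunkSize stashed messages.
            context.self ! ContinueUnstash
            buffer.unstash(draining, chunkSize, Unstashed(_))
          }

        def draining: Behavior[Command] =
          Behaviors.receiveMessage {
            case Unstashed(Work(id, replyTo)) =>
              replyTo ! id
              Behaviors.same
            case ContinueUnstash => drainNext()
            case work: Work =>
              // New arrival: goes behind the older stashed messages to keep the order.
              buffer.stash(work)
              Behaviors.same
            case _ => Behaviors.unhandled
          }

        def active: Behavior[Command] =
          Behaviors.receiveMessage {
            case Work(id, replyTo) =>
              replyTo ! id
              Behaviors.same
            case _ => Behaviors.unhandled
          }

        initializing
      }
    }
}
```

Between two chunks the actor returns to its mailbox, so other messages are interleaved with the draining. The extra Unstashed wrapper and the additional draining state show the added complexity that the paragraph above warns about.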