Stash

You are viewing the documentation for the new actor APIs. To view the Pekko Classic documentation, see Classic Actors.

Dependency

To use Pekko Actor Typed, add the following dependency to your project:

sbt
val PekkoVersion = "1.0.1"
libraryDependencies += "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.0.1</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-actor-typed_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.0.1")

  implementation "org.apache.pekko:pekko-actor-typed_${versions.ScalaBinary}"
}

Introduction

Stashing enables an actor to temporarily buffer all or some messages that cannot or should not be handled using the actor’s current behavior.

A typical use case is an actor that has to load some initial state or initialize some resources before it can accept the first real message. Another is an actor that is waiting for something to complete before processing the next message.

Let’s illustrate both cases with an example. The DataAccess actor below serves as a single access point to a value stored in a database. On startup it loads the current state from the database, and while it waits for that initial value all incoming messages are stashed.

While a new state is being saved to the database, the actor also stashes incoming messages so that processing stays sequential, with at most one pending write at a time.

Scala
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success

import org.apache.pekko
import pekko.Done
import pekko.actor.typed.ActorRef
import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.ActorContext
import pekko.actor.typed.scaladsl.Behaviors
import pekko.actor.typed.scaladsl.StashBuffer

trait DB {
  def save(id: String, value: String): Future[Done]
  def load(id: String): Future[String]
}

object DataAccess {
  sealed trait Command
  final case class Save(value: String, replyTo: ActorRef[Done]) extends Command
  final case class Get(replyTo: ActorRef[String]) extends Command
  private final case class InitialState(value: String) extends Command
  private case object SaveSuccess extends Command
  private final case class DBError(cause: Throwable) extends Command

  def apply(id: String, db: DB): Behavior[Command] = {
    Behaviors.withStash(100) { buffer =>
      Behaviors.setup[Command] { context =>
        new DataAccess(context, buffer, id, db).start()
      }
    }
  }
}

class DataAccess(
    context: ActorContext[DataAccess.Command],
    buffer: StashBuffer[DataAccess.Command],
    id: String,
    db: DB) {
  import DataAccess._

  private def start(): Behavior[Command] = {
    context.pipeToSelf(db.load(id)) {
      case Success(value) => InitialState(value)
      case Failure(cause) => DBError(cause)
    }

    Behaviors.receiveMessage {
      case InitialState(value) =>
        // now we are ready to handle stashed messages if any
        buffer.unstashAll(active(value))
      case DBError(cause) =>
        throw cause
      case other =>
        // stash all other messages for later processing
        buffer.stash(other)
        Behaviors.same
    }
  }

  private def active(state: String): Behavior[Command] = {
    Behaviors.receiveMessagePartial {
      case Get(replyTo) =>
        replyTo ! state
        Behaviors.same
      case Save(value, replyTo) =>
        context.pipeToSelf(db.save(id, value)) {
          case Success(_)     => SaveSuccess
          case Failure(cause) => DBError(cause)
        }
        saving(value, replyTo)
    }
  }

  private def saving(state: String, replyTo: ActorRef[Done]): Behavior[Command] = {
    Behaviors.receiveMessage {
      case SaveSuccess =>
        replyTo ! Done
        buffer.unstashAll(active(state))
      case DBError(cause) =>
        throw cause
      case other =>
        buffer.stash(other)
        Behaviors.same
    }
  }

}
Java
import org.apache.pekko.Done;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.StashBuffer;
import java.util.concurrent.CompletionStage;


interface DB {
  CompletionStage<Done> save(String id, String value);

  CompletionStage<String> load(String id);
}

public class DataAccess {

  public interface Command {}

  public static class Save implements Command {
    public final String payload;
    public final ActorRef<Done> replyTo;

    public Save(String payload, ActorRef<Done> replyTo) {
      this.payload = payload;
      this.replyTo = replyTo;
    }
  }

  public static class Get implements Command {
    public final ActorRef<String> replyTo;

    public Get(ActorRef<String> replyTo) {
      this.replyTo = replyTo;
    }
  }

  private static class InitialState implements Command {
    public final String value;

    InitialState(String value) {
      this.value = value;
    }
  }

  private enum SaveSuccess implements Command {
    INSTANCE
  }

  private static class DBError implements Command {
    public final RuntimeException cause;

    DBError(RuntimeException cause) {
      this.cause = cause;
    }
  }

  private final ActorContext<Command> context;
  private final StashBuffer<Command> buffer;
  private final String id;
  private final DB db;

  private DataAccess(
      ActorContext<Command> context, StashBuffer<Command> buffer, String id, DB db) {
    this.context = context;
    this.buffer = buffer;
    this.id = id;
    this.db = db;
  }

  public static Behavior<Command> create(String id, DB db) {
    return Behaviors.withStash(
        100,
        stash ->
            Behaviors.setup(
                ctx -> {
                  ctx.pipeToSelf(
                      db.load(id),
                      (value, cause) -> {
                        if (cause == null) return new InitialState(value);
                        else return new DBError(asRuntimeException(cause));
                      });

                  return new DataAccess(ctx, stash, id, db).start();
                }));
  }

  private Behavior<Command> start() {
    return Behaviors.receive(Command.class)
        .onMessage(InitialState.class, this::onInitialState)
        .onMessage(DBError.class, this::onDBError)
        .onMessage(Command.class, this::stashOtherCommand)
        .build();
  }

  private Behavior<Command> onInitialState(InitialState message) {
    // now we are ready to handle stashed messages if any
    return buffer.unstashAll(active(message.value));
  }

  private Behavior<Command> onDBError(DBError message) {
    throw message.cause;
  }

  private Behavior<Command> stashOtherCommand(Command message) {
    // stash all other messages for later processing
    buffer.stash(message);
    return Behaviors.same();
  }

  private Behavior<Command> active(String state) {
    return Behaviors.receive(Command.class)
        .onMessage(Get.class, message -> onGet(state, message))
        .onMessage(Save.class, this::onSave)
        .build();
  }

  private Behavior<Command> onGet(String state, Get message) {
    message.replyTo.tell(state);
    return Behaviors.same();
  }

  private Behavior<Command> onSave(Save message) {
    context.pipeToSelf(
        db.save(id, message.payload),
        (value, cause) -> {
          if (cause == null) return SaveSuccess.INSTANCE;
          else return new DBError(asRuntimeException(cause));
        });
    return saving(message.payload, message.replyTo);
  }

  private Behavior<Command> saving(String state, ActorRef<Done> replyTo) {
    return Behaviors.receive(Command.class)
        .onMessage(SaveSuccess.class, message -> onSaveSuccess(state, replyTo))
        .onMessage(DBError.class, this::onDBError)
        .onMessage(Command.class, this::stashOtherCommand)
        .build();
  }

  private Behavior<Command> onSaveSuccess(String state, ActorRef<Done> replyTo) {
    replyTo.tell(Done.getInstance());
    return buffer.unstashAll(active(state));
  }

  private static RuntimeException asRuntimeException(Throwable t) {
    // can't throw Throwable in lambdas
    if (t instanceof RuntimeException) {
      return (RuntimeException) t;
    } else {
      return new RuntimeException(t);
    }
  }
}

One important thing to be aware of is that the StashBuffer is a buffer and stashed messages are kept in memory until they are unstashed (or the actor is stopped and garbage collected). Avoid stashing too many messages: doing so increases memory usage and can even lead to an OutOfMemoryError if many actors are stashing many messages. The StashBuffer is therefore bounded, and its capacity (how many messages it can hold) must be specified when it is created.

If you try to stash more messages than the capacity allows, a StashOverflowException is thrown. To avoid that, check StashBuffer.isFull before stashing a message and take another action, such as dropping the message.
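As a minimal sketch of that check, the hypothetical BoundedWaiter actor below stashes messages until Ready arrives and drops new messages once the buffer is full. The actor, its messages, and the log-and-drop policy are assumptions made for illustration; they are not part of the DataAccess example.

```scala
import org.apache.pekko
import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.ActorContext
import pekko.actor.typed.scaladsl.Behaviors
import pekko.actor.typed.scaladsl.StashBuffer

// Hypothetical actor, for illustration only
object BoundedWaiter {
  sealed trait Command
  case object Ready extends Command
  final case class Work(payload: String) extends Command

  def apply(capacity: Int): Behavior[Command] =
    Behaviors.withStash(capacity) { buffer =>
      Behaviors.setup[Command] { context =>
        waiting(context, buffer)
      }
    }

  private def waiting(
      context: ActorContext[Command],
      buffer: StashBuffer[Command]): Behavior[Command] =
    Behaviors.receiveMessage {
      case Ready =>
        buffer.unstashAll(active(context))
      case other if buffer.isFull =>
        // Assumed policy: drop the message rather than let stash() throw
        context.log.warn("Stash is full, dropping {}", other)
        Behaviors.same
      case other =>
        buffer.stash(other)
        Behaviors.same
    }

  private def active(context: ActorContext[Command]): Behavior[Command] =
    Behaviors.receiveMessage {
      case Work(payload) =>
        context.log.info("Processing {}", payload)
        Behaviors.same
      case Ready =>
        // already active, nothing to do
        Behaviors.same
    }
}
```

Dropping is only one option. Depending on the protocol, replying to the sender with a rejection may be a better fit than silently discarding the message.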

When the buffered messages are unstashed by calling unstashAll, they are processed sequentially in the order they were added, and all of them are processed unless an exception is thrown. The actor is unresponsive to new messages until unstashAll has completed. That is another reason to keep the number of stashed messages low: actors that hog the message processing thread for too long can starve other actors.

That can be mitigated by using StashBuffer.unstash with the numberOfMessages parameter and then sending a message to context.self (Scala) or context.getSelf (Java) before continuing to unstash more. Other new messages may then arrive in between, and those must be stashed to keep the original order of messages. This makes the logic more complicated, so it is better to keep the number of stashed messages low.
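One possible shape of that batched approach is sketched below. Everything except the StashBuffer and Behaviors calls is an assumption for illustration: the BatchedUnstash actor, its messages, the batch size of 10, and the capacity of 1000 are all hypothetical. The sketch uses the wrap argument of unstash to mark replayed messages with an internal Unstashed envelope, so the draining behavior can tell them apart from new arrivals, which it stashes again behind the older messages.

```scala
import org.apache.pekko
import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.ActorContext
import pekko.actor.typed.scaladsl.Behaviors
import pekko.actor.typed.scaladsl.StashBuffer

// Hypothetical actor, for illustration only
object BatchedUnstash {
  sealed trait Command
  case object Ready extends Command
  final case class Work(payload: String) extends Command
  // Internal: marks a message replayed from the stash (not a new arrival)
  private final case class Unstashed(command: Command) extends Command
  // Internal: sent to self between batches
  private case object ContinueUnstash extends Command

  private val BatchSize = 10 // assumed value

  def apply(): Behavior[Command] =
    Behaviors.withStash(1000) { buffer =>
      Behaviors.setup[Command] { context =>
        waiting(context, buffer)
      }
    }

  private def waiting(
      context: ActorContext[Command],
      buffer: StashBuffer[Command]): Behavior[Command] =
    Behaviors.receiveMessage {
      case Ready =>
        unstashBatch(context, buffer)
      case other =>
        buffer.stash(other)
        Behaviors.same
    }

  // Replays at most BatchSize stashed messages, then waits for ContinueUnstash
  private def unstashBatch(
      context: ActorContext[Command],
      buffer: StashBuffer[Command]): Behavior[Command] =
    if (buffer.isEmpty) active(context)
    else {
      context.self ! ContinueUnstash
      buffer.unstash(draining(context, buffer), BatchSize, command => Unstashed(command))
    }

  private def draining(
      context: ActorContext[Command],
      buffer: StashBuffer[Command]): Behavior[Command] =
    Behaviors.receiveMessage {
      case Unstashed(command) =>
        handle(context, command)
        Behaviors.same
      case ContinueUnstash =>
        unstashBatch(context, buffer)
      case other =>
        // new arrival while draining: stash it to keep the original order
        buffer.stash(other)
        Behaviors.same
    }

  private def active(context: ActorContext[Command]): Behavior[Command] =
    Behaviors.receiveMessage { command =>
      handle(context, command)
      Behaviors.same
    }

  private def handle(context: ActorContext[Command], command: Command): Unit =
    command match {
      case Work(payload) => context.log.info("Processing {}", payload)
      case _             => () // other commands need no action in this sketch
    }
}
```

Note that new arrivals stashed while draining still count against the buffer capacity, so a real implementation would likely combine this with an isFull check.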