Classic Mailboxes

Note

Pekko Classic pertains to the original Actor APIs, which have been improved by more type safe and guided Actor APIs. Pekko Classic is still fully supported and existing applications can continue to use the classic APIs. It is also possible to use the new Actor APIs together with classic actors in the same ActorSystem, see coexistence. For new projects we recommend using the new Actor API.

For the full documentation of this feature and for new projects see mailboxes.

Dependency

To use Mailboxes, you must add the following dependency in your project:

sbt
val PekkoVersion = "1.1.2+29-e21fa9eb-SNAPSHOT"
libraryDependencies += "org.apache.pekko" %% "pekko-actor" % PekkoVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.1.2+29-e21fa9eb-SNAPSHOT</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-actor_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2+29-e21fa9eb-SNAPSHOT")

  implementation "org.apache.pekko:pekko-actor_${versions.ScalaBinary}"
}

Introduction

A Pekko Mailbox holds the messages that are destined for an Actor. Normally each Actor has its own mailbox, but with, for example, a BalancingPool, all routees share a single mailbox instance.

For more details on advanced mailbox config and custom mailbox implementations, see Mailboxes.

Mailbox Selection

Default Mailbox

The default mailbox is used when the mailbox is not specified. This is an unbounded mailbox, backed by a java.util.concurrent.ConcurrentLinkedQueue.

SingleConsumerOnlyUnboundedMailbox is an even more efficient mailbox, and it can be used as the default mailbox, but it cannot be used with a BalancingDispatcher.

Configuration of SingleConsumerOnlyUnboundedMailbox as default mailbox:

pekko.actor.default-mailbox {
  mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
}

Requiring a Message Queue Type for an Actor

It is possible to require a certain type of message queue for a certain type of actor by having that actor extend the parameterized trait (Scala) or implement the parameterized interface (Java) RequiresMessageQueue. Here is an example:

Scala
import org.apache.pekko
import pekko.dispatch.RequiresMessageQueue
import pekko.dispatch.BoundedMessageQueueSemantics

class MyBoundedActor extends MyActor with RequiresMessageQueue[BoundedMessageQueueSemantics]
Java
import org.apache.pekko.dispatch.BoundedMessageQueueSemantics;
import org.apache.pekko.dispatch.RequiresMessageQueue;

public class MyBoundedActor extends MyActor
    implements RequiresMessageQueue<BoundedMessageQueueSemantics> {}

The type parameter to the RequiresMessageQueue trait (Scala) or interface (Java) needs to be mapped to a mailbox in configuration like this:

bounded-mailbox {
  mailbox-type = "org.apache.pekko.dispatch.NonBlockingBoundedMailbox"
  mailbox-capacity = 1000 
}

pekko.actor.mailbox.requirements {
  "org.apache.pekko.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
}

Now every time you create an actor of type MyBoundedActor it will try to get a bounded mailbox. If the actor has a different mailbox configured in deployment, either directly or via a dispatcher with a specified mailbox type, then that will override this mapping.

Note

The type of the queue in the mailbox created for an actor will be checked against the required type in the trait (Scala) or interface (Java), and if the queue doesn’t implement the required type then actor creation will fail.

Requiring a Message Queue Type for a Dispatcher

A dispatcher may also have a requirement for the mailbox type used by the actors running on it. An example is the BalancingDispatcher which requires a message queue that is thread-safe for multiple concurrent consumers. Such a requirement is formulated within the dispatcher configuration section:

my-dispatcher {
  mailbox-requirement = org.example.MyInterface
}

The given requirement names a class or interface which will then be ensured to be a supertype of the message queue’s implementation. In case of a conflict—e.g. if the actor requires a mailbox type which does not satisfy this requirement—then actor creation will fail.
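The supertype check described above can be illustrated with plain Java reflection. This is only a sketch of the idea: `RequirementCheck` and `satisfies` are hypothetical names that do not exist in Pekko, and the JDK queue classes stand in for a mailbox's message queue implementation.

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper, not part of Pekko: it models only the supertype check.
final class RequirementCheck {
  private RequirementCheck() {}

  // Returns true if the required type is a supertype of the queue implementation.
  static boolean satisfies(Class<?> requirement, Class<?> queueImplementation) {
    return requirement.isAssignableFrom(queueImplementation);
  }

  public static void main(String[] args) {
    // ConcurrentLinkedQueue implements Queue but not BlockingQueue.
    System.out.println(satisfies(Queue.class, ConcurrentLinkedQueue.class));
    System.out.println(satisfies(BlockingQueue.class, ConcurrentLinkedQueue.class));
    System.out.println(satisfies(BlockingQueue.class, LinkedBlockingQueue.class));
  }
}
```

In this model, a `false` result corresponds to the conflict case in which actor creation fails.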

How the Mailbox Type is Selected

When an actor is created, the ActorRefProvider first determines the dispatcher which will execute it. Then the mailbox is determined as follows:

  1. If the actor’s deployment configuration section contains a mailbox key, this refers to a configuration section describing the mailbox type.
  2. If the actor’s Props contains a mailbox selection then that names a configuration section describing the mailbox type to be used. This needs to be an absolute config path, for example myapp.special-mailbox, and is not nested inside the pekko namespace.
  3. If the dispatcher’s configuration section contains a mailbox-type key the same section will be used to configure the mailbox type.
  4. If the actor requires a mailbox type as described above then the mapping for that requirement will be used to determine the mailbox type to be used; if that fails then the dispatcher’s requirement—if any—will be tried instead.
  5. If the dispatcher requires a mailbox type as described above then the mapping for that requirement will be used to determine the mailbox type to be used.
  6. The default mailbox pekko.actor.default-mailbox will be used.
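The lookup order above amounts to "first match wins, otherwise the default". The following is a simplified model of that precedence, not Pekko's actual resolution code: `MailboxSelection` and `select` are hypothetical names, each argument is the config path that the corresponding step would yield, and a requirement whose mapping cannot be resolved (the fallback in step 4) is represented as an empty `Optional`.

```java
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical model of the lookup order; none of these names exist in Pekko.
final class MailboxSelection {
  private MailboxSelection() {}

  // Returns the config path of the first source that specifies a mailbox.
  static String select(
      Optional<String> deploymentMailbox, // step 1
      Optional<String> propsMailbox, // step 2
      Optional<String> dispatcherMailboxType, // step 3
      Optional<String> actorRequirement, // step 4
      Optional<String> dispatcherRequirement) { // step 5
    return Stream.of(
            deploymentMailbox,
            propsMailbox,
            dispatcherMailboxType,
            actorRequirement,
            dispatcherRequirement)
        .flatMap(Optional::stream)
        .findFirst()
        .orElse("pekko.actor.default-mailbox"); // step 6
  }

  public static void main(String[] args) {
    // A Props-level selection wins over the dispatcher's mailbox-type.
    System.out.println(
        select(
            Optional.empty(),
            Optional.of("myapp.special-mailbox"),
            Optional.of("prio-dispatcher"),
            Optional.empty(),
            Optional.empty()));
    // With nothing specified, the default mailbox is used.
    System.out.println(
        select(
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            Optional.empty()));
  }
}
```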

Mailbox configuration examples

PriorityMailbox

How to create a PriorityMailbox:

Scala
import org.apache.pekko
import pekko.actor.{ ActorSystem, PoisonPill }
import pekko.dispatch.PriorityGenerator
import pekko.dispatch.UnboundedStablePriorityMailbox
import com.typesafe.config.Config

// We inherit, in this case, from UnboundedStablePriorityMailbox
// and seed it with the priority generator
class MyPrioMailbox(settings: ActorSystem.Settings, config: Config)
    extends UnboundedStablePriorityMailbox(
      // Create a new PriorityGenerator, lower prio means more important
      PriorityGenerator {
        // highpriority messages should be treated first if possible
        case "highpriority" => 0

        // lowpriority messages should be treated last if possible
        case "lowpriority" => 2

        // PoisonPill when no other left
        case PoisonPill => 3

        // We default to 1, which is in between high and low
        case otherwise => 1
      })
Java
static class MyPrioMailbox extends UnboundedStablePriorityMailbox {
  // needed for reflective instantiation
  public MyPrioMailbox(ActorSystem.Settings settings, Config config) {
    // Create a new PriorityGenerator, lower prio means more important
    super(
        new PriorityGenerator() {
          @Override
          public int gen(Object message) {
            if (message.equals("highpriority"))
              return 0; // highpriority messages should be treated first if possible
            else if (message.equals("lowpriority"))
              return 2; // lowpriority messages should be treated last if possible
            else if (message.equals(PoisonPill.getInstance()))
              return 3; // PoisonPill when no other left
            else return 1; // By default they go between high and low prio
          }
        });
  }
}

And then add it to the configuration:

prio-dispatcher {
  mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
  //Other dispatcher configuration goes here
}

And then an example on how you would use it:

Scala
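import org.apache.pekko
import pekko.actor.{ Actor, PoisonPill, Props }
import pekko.event.{ Logging, LoggingAdapter }

class Logger extends Actor {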
  private val log: LoggingAdapter = Logging(context.system, this)

  self ! "lowpriority"
  self ! "lowpriority"
  self ! "highpriority"
  self ! "pigdog"
  self ! "pigdog2"
  self ! "pigdog3"
  self ! "highpriority"

  self ! PoisonPill

  def receive: Receive = {
    case x => log.info(x.toString)
  }
}

  // We create a new Actor that just prints out what it processes
  val a = system.actorOf(Props(classOf[Logger])
    .withDispatcher("prio-dispatcher"))

  /*
   * Logs:
   * highpriority
   * highpriority
   * pigdog
   * pigdog2
   * pigdog3
   * lowpriority
   * lowpriority
   */
Java
class Demo extends AbstractActor {
  LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  {
    for (Object msg :
        new Object[] {
          "lowpriority",
          "lowpriority",
          "highpriority",
          "pigdog",
          "pigdog2",
          "pigdog3",
          "highpriority",
          PoisonPill.getInstance()
        }) {
      getSelf().tell(msg, getSelf());
    }
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchAny(
            message -> {
              log.info(message.toString());
            })
        .build();
  }
}

// We create a new Actor that just prints out what it processes
ActorRef myActor =
    system.actorOf(Props.create(Demo.class, this).withDispatcher("prio-dispatcher"));

/*
 * Logs:
 * highpriority
 * highpriority
 * pigdog
 * pigdog2
 * pigdog3
 * lowpriority
 * lowpriority
 */
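To see why the log comes out in that order, the following standalone sketch reproduces stable priority ordering using only the JDK. It is a model, not Pekko's implementation: `StablePriorityDemo`, `Entry`, and `process` are hypothetical names, the string "PoisonPill" stands in for the real PoisonPill object, and stability is obtained here by breaking priority ties with an arrival counter.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in for a stable priority mailbox; this is not Pekko's implementation.
final class StablePriorityDemo {
  private StablePriorityDemo() {}

  // Pairs a message with its arrival number so that equal priorities keep FIFO order.
  private static final class Entry {
    final Object message;
    final int priority;
    final long seq;

    Entry(Object message, int priority, long seq) {
      this.message = message;
      this.priority = priority;
      this.seq = seq;
    }
  }

  // Same mapping as the priority generator above; lower values are dequeued first.
  static int priorityOf(Object message) {
    if (message.equals("highpriority")) return 0;
    if (message.equals("lowpriority")) return 2;
    if (message.equals("PoisonPill")) return 3; // stand-in for the real PoisonPill
    return 1;
  }

  // Enqueues every message first, then dequeues them in priority order.
  static List<Object> process(List<?> sent) {
    PriorityQueue<Entry> queue =
        new PriorityQueue<>(
            Comparator.comparingInt((Entry e) -> e.priority).thenComparingLong(e -> e.seq));
    long seq = 0;
    for (Object message : sent) {
      queue.add(new Entry(message, priorityOf(message), seq++));
    }
    List<Object> processed = new ArrayList<>();
    while (!queue.isEmpty()) {
      processed.add(queue.poll().message);
    }
    return processed;
  }

  public static void main(String[] args) {
    List<Object> order =
        process(
            List.of(
                "lowpriority", "lowpriority", "highpriority", "pigdog",
                "pigdog2", "pigdog3", "highpriority", "PoisonPill"));
    order.forEach(System.out::println);
  }
}
```

The sketch prints the messages in the same order as the log above, followed by the "PoisonPill" stand-in, which has the lowest priority and is therefore dequeued last.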

It is also possible to configure a mailbox type directly like this (this is a top-level configuration entry):

prio-mailbox {
  mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
  //Other mailbox configuration goes here
}

pekko.actor.deployment {
  /priomailboxactor {
    mailbox = prio-mailbox
  }
}

And then use it either from deployment like this:

Scala
import org.apache.pekko.actor.Props
val myActor = context.actorOf(Props[MyActor](), "priomailboxactor")
Java
ActorRef myActor = system.actorOf(Props.create(MyActor.class), "priomailboxactor");

Or code like this:

Scala
import org.apache.pekko.actor.Props
val myActor = context.actorOf(Props[MyActor]().withMailbox("prio-mailbox"))
Java
ActorRef myActor = system.actorOf(Props.create(MyActor.class).withMailbox("prio-mailbox"));

ControlAwareMailbox

A ControlAwareMailbox can be very useful if an actor needs to be able to receive control messages immediately no matter how many other messages are already in its mailbox.

It can be configured like this:

control-aware-dispatcher {
  mailbox-type = "org.apache.pekko.dispatch.UnboundedControlAwareMailbox"
  //Other dispatcher configuration goes here
}

Control messages need to extend the ControlMessage trait (Scala) or implement the ControlMessage interface (Java):

Scala
import org.apache.pekko.dispatch.ControlMessage

case object MyControlMessage extends ControlMessage
Java
static class MyControlMessage implements ControlMessage {}

And then an example on how you would use it:

Scala
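import org.apache.pekko
import pekko.actor.{ Actor, PoisonPill, Props }
import pekko.event.{ Logging, LoggingAdapter }

class MyLogger extends Actor {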
  val log: LoggingAdapter = Logging(context.system, this)

  self ! "foo"
  self ! "bar"
  self ! MyControlMessage
  self ! PoisonPill

  def receive: Receive = {
    case x => log.info(x.toString)
  }
}

  // We create a new Actor that just prints out what it processes

  val a = system.actorOf(Props(classOf[MyLogger])
    .withDispatcher("control-aware-dispatcher"))

  /*
   * Logs:
   * MyControlMessage
   * foo
   * bar
   */
Java
class Demo extends AbstractActor {
  LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  {
    for (Object msg :
        new Object[] {"foo", "bar", new MyControlMessage(), PoisonPill.getInstance()}) {
      getSelf().tell(msg, getSelf());
    }
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchAny(
            message -> {
              log.info(message.toString());
            })
        .build();
  }
}

// We create a new Actor that just prints out what it processes
ActorRef myActor =
    system.actorOf(Props.create(Demo.class, this).withDispatcher("control-aware-dispatcher"));

/*
 * Logs:
 * MyControlMessage
 * foo
 * bar
 */
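The behavior shown in the log can be modeled with two FIFO queues, where the control queue is always drained first. This is a conceptual sketch using only the JDK and is not how Pekko implements the mailbox: `ControlAwareQueueSketch`, `Control`, and `Shutdown` are hypothetical names, with `Control` standing in for ControlMessage.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical two-queue model of a control-aware mailbox; not Pekko's implementation.
final class ControlAwareQueueSketch {
  // Marker interface standing in for Pekko's ControlMessage.
  interface Control {}

  // Example control message used by this sketch.
  static final class Shutdown implements Control {
    @Override
    public String toString() {
      return "Shutdown";
    }
  }

  private final Queue<Object> controlQueue = new ArrayDeque<>();
  private final Queue<Object> regularQueue = new ArrayDeque<>();

  // Routes control messages to their own queue; everything else is queued normally.
  void enqueue(Object message) {
    if (message instanceof Control) {
      controlQueue.add(message);
    } else {
      regularQueue.add(message);
    }
  }

  // Returns the next message, preferring control messages, or null when both queues are empty.
  Object dequeue() {
    Object control = controlQueue.poll();
    return control != null ? control : regularQueue.poll();
  }

  public static void main(String[] args) {
    ControlAwareQueueSketch mailbox = new ControlAwareQueueSketch();
    mailbox.enqueue("foo");
    mailbox.enqueue("bar");
    mailbox.enqueue(new Shutdown());
    for (Object m = mailbox.dequeue(); m != null; m = mailbox.dequeue()) {
      System.out.println(m);
    }
  }
}
```

Regular messages keep their relative order; only control messages jump ahead of them.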

Special Semantics of system.actorOf

In order to make system.actorOf both synchronous and non-blocking while keeping the return type ActorRef (and the semantics that the returned ref is fully functional), special handling takes place for this case. Behind the scenes, a hollow kind of actor reference is constructed, which is sent to the system’s guardian actor, which actually creates the actor and its context and puts those inside the reference. Until that has happened, messages sent to the ActorRef will be queued locally, and only when the real contents are swapped in will they be transferred into the real mailbox. Thus,

Scala
val props: Props = ...
// this actor uses MyCustomMailbox, which is assumed to be a singleton
system.actorOf(props.withDispatcher("myCustomMailbox")) ! "bang"
assert(MyCustomMailbox.instance.getLastEnqueuedMessage == "bang")
Java
final Props props = ...
// this actor uses MyCustomMailbox, which is assumed to be a singleton
system.actorOf(props.withDispatcher("myCustomMailbox")).tell("bang", sender);
assert(MyCustomMailbox.getInstance().getLastEnqueued().equals("bang"));

will probably fail; you will have to allow for some time to pass and retry the check à la TestKit.awaitCond.
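The retry-until-true idea can be sketched without any test kit. The helper below is a hypothetical, simplified analogue written for illustration: `AwaitSketch.awaitCond` is not the TestKit method, and it returns `false` on timeout rather than failing the test. The background thread and the `AtomicReference` simulate a message that reaches the real mailbox only after a short delay.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

// Hypothetical polling helper; a simplified analogue of the idea, not the TestKit API.
final class AwaitSketch {
  private AwaitSketch() {}

  // Polls the condition until it holds or the maximum wait has elapsed.
  static boolean awaitCond(BooleanSupplier condition, Duration max, Duration interval) {
    long deadline = System.nanoTime() + max.toNanos();
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(interval.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
  }

  public static void main(String[] args) {
    AtomicReference<String> lastEnqueued = new AtomicReference<>();
    // Simulates the delayed transfer of the message into the real mailbox.
    Thread transfer =
        new Thread(
            () -> {
              try {
                Thread.sleep(50);
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
              lastEnqueued.set("bang");
            });
    transfer.start();
    boolean seen =
        awaitCond(
            () -> "bang".equals(lastEnqueued.get()),
            Duration.ofSeconds(3),
            Duration.ofMillis(10));
    System.out.println(seen);
  }
}
```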