Actor lifecycle

You are viewing the documentation for the new actor APIs, to view the Pekko Classic documentation, see Classic Actors.

Dependency

To use Pekko Actor Typed, you must add the following dependency in your project:

sbt
val PekkoVersion = "1.0.1"
libraryDependencies += "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.0.1</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-actor-typed_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.0.1")

  implementation "org.apache.pekko:pekko-actor-typed_${versions.ScalaBinary}"
}

Introduction

An actor is a stateful resource that has to be explicitly started and stopped.

It is important to note that actors do not stop automatically when no longer referenced, every Actor that is created must also explicitly be destroyed. The only simplification is that stopping a parent Actor will also recursively stop all the child Actors that this parent has created. All actors are also stopped automatically when the ActorSystemActorSystem is shut down.

Note

An ActorSystem is a heavyweight structure that will allocate threads, so create one per logical application. Typically one ActorSystem per JVM process.

Creating Actors

An actor can create, or spawn, an arbitrary number of child actors, which in turn can spawn children of their own, thus forming an actor hierarchy. ActorSystemActorSystem hosts the hierarchy and there can be only one root actor, an actor at the top of the hierarchy of the ActorSystem. The lifecycle of a child actor is tied to the parent – a child can stop itself or be stopped at any time but it can never outlive its parent.

The ActorContext

The ActorContextActorContext can be accessed for many purposes such as:

  • Spawning child actors and supervision
  • Watching other actors to receive a Terminated(otherActor)Terminated(otherActor) event should the watched actor stop permanently
  • Logging
  • Creating message adapters
  • Request-response interactions (ask) with another actor
  • Access to the selfgetSelf() ActorRef

If a behavior needs to use the ActorContext, for example to spawn child actors, or use context.selfcontext.getSelf(), it can be obtained by wrapping construction with Behaviors.setupBehaviors.setup:

Scala
sourceobject HelloWorldMain {

  final case class SayHello(name: String)

  def apply(): Behavior[SayHello] =
    Behaviors.setup { context =>
      val greeter = context.spawn(HelloWorld(), "greeter")

      Behaviors.receiveMessage { message =>
        val replyTo = context.spawn(HelloWorldBot(max = 3), message.name)
        greeter ! HelloWorld.Greet(message.name, replyTo)
        Behaviors.same
      }
    }

}
Java
sourcepublic class HelloWorldMain extends AbstractBehavior<HelloWorldMain.SayHello> {
  public static Behavior<SayHello> create() {
    return Behaviors.setup(HelloWorldMain::new);
  }

  private final ActorRef<HelloWorld.Greet> greeter;

  private HelloWorldMain(ActorContext<SayHello> context) {
    super(context);
    greeter = context.spawn(HelloWorld.create(), "greeter");
  }
}

ActorContext Thread Safety

Many of the methods in ActorContextActorContext are not thread-safe and

The Guardian Actor

The top level actor, also called the user guardian actor, is created along with the ActorSystemActorSystem. Messages sent to the actor system are directed to the root actor. The root actor is defined by the behavior used to create the ActorSystem, named HelloWorldMain in the example below:

Scala
source
val system: ActorSystem[HelloWorldMain.SayHello] = ActorSystem(HelloWorldMain(), "hello") system ! HelloWorldMain.SayHello("World") system ! HelloWorldMain.SayHello("Pekko")
Java
sourcefinal ActorSystem<HelloWorldMain.SayHello> system =
    ActorSystem.create(HelloWorldMain.create(), "hello");

system.tell(new HelloWorldMain.SayHello("World"));
system.tell(new HelloWorldMain.SayHello("Pekko"));

For very simple applications the guardian may contain the actual application logic and handle messages. As soon as the application handles more than one concern the guardian should instead just bootstrap the application, spawn the various subsystems as children and monitor their lifecycles.

When the guardian actor stops this will stop the ActorSystem.

When ActorSystem.terminateActorSystem.terminate is invoked the Coordinated Shutdown process will stop actors and services in a specific order.

Spawning Children

Child actors are created and started with ActorContext’s spawnspawn. In the example below, when the root actor is started, it spawns a child actor described by the HelloWorld behavior. Additionally, when the root actor receives a SayHello message, it creates a child actor defined by the behavior HelloWorldBot:

Scala
sourceobject HelloWorldMain {

  final case class SayHello(name: String)

  def apply(): Behavior[SayHello] =
    Behaviors.setup { context =>
      val greeter = context.spawn(HelloWorld(), "greeter")

      Behaviors.receiveMessage { message =>
        val replyTo = context.spawn(HelloWorldBot(max = 3), message.name)
        greeter ! HelloWorld.Greet(message.name, replyTo)
        Behaviors.same
      }
    }

}
Java
sourcepublic class HelloWorldMain extends AbstractBehavior<HelloWorldMain.SayHello> {

  public static class SayHello {
    public final String name;

    public SayHello(String name) {
      this.name = name;
    }
  }

  public static Behavior<SayHello> create() {
    return Behaviors.setup(HelloWorldMain::new);
  }

  private final ActorRef<HelloWorld.Greet> greeter;

  private HelloWorldMain(ActorContext<SayHello> context) {
    super(context);
    greeter = context.spawn(HelloWorld.create(), "greeter");
  }

  @Override
  public Receive<SayHello> createReceive() {
    return newReceiveBuilder().onMessage(SayHello.class, this::onStart).build();
  }

  private Behavior<SayHello> onStart(SayHello command) {
    ActorRef<HelloWorld.Greeted> replyTo =
        getContext().spawn(HelloWorldBot.create(3), command.name);
    greeter.tell(new HelloWorld.Greet(command.name, replyTo));
    return this;
  }
}

To specify a dispatcher when spawning an actor use DispatcherSelectorDispatcherSelector. If not specified, the actor will use the default dispatcher, see Default dispatcher for details.

Scala
sourcedef apply(): Behavior[SayHello] =
  Behaviors.setup { context =>
    val dispatcherPath = "pekko.actor.default-blocking-io-dispatcher"

    val props = DispatcherSelector.fromConfig(dispatcherPath)
    val greeter = context.spawn(HelloWorld(), "greeter", props)

    Behaviors.receiveMessage { message =>
      val replyTo = context.spawn(HelloWorldBot(max = 3), message.name)

      greeter ! HelloWorld.Greet(message.name, replyTo)
      Behaviors.same
    }
  }
Java
sourcepublic class HelloWorldMain extends AbstractBehavior<HelloWorldMain.SayHello> {

  // Start message...

  public static Behavior<SayHello> create() {
    return Behaviors.setup(HelloWorldMain::new);
  }

  private final ActorRef<HelloWorld.Greet> greeter;

  private HelloWorldMain(ActorContext<SayHello> context) {
    super(context);

    final String dispatcherPath = "pekko.actor.default-blocking-io-dispatcher";
    Props greeterProps = DispatcherSelector.fromConfig(dispatcherPath);
    greeter = getContext().spawn(HelloWorld.create(), "greeter", greeterProps);
  }

  // createReceive ...
}

Refer to Actors for a walk-through of the above examples.

SpawnProtocol

The guardian actor should be responsible for initialization of tasks and create the initial actors of the application, but sometimes you might want to spawn new actors from the outside of the guardian actor. For example creating one actor per HTTP request.

That is not difficult to implement in your behavior, but since this is a common pattern there is a predefined message protocol and implementation of a behavior for this. It can be used as the guardian actor of the ActorSystemActorSystem, possibly combined with Behaviors.setupBehaviors.setup to start some initial tasks or actors. Child actors can then be started from the outside by telltelling or askasking SpawnProtocol.SpawnSpawnProtocol.Spawn to the actor reference of the system. Using ask is similar to how ActorSystem.actorOf can be used in classic actors with the difference that a FutureCompletionStage of the ActorRefActorRef is returned.

The guardian behavior can be defined as:

Scala
sourceimport org.apache.pekko
import pekko.actor.typed.Behavior
import pekko.actor.typed.SpawnProtocol
import pekko.actor.typed.scaladsl.Behaviors
import pekko.actor.typed.scaladsl.LoggerOps

object HelloWorldMain {
  def apply(): Behavior[SpawnProtocol.Command] =
    Behaviors.setup { context =>
      // Start initial tasks
      // context.spawn(...)

      SpawnProtocol()
    }
}
Java
sourceimport org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.SpawnProtocol;
import org.apache.pekko.actor.typed.javadsl.Behaviors;

public abstract class HelloWorldMain {
  private HelloWorldMain() {}

  public static Behavior<SpawnProtocol.Command> create() {
    return Behaviors.setup(
        context -> {
          // Start initial tasks
          // context.spawn(...)

          return SpawnProtocol.create();
        });
  }
}

and the ActorSystemActorSystem can be created with that main behavior and asked to spawn other actors:

Scala
sourceimport org.apache.pekko
import pekko.actor.typed.ActorRef
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.Props
import pekko.util.Timeout


implicit val system: ActorSystem[SpawnProtocol.Command] =
  ActorSystem(HelloWorldMain(), "hello")

// needed in implicit scope for ask (?)
import pekko.actor.typed.scaladsl.AskPattern._
implicit val ec: ExecutionContext = system.executionContext
implicit val timeout: Timeout = Timeout(3.seconds)

val greeter: Future[ActorRef[HelloWorld.Greet]] =
  system.ask(SpawnProtocol.Spawn(behavior = HelloWorld(), name = "greeter", props = Props.empty, _))

val greetedBehavior = Behaviors.receive[HelloWorld.Greeted] { (context, message) =>
  context.log.info2("Greeting for {} from {}", message.whom, message.from)
  Behaviors.stopped
}

val greetedReplyTo: Future[ActorRef[HelloWorld.Greeted]] =
  system.ask(SpawnProtocol.Spawn(greetedBehavior, name = "", props = Props.empty, _))

for (greeterRef <- greeter; replyToRef <- greetedReplyTo) {
  greeterRef ! HelloWorld.Greet("Pekko", replyToRef)
}
Java
sourceimport org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.Props;
import org.apache.pekko.actor.typed.javadsl.AskPattern;

final ActorSystem<SpawnProtocol.Command> system =
    ActorSystem.create(HelloWorldMain.create(), "hello");
final Duration timeout = Duration.ofSeconds(3);

CompletionStage<ActorRef<HelloWorld.Greet>> greeter =
    AskPattern.ask(
        system,
        replyTo ->
            new SpawnProtocol.Spawn<>(HelloWorld.create(), "greeter", Props.empty(), replyTo),
        timeout,
        system.scheduler());

Behavior<HelloWorld.Greeted> greetedBehavior =
    Behaviors.receive(
        (context, message) -> {
          context.getLog().info("Greeting for {} from {}", message.whom, message.from);
          return Behaviors.stopped();
        });

CompletionStage<ActorRef<HelloWorld.Greeted>> greetedReplyTo =
    AskPattern.ask(
        system,
        replyTo -> new SpawnProtocol.Spawn<>(greetedBehavior, "", Props.empty(), replyTo),
        timeout,
        system.scheduler());

greeter.whenComplete(
    (greeterRef, exc) -> {
      if (exc == null) {
        greetedReplyTo.whenComplete(
            (greetedReplyToRef, exc2) -> {
              if (exc2 == null) {
                greeterRef.tell(new HelloWorld.Greet("Pekko", greetedReplyToRef));
              }
            });
      }
    });

The SpawnProtocolSpawnProtocol can also be used at other places in the actor hierarchy. It doesn’t have to be the root guardian actor.

A way to find running actors is described in Actor discovery.

Stopping Actors

An actor can stop itself by returning Behaviors.stoppedBehaviors.stopped as the next behavior.

A child actor can be forced to stop after it finishes processing its current message by using the stopstop method of the ActorContext from the parent actor. Only child actors can be stopped in that way.

All child actors will be stopped when their parent is stopped.

When an actor is stopped, it receives the PostStopPostStop signal that can be used for cleaning up resources.

Here is an illustrating example:

Scala
sourceimport org.apache.pekko
import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.Behaviors
import pekko.actor.typed.{ ActorSystem, PostStop }


object MasterControlProgram {
  sealed trait Command
  final case class SpawnJob(name: String) extends Command
  case object GracefulShutdown extends Command

  def apply(): Behavior[Command] = {
    Behaviors
      .receive[Command] { (context, message) =>
        message match {
          case SpawnJob(jobName) =>
            context.log.info("Spawning job {}!", jobName)
            context.spawn(Job(jobName), name = jobName)
            Behaviors.same
          case GracefulShutdown =>
            context.log.info("Initiating graceful shutdown...")
            // Here it can perform graceful stop (possibly asynchronous) and when completed
            // return `Behaviors.stopped` here or after receiving another message.
            Behaviors.stopped
        }
      }
      .receiveSignal {
        case (context, PostStop) =>
          context.log.info("Master Control Program stopped")
          Behaviors.same
      }
  }
}

object Job {
  sealed trait Command

  def apply(name: String): Behavior[Command] = {
    Behaviors.receiveSignal[Command] {
      case (context, PostStop) =>
        context.log.info("Worker {} stopped", name)
        Behaviors.same
    }
  }
}
Java
source
import java.util.concurrent.TimeUnit; import org.apache.pekko.actor.typed.ActorSystem; import org.apache.pekko.actor.typed.Behavior; import org.apache.pekko.actor.typed.PostStop; import org.apache.pekko.actor.typed.javadsl.AbstractBehavior; import org.apache.pekko.actor.typed.javadsl.ActorContext; import org.apache.pekko.actor.typed.javadsl.Behaviors; import org.apache.pekko.actor.typed.javadsl.Receive; public class MasterControlProgram extends AbstractBehavior<MasterControlProgram.Command> { interface Command {} public static final class SpawnJob implements Command { public final String name; public SpawnJob(String name) { this.name = name; } } public enum GracefulShutdown implements Command { INSTANCE } public static Behavior<Command> create() { return Behaviors.setup(MasterControlProgram::new); } public MasterControlProgram(ActorContext<Command> context) { super(context); } @Override public Receive<Command> createReceive() { return newReceiveBuilder() .onMessage(SpawnJob.class, this::onSpawnJob) .onMessage(GracefulShutdown.class, message -> onGracefulShutdown()) .onSignal(PostStop.class, signal -> onPostStop()) .build(); } private Behavior<Command> onSpawnJob(SpawnJob message) { getContext().getSystem().log().info("Spawning job {}!", message.name); getContext().spawn(Job.create(message.name), message.name); return this; } private Behavior<Command> onGracefulShutdown() { getContext().getSystem().log().info("Initiating graceful shutdown..."); // Here it can perform graceful stop (possibly asynchronous) and when completed // return `Behaviors.stopped()` here or after receiving another message. return Behaviors.stopped(); } private Behavior<Command> onPostStop() { getContext().getSystem().log().info("Master Control Program stopped"); return this; } } public class Job extends AbstractBehavior<Job.Command> { interface Command {} public static Behavior<Command> create(String name) { return Behaviors.setup(context -> new Job(context, name)); } private final String name; public Job(ActorContext<Command> context, String name) { super(context); this.name = name; } @Override public Receive<Job.Command> createReceive() { return newReceiveBuilder().onSignal(PostStop.class, postStop -> onPostStop()).build(); } private Behavior<Command> onPostStop() { getContext().getSystem().log().info("Worker {} stopped", name); return this; } }

When cleaning up resources from PostStop you should also consider doing the same for the PreRestartPreRestart signal, which is emitted when the actor is restarted. Note that PostStop is not emitted for a restart.

Watching Actors

In order to be notified when another actor terminates (i.e. stops permanently, not temporary failure and restart), an actor can watchwatch another actor. It will receive the TerminatedTerminated signal upon termination (see Stopping Actors) of the watched actor.

Scala
source
object MasterControlProgram { sealed trait Command final case class SpawnJob(name: String) extends Command def apply(): Behavior[Command] = { Behaviors .receive[Command] { (context, message) => message match { case SpawnJob(jobName) => context.log.info("Spawning job {}!", jobName) val job = context.spawn(Job(jobName), name = jobName) context.watch(job) Behaviors.same } } .receiveSignal { case (context, Terminated(ref)) => context.log.info("Job stopped: {}", ref.path.name) Behaviors.same } } }
Java
sourcepublic class MasterControlProgram extends AbstractBehavior<MasterControlProgram.Command> {

  interface Command {}

  public static final class SpawnJob implements Command {
    public final String name;

    public SpawnJob(String name) {
      this.name = name;
    }
  }

  public static Behavior<Command> create() {
    return Behaviors.setup(MasterControlProgram::new);
  }

  public MasterControlProgram(ActorContext<Command> context) {
    super(context);
  }

  @Override
  public Receive<Command> createReceive() {
    return newReceiveBuilder()
        .onMessage(SpawnJob.class, this::onSpawnJob)
        .onSignal(Terminated.class, this::onTerminated)
        .build();
  }

  private Behavior<Command> onSpawnJob(SpawnJob message) {
    getContext().getSystem().log().info("Spawning job {}!", message.name);
    ActorRef<Job.Command> job = getContext().spawn(Job.create(message.name), message.name);
    getContext().watch(job);
    return this;
  }

  private Behavior<Command> onTerminated(Terminated terminated) {
    getContext().getSystem().log().info("Job stopped: {}", terminated.getRef().path().name());
    return this;
  }
}

An alternative to watchwatch is watchWithwatchWith, which allows specifying a custom message instead of the Terminated. This is often preferred over using watch and the Terminated signal because additional information can be included in the message that can be used later when receiving it.

Similar example as above, but using watchWith and replies to the original requestor when the job has finished.

Scala
source
object MasterControlProgram { sealed trait Command final case class SpawnJob(name: String, replyToWhenDone: ActorRef[JobDone]) extends Command final case class JobDone(name: String) private final case class JobTerminated(name: String, replyToWhenDone: ActorRef[JobDone]) extends Command def apply(): Behavior[Command] = { Behaviors.receive { (context, message) => message match { case SpawnJob(jobName, replyToWhenDone) => context.log.info("Spawning job {}!", jobName) val job = context.spawn(Job(jobName), name = jobName) context.watchWith(job, JobTerminated(jobName, replyToWhenDone)) Behaviors.same case JobTerminated(jobName, replyToWhenDone) => context.log.info("Job stopped: {}", jobName) replyToWhenDone ! JobDone(jobName) Behaviors.same } } } }
Java
sourcepublic class MasterControlProgram extends AbstractBehavior<MasterControlProgram.Command> {

  interface Command {}

  public static final class SpawnJob implements Command {
    public final String name;
    public final ActorRef<JobDone> replyToWhenDone;

    public SpawnJob(String name, ActorRef<JobDone> replyToWhenDone) {
      this.name = name;
      this.replyToWhenDone = replyToWhenDone;
    }
  }

  public static final class JobDone {
    public final String name;

    public JobDone(String name) {
      this.name = name;
    }
  }

  private static final class JobTerminated implements Command {
    final String name;
    final ActorRef<JobDone> replyToWhenDone;

    JobTerminated(String name, ActorRef<JobDone> replyToWhenDone) {
      this.name = name;
      this.replyToWhenDone = replyToWhenDone;
    }
  }

  public static Behavior<Command> create() {
    return Behaviors.setup(MasterControlProgram::new);
  }

  public MasterControlProgram(ActorContext<Command> context) {
    super(context);
  }

  @Override
  public Receive<Command> createReceive() {
    return newReceiveBuilder()
        .onMessage(SpawnJob.class, this::onSpawnJob)
        .onMessage(JobTerminated.class, this::onJobTerminated)
        .build();
  }

  private Behavior<Command> onSpawnJob(SpawnJob message) {
    getContext().getSystem().log().info("Spawning job {}!", message.name);
    ActorRef<Job.Command> job = getContext().spawn(Job.create(message.name), message.name);
    getContext().watchWith(job, new JobTerminated(message.name, message.replyToWhenDone));
    return this;
  }

  private Behavior<Command> onJobTerminated(JobTerminated terminated) {
    getContext().getSystem().log().info("Job stopped: {}", terminated.name);
    terminated.replyToWhenDone.tell(new JobDone(terminated.name));
    return this;
  }
}

Note how the replyToWhenDone is included in the watchWith message and then used later when receiving the JobTerminated message.

The watched actor can be any ActorRefActorRef, it doesn’t have to be a child actor as in the above example.

It should be noted that the terminated message is generated independent of the order in which registration and termination occur. In particular, the watching actor will receive a terminated message even if the watched actor has already been terminated at the time of registration.

Registering multiple times does not necessarily lead to multiple messages being generated, but there is no guarantee that only exactly one such message is received: if termination of the watched actor has generated and queued the message, and another registration is done before this message has been processed, then a second message will be queued, because registering for monitoring of an already terminated actor leads to the immediate generation of the terminated message.

It is also possible to deregister from watching another actor’s liveliness using context.unwatch(target)context.unwatch(target). This works even if the terminated message has already been enqueued in the mailbox; after calling unwatch no terminated message for that actor will be processed anymore.

The terminated message is also sent when the watched actor is on a node that has been removed from the Cluster.