Fault Tolerance

You are viewing the documentation for the new actor APIs. To view the Pekko Classic documentation, see Classic Fault Tolerance.

When an actor throws an unexpected exception (a failure) while processing a message or during initialization, the actor is stopped by default.

Note

An important difference between Typed and Classic actors lies in their default behavior: if an exception is thrown and no supervision strategy is defined, a Typed actor is stopped, whereas a Classic actor is restarted.

Note that there is an important distinction between failures and validation errors:

A validation error means that the data of a command sent to an actor is not valid. Such errors should be modelled as part of the actor protocol rather than by making the actor throw exceptions.

A failure, by contrast, is something unexpected or outside the control of the actor itself, for example a database connection that broke. Unlike validation errors, failures are seldom worth modelling as part of the protocol, since a sending actor can rarely do anything useful about them.
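
To make the distinction concrete, here is a plain-Java sketch without any Pekko types. All names (BankAccount, Storage, Accepted, Rejected) are hypothetical. Invalid input produces a Rejected reply that is part of the protocol, while an exception from the storage dependency, standing in for a broken database connection, is deliberately not caught and is left for supervision to handle.

```java
import java.util.Objects;

// Hypothetical protocol sketch: replies are explicit, so a validation error is just another reply.
final class ValidationSketch {

  interface Reply {}

  static final class Accepted implements Reply {
    final long newBalance;

    Accepted(long newBalance) {
      this.newBalance = newBalance;
    }
  }

  static final class Rejected implements Reply {
    final String reason;

    Rejected(String reason) {
      this.reason = Objects.requireNonNull(reason);
    }
  }

  // Hypothetical stand-in for an external dependency such as a database connection.
  interface Storage {
    void save(long balance); // may throw if the connection broke: a failure
  }

  static final class BankAccount {
    private long balance;
    private final Storage storage;

    BankAccount(long balance, Storage storage) {
      this.balance = balance;
      this.storage = storage;
    }

    Reply deposit(long amount) {
      // Validation error: modelled in the protocol as a Rejected reply, no exception.
      if (amount <= 0) {
        return new Rejected("amount must be positive, was " + amount);
      }
      long updated = balance + amount;
      // Failure: not caught here. An exception from storage propagates and is
      // left to the supervisor ("let it crash").
      storage.save(updated);
      balance = updated;
      return new Accepted(updated);
    }
  }
}
```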

For failures it is useful to apply the “let it crash” philosophy: instead of mixing fine-grained recovery and correction of internal state (which may have become partially invalid because of the failure) with the business logic, we move that responsibility somewhere else. In many cases the resolution can then be to “crash” the actor and start a new one with a fresh state that we know is valid.

Supervision

In Pekko, this “somewhere else” is called supervision. Supervision allows you to declaratively describe what should happen when certain types of exceptions are thrown inside an actor.

The default supervision strategy is to stop the actor if an exception is thrown. In many cases you will want to further customize this behavior. To use supervision, the actual actor behavior is wrapped using Behaviors.supervise. Typically you would wrap the actor with supervision in the parent when spawning it as a child.

This example restarts the actor when it fails with an IllegalStateException:

Scala
Behaviors.supervise(behavior).onFailure[IllegalStateException](SupervisorStrategy.restart)
Java
Behaviors.supervise(behavior)
    .onFailure(IllegalStateException.class, SupervisorStrategy.restart());

Or to resume, ignore the failure and process the next message, instead:

Scala
Behaviors.supervise(behavior).onFailure[IllegalStateException](SupervisorStrategy.resume)
Java
Behaviors.supervise(behavior)
    .onFailure(IllegalStateException.class, SupervisorStrategy.resume());

More complex restart strategies can be used, for example to restart no more than 10 times within a 10-second period:

Scala
import scala.concurrent.duration._

Behaviors
  .supervise(behavior)
  .onFailure[IllegalStateException](
    SupervisorStrategy.restart.withLimit(maxNrOfRetries = 10, withinTimeRange = 10.seconds))
Java
import java.time.Duration;

Behaviors.supervise(behavior)
    .onFailure(
        IllegalStateException.class,
        SupervisorStrategy.restart().withLimit(10, Duration.ofSeconds(10)));
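
The idea behind withLimit can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not Pekko's implementation: it assumes the counting window starts at the first failure and that the counter resets once a failure arrives after the window has elapsed. The class RestartLimitModel and its onFailure method are hypothetical; consult the SupervisorStrategy API documentation for the exact semantics.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of "restart at most maxRetries times within a time window".
final class RestartLimitModel {
  private final int maxRetries;
  private final Duration window;
  private Instant windowStart = null; // start of the current counting window
  private int restartsInWindow = 0;

  RestartLimitModel(int maxRetries, Duration window) {
    this.maxRetries = maxRetries;
    this.window = window;
  }

  /** Returns true if the actor should be restarted, false if it should be stopped. */
  boolean onFailure(Instant now) {
    // Start a new window on the first failure, or when the previous window has elapsed.
    if (windowStart == null || now.isAfter(windowStart.plus(window))) {
      windowStart = now;
      restartsInWindow = 0;
    }
    restartsInWindow++;
    return restartsInWindow <= maxRetries;
  }
}
```

With the values from the example (10 retries, 10 seconds), the model allows ten restarts in quick succession and reports "stop" on the eleventh failure inside the same window.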

To handle different exceptions with different strategies, calls to supervise can be nested:

Scala
Behaviors
  .supervise(Behaviors.supervise(behavior).onFailure[IllegalStateException](SupervisorStrategy.restart))
  .onFailure[IllegalArgumentException](SupervisorStrategy.stop)
Java
Behaviors.supervise(
        Behaviors.supervise(behavior)
            .onFailure(IllegalStateException.class, SupervisorStrategy.restart()))
    .onFailure(IllegalArgumentException.class, SupervisorStrategy.stop());
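
Conceptually, each supervise layer handles only the exception types it was configured for and lets any other failure pass outward to the next layer; a failure that no layer handles falls through to the default, which stops the actor. The plain-Java model below illustrates that lookup order for the nesting above. It is a simplified sketch: the Directive enum, the Layer class and the decide method are hypothetical, and it assumes a layer also matches subclasses of its configured exception type.

```java
import java.util.List;

// Hypothetical model of nested supervise(...).onFailure(...) layers.
final class NestedSupervisionModel {
  enum Directive { RESTART, RESUME, STOP }

  static final class Layer {
    final Class<? extends Throwable> handles;
    final Directive directive;

    Layer(Class<? extends Throwable> handles, Directive directive) {
      this.handles = handles;
      this.directive = directive;
    }
  }

  // Layers are listed innermost first, mirroring supervise(supervise(behavior)...).
  static Directive decide(List<Layer> innermostFirst, Throwable failure) {
    for (Layer layer : innermostFirst) {
      if (layer.handles.isInstance(failure)) {
        return layer.directive; // the innermost matching layer wins
      }
    }
    return Directive.STOP; // no layer matched: the default is to stop
  }

  // The nesting from the example: restart on IllegalStateException,
  // stop on IllegalArgumentException.
  static final List<Layer> EXAMPLE = List.of(
      new Layer(IllegalStateException.class, Directive.RESTART),
      new Layer(IllegalArgumentException.class, Directive.STOP));
}
```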

For a full list of strategies, see the public methods on SupervisorStrategy.

Note

When the behavior is restarted, the original Behavior that was given to Behaviors.supervise is re-installed. This means that if it contains mutable state, it must be created by a factory via Behaviors.setup. When using the object-oriented style with a class extending AbstractBehavior, it’s always recommended to create it via Behaviors.setup as described in Behavior factory method. For the functional style there is typically no need for the factory if the state is captured in immutable parameters.
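
Why the factory matters can be shown without any Pekko types. In this plain-Java sketch, two hypothetical helpers "restart" a stateful object either by re-installing the very same instance or by asking a Supplier for a fresh one. This is roughly the difference between handing supervise an already-constructed stateful behavior and handing it one created via Behaviors.setup; the names are illustrative only.

```java
import java.util.function.Supplier;

// Hypothetical model: what "re-installing the original behavior" means for mutable state.
final class FactoryRestartSketch {

  // Stand-in for a behavior that keeps mutable state in a field.
  static final class MutableCounter {
    int count = 0;

    void increment() {
      count++;
    }
  }

  // Restarting by re-installing the same instance: the old (possibly corrupt) state survives.
  static MutableCounter restartSameInstance(MutableCounter original) {
    return original;
  }

  // Restarting by invoking a factory: every restart starts from fresh state.
  static MutableCounter restartFromFactory(Supplier<MutableCounter> factory) {
    return factory.get();
  }
}
```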

Wrapping behaviors

With the functional style, it is very common to store state by changing behavior, for example:

Scala
object Counter {
  sealed trait Command
  case class Increment(nr: Int) extends Command
  case class GetCount(replyTo: ActorRef[Int]) extends Command

  def apply(): Behavior[Command] =
    Behaviors.supervise(counter(1)).onFailure(SupervisorStrategy.restart)

  private def counter(count: Int): Behavior[Command] =
    Behaviors.receiveMessage[Command] {
      case Increment(nr: Int) =>
        counter(count + nr)
      case GetCount(replyTo) =>
        replyTo ! count
        Behaviors.same
    }
}
Java
public static class Counter {
  public interface Command {}

  public static final class Increase implements Command {}

  public static final class Get implements Command {
    public final ActorRef<Got> replyTo;

    public Get(ActorRef<Got> replyTo) {
      this.replyTo = replyTo;
    }
  }

  public static final class Got {
    public final int n;

    public Got(int n) {
      this.n = n;
    }
  }

  public static Behavior<Command> create() {
    return Behaviors.supervise(counter(1)).onFailure(SupervisorStrategy.restart());
  }

  private static Behavior<Command> counter(int currentValue) {
    return Behaviors.receive(Command.class)
        .onMessage(Increase.class, o -> onIncrease(currentValue))
        .onMessage(Get.class, command -> onGet(currentValue, command))
        .build();
  }

  private static Behavior<Command> onIncrease(int currentValue) {
    return counter(currentValue + 1);
  }

  private static Behavior<Command> onGet(int currentValue, Get command) {
    command.replyTo.tell(new Got(currentValue));
    return Behaviors.same();
  }
}

When doing this, supervision only needs to be added at the top level:

Scala
def apply(): Behavior[Command] =
  Behaviors.supervise(counter(1)).onFailure(SupervisorStrategy.restart)
Java
public static Behavior<Command> create() {
  return Behaviors.supervise(counter(1)).onFailure(SupervisorStrategy.restart());
}

Each returned behavior will be re-wrapped automatically with the supervisor.
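
The re-wrapping can be pictured with a tiny plain-Java model of the functional style, in which a behavior is a function from a message to the next behavior. The Behavior interface and the supervise helper here are hypothetical simplifications, not Pekko's API: supervise wraps every behavior returned by the inner one and, on an exception, falls back to the initial behavior. That is what keeps the counter supervised after any number of state changes, and why a restart brings it back to its initial value.

```java
import java.util.function.IntConsumer;

// Hypothetical, simplified model of functional-style behaviors and a restarting supervisor.
final class RewrapSketch {

  interface Behavior {
    Behavior receive(String message);
  }

  // Counter in the functional style: state lives in the parameter and each
  // message returns the next behavior. Unknown messages throw to simulate a failure.
  static Behavior counter(int count, IntConsumer replyTo) {
    return message -> {
      if (message.equals("increment")) {
        return counter(count + 1, replyTo);
      } else if (message.equals("get")) {
        replyTo.accept(count);
        return counter(count, replyTo); // plays the role of Behaviors.same
      }
      throw new IllegalStateException("unexpected message: " + message);
    };
  }

  static Behavior supervise(Behavior initial) {
    return supervise(initial, initial);
  }

  // Every behavior returned by the inner one is wrapped again; a failure
  // re-installs the initial behavior (a "restart").
  private static Behavior supervise(Behavior initial, Behavior current) {
    return message -> {
      try {
        return supervise(initial, current.receive(message));
      } catch (RuntimeException e) {
        return supervise(initial, initial);
      }
    };
  }
}
```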

Child actors are stopped when parent is restarting

Child actors are often started in a setup block that is run again when the parent actor is restarted. To avoid leaking resources by creating new child actors each time the parent is restarted, the existing child actors are stopped when the parent restarts.

Scala
def child(size: Long): Behavior[String] =
  Behaviors.receiveMessage(msg => child(size + msg.length))

def parent: Behavior[String] = {
  Behaviors
    .supervise[String] {
      Behaviors.setup { ctx =>
        val child1 = ctx.spawn(child(0), "child1")
        val child2 = ctx.spawn(child(0), "child2")

        Behaviors.receiveMessage[String] { msg =>
          // message handling that might throw an exception
          val parts = msg.split(" ")
          child1 ! parts(0)
          child2 ! parts(1)
          Behaviors.same
        }
      }
    }
    .onFailure(SupervisorStrategy.restart)
}
Java
static Behavior<String> child(long size) {
  return Behaviors.receiveMessage(msg -> child(size + msg.length()));
}

static Behavior<String> parent() {
  return Behaviors.<String>supervise(
          Behaviors.setup(
              ctx -> {
                final ActorRef<String> child1 = ctx.spawn(child(0), "child1");
                final ActorRef<String> child2 = ctx.spawn(child(0), "child2");

                return Behaviors.receiveMessage(
                    msg -> {
                      // message handling that might throw an exception
                      String[] parts = msg.split(" ");
                      child1.tell(parts[0]);
                      child2.tell(parts[1]);
                      return Behaviors.same();
                    });
              }))
      .onFailure(SupervisorStrategy.restart());
}
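
The effect of restarting a parent whose children are spawned in setup can be modelled in plain Java: every run of the setup function creates a new pair of children, so unless the old ones are stopped, each restart would leave two more children running. ChildStopModel, Parent, spawn and restart are hypothetical names; this is a sketch of the default behavior described above, not Pekko code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a parent whose setup block spawns two children.
final class ChildStopModel {

  static final class Child {
    final String name;
    boolean alive = true;

    Child(String name) {
      this.name = name;
    }
  }

  static final class Parent {
    final List<Child> allSpawned = new ArrayList<>(); // every child ever created
    private final List<Child> current = new ArrayList<>();

    Parent() {
      setup();
    }

    // Plays the role of the Behaviors.setup block: runs on start and on every restart.
    private void setup() {
      spawn("child1");
      spawn("child2");
    }

    private void spawn(String name) {
      Child c = new Child(name);
      current.add(c);
      allSpawned.add(c);
    }

    // Default restart: stop the existing children, then run setup again.
    void restart() {
      for (Child c : current) {
        c.alive = false;
      }
      current.clear();
      setup();
    }

    long aliveChildren() {
      return allSpawned.stream().filter(c -> c.alive).count();
    }
  }
}
```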

It is possible to override this so that child actors are not influenced when the parent actor is restarted. The restarted parent instance will then have the same children as before the failure.

If child actors are created from setup as in the previous example, and they should remain intact (not stopped) when the parent is restarted, the supervised behavior should be placed inside the setup block, using SupervisorStrategy.restart.withStopChildren(false) in Scala or SupervisorStrategy.restart().withStopChildren(false) in Java, like this:

Scala
def parent2: Behavior[String] = {
  Behaviors.setup { ctx =>
    val child1 = ctx.spawn(child(0), "child1")
    val child2 = ctx.spawn(child(0), "child2")

    // supervision strategy inside the setup to not recreate children on restart
    Behaviors
      .supervise {
        Behaviors.receiveMessage[String] { msg =>
          // message handling that might throw an exception
          val parts = msg.split(" ")
          child1 ! parts(0)
          child2 ! parts(1)
          Behaviors.same
        }
      }
      .onFailure(SupervisorStrategy.restart.withStopChildren(false))
  }
}
Java
static Behavior<String> parent2() {
  return Behaviors.setup(
      ctx -> {
        final ActorRef<String> child1 = ctx.spawn(child(0), "child1");
        final ActorRef<String> child2 = ctx.spawn(child(0), "child2");

        // supervision strategy inside the setup to not recreate children on restart
        return Behaviors.<String>supervise(
                Behaviors.receiveMessage(
                    msg -> {
                      // message handling that might throw an exception
                      String[] parts = msg.split(" ");
                      child1.tell(parts[0]);
                      child2.tell(parts[1]);
                      return Behaviors.same();
                    }))
            .onFailure(SupervisorStrategy.restart().withStopChildren(false));
      });
}

That means that the setup block will only be run when the parent actor is first started, and not when it is restarted.
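
The difference between the two placements comes down to which code a restart re-runs. In this plain-Java sketch (all names hypothetical, no Pekko types), a restart re-runs only what sits inside the supervised part. With supervision outside setup, as in parent, the setup block and therefore the child spawning runs again. With supervision inside setup, as in parent2, only the message handler is re-installed.

```java
// Hypothetical model: a restart re-runs only what is inside the supervised part.
final class SetupPlacementModel {
  private final boolean superviseInsideSetup;
  int setupRuns = 0;       // how often the setup block (spawning children) ran
  int handlerInstalls = 0; // how often the message handler was (re)installed

  SetupPlacementModel(boolean superviseInsideSetup) {
    this.superviseInsideSetup = superviseInsideSetup;
    runSetup(); // first start: setup always runs once
  }

  private void runSetup() {
    setupRuns++;
    installHandler();
  }

  private void installHandler() {
    handlerInstalls++;
  }

  void restart() {
    if (superviseInsideSetup) {
      installHandler(); // like parent2: setup is outside supervision, children are kept
    } else {
      runSetup(); // like parent: the whole setup block runs again
    }
  }
}
```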

The PreRestart signal

Before a supervised actor is restarted, it is sent the PreRestart signal, giving it a chance to clean up resources it has created, much like the PostStop signal when the actor stops. The behavior returned when handling the PreRestart signal is ignored.

Scala
def withPreRestart: Behavior[String] = {
  Behaviors
    .supervise[String] {
      Behaviors.setup { ctx =>
        val resource = claimResource()

        Behaviors
          .receiveMessage[String] { msg =>
            // message handling that might throw an exception

            val parts = msg.split(" ")
            resource.process(parts)
            Behaviors.same
          }
          .receiveSignal {
            case (_, signal) if signal == PreRestart || signal == PostStop =>
              resource.close()
              Behaviors.same
          }
      }
    }
    .onFailure[Exception](SupervisorStrategy.restart)
}
Java
Behaviors.supervise(
        Behaviors.<String>setup(
            ctx -> {
              final Resource resource = claimResource();

              return Behaviors.receive(String.class)
                  .onMessage(
                      String.class,
                      msg -> {
                        // message handling that might throw an exception
                        String[] parts = msg.split(" ");
                        resource.process(parts);
                        return Behaviors.same();
                      })
                  .onSignal(
                      PreRestart.class,
                      signal -> {
                        resource.close();
                        return Behaviors.same();
                      })
                  .onSignal(
                      PostStop.class,
                      signal -> {
                        resource.close();
                        return Behaviors.same();
                      })
                  .build();
            }))
    .onFailure(Exception.class, SupervisorStrategy.restart());

Note that PostStop is not emitted for a restart, so you typically need to handle both PreRestart and PostStop to clean up resources.
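
The following plain-Java model shows why both signals are needed. It counts open resources: setup claims one per incarnation, a restart delivers only PreRestart before running setup again, and a final stop delivers only PostStop. The Signal enum and Lifecycle class are illustrative assumptions, not Pekko types. Closing on both signals leaves nothing open; closing only on PostStop leaks one resource per restart.

```java
import java.util.Set;

// Hypothetical model of resource handling across restarts and a final stop.
final class SignalCleanupModel {
  enum Signal { PRE_RESTART, POST_STOP }

  static final class Lifecycle {
    private final Set<Signal> handled; // signals on which the resource is closed
    int openResources = 0;

    Lifecycle(Set<Signal> handled) {
      this.handled = handled;
      setup();
    }

    private void setup() {
      openResources++; // like claimResource() in the example
    }

    private void deliver(Signal signal) {
      if (handled.contains(signal)) {
        openResources--; // like resource.close() in the example
      }
    }

    // A restart sends PreRestart (not PostStop) and then runs setup again.
    void restart() {
      deliver(Signal.PRE_RESTART);
      setup();
    }

    // Stopping sends PostStop.
    void stop() {
      deliver(Signal.POST_STOP);
    }
  }
}
```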

Bubble failures up through the hierarchy

In some scenarios it may be useful to push the decision about what to do on a failure upwards in the Actor hierarchy and let the parent actor handle what should happen on failures (in classic Pekko Actors this is how it works by default).

For a parent to be notified when a child is terminated, it has to watch the child. If the child was stopped because of a failure, the parent receives the ChildFailed signal, which contains the cause. ChildFailed extends Terminated, so if your use case does not need to distinguish between stopping and failing, you can handle both cases with the Terminated signal.

If the parent in turn does not handle the Terminated signal, it will itself fail with a DeathPactException.

This means that a child failure can bubble up through a hierarchy of actors, stopping each actor on the way but informing the top-most parent that there was a failure and letting it decide how to deal with it. However, out of the box the original exception that caused the failure is only available to the immediate parent (which is usually a good thing, as it avoids leaking implementation details).

There might be cases when you want the original exception to bubble up the hierarchy. This can be done by handling the Terminated signal (in the failure case, its ChildFailed subtype, which carries the cause) and rethrowing the exception in each actor.

Scala
import org.apache.pekko
import pekko.actor.typed.ActorRef
import pekko.actor.typed.Behavior
import pekko.actor.typed.DeathPactException
import pekko.actor.typed.SupervisorStrategy
import pekko.actor.typed.scaladsl.Behaviors

object Protocol {
  sealed trait Command
  case class Fail(text: String) extends Command
  case class Hello(text: String, replyTo: ActorRef[String]) extends Command
}
import Protocol._

object Worker {
  def apply(): Behavior[Command] =
    Behaviors.receiveMessage {
      case Fail(text) =>
        throw new RuntimeException(text)
      case Hello(text, replyTo) =>
        replyTo ! text
        Behaviors.same
    }
}

object MiddleManagement {
  def apply(): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      context.log.info("Middle management starting up")
      // default supervision of child, meaning that it will stop on failure
      val child = context.spawn(Worker(), "child")
      // we want to know when the child terminates, but since we do not handle
      // the Terminated signal, we will in turn fail on child termination
      context.watch(child)

      // here we don't handle Terminated at all which means that
      // when the child fails or stops gracefully this actor will
      // fail with a DeathPactException
      Behaviors.receiveMessage { message =>
        child ! message
        Behaviors.same
      }
    }
}

object Boss {
  def apply(): Behavior[Command] =
    Behaviors
      .supervise(Behaviors.setup[Command] { context =>
        context.log.info("Boss starting up")
        // default supervision of child, meaning that it will stop on failure
        val middleManagement = context.spawn(MiddleManagement(), "middle-management")
        context.watch(middleManagement)

        // here we don't handle Terminated at all which means that
        // when middle management fails with a DeathPactException
        // this actor will also fail
        Behaviors.receiveMessage[Command] { message =>
          middleManagement ! message
          Behaviors.same
        }
      })
      .onFailure[DeathPactException](SupervisorStrategy.restart)
}
Java
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.DeathPactException;
import org.apache.pekko.actor.typed.SupervisorStrategy;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.Receive;

public interface Protocol {
  public interface Command {}

  public static class Fail implements Command {
    public final String text;

    public Fail(String text) {
      this.text = text;
    }
  }

  public static class Hello implements Command {
    public final String text;
    public final ActorRef<String> replyTo;

    public Hello(String text, ActorRef<String> replyTo) {
      this.text = text;
      this.replyTo = replyTo;
    }
  }
}

public static class Worker extends AbstractBehavior<Protocol.Command> {

  public static Behavior<Protocol.Command> create() {
    return Behaviors.setup(Worker::new);
  }

  private Worker(ActorContext<Protocol.Command> context) {
    super(context);
  }

  @Override
  public Receive<Protocol.Command> createReceive() {
    return newReceiveBuilder()
        .onMessage(Protocol.Fail.class, this::onFail)
        .onMessage(Protocol.Hello.class, this::onHello)
        .build();
  }

  private Behavior<Protocol.Command> onFail(Protocol.Fail message) {
    throw new RuntimeException(message.text);
  }

  private Behavior<Protocol.Command> onHello(Protocol.Hello message) {
    message.replyTo.tell(message.text);
    return this;
  }
}

public static class MiddleManagement extends AbstractBehavior<Protocol.Command> {

  public static Behavior<Protocol.Command> create() {
    return Behaviors.setup(MiddleManagement::new);
  }

  private final ActorRef<Protocol.Command> child;

  private MiddleManagement(ActorContext<Protocol.Command> context) {
    super(context);

    context.getLog().info("Middle management starting up");
    // default supervision of child, meaning that it will stop on failure
    child = context.spawn(Worker.create(), "child");

    // we want to know when the child terminates, but since we do not handle
    // the Terminated signal, we will in turn fail on child termination
    context.watch(child);
  }

  @Override
  public Receive<Protocol.Command> createReceive() {
    // here we don't handle Terminated at all which means that
    // when the child fails or stops gracefully this actor will
    // fail with a DeathPactException
    return newReceiveBuilder().onMessage(Protocol.Command.class, this::onCommand).build();
  }

  private Behavior<Protocol.Command> onCommand(Protocol.Command message) {
    // just pass messages on to the child
    child.tell(message);
    return this;
  }
}

public static class Boss extends AbstractBehavior<Protocol.Command> {

  public static Behavior<Protocol.Command> create() {
    return Behaviors.supervise(Behaviors.setup(Boss::new))
        .onFailure(DeathPactException.class, SupervisorStrategy.restart());
  }

  private final ActorRef<Protocol.Command> middleManagement;

  private Boss(ActorContext<Protocol.Command> context) {
    super(context);
    context.getLog().info("Boss starting up");
    // default supervision of child, meaning that it will stop on failure
    middleManagement = context.spawn(MiddleManagement.create(), "middle-management");
    context.watch(middleManagement);
  }

  @Override
  public Receive<Protocol.Command> createReceive() {
    // here we don't handle Terminated at all which means that
    // when middle management fails with a DeathPactException
    // this actor will also fail
    return newReceiveBuilder().onMessage(Protocol.Command.class, this::onCommand).build();
  }

  private Behavior<Protocol.Command> onCommand(Protocol.Command message) {
    // just pass messages on to the child
    middleManagement.tell(message);
    return this;
  }
}
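
The example relies on the default behavior, in which each unhandled termination turns into a DeathPactException. To contrast that with rethrowing the original cause, here is a plain-Java model of a failure travelling up a chain of watching parents. DeathPact and failureSeenAtTop are hypothetical stand-ins, not Pekko types; in Pekko the cause reaching a parent is carried by the ChildFailed signal. With one intermediate level, as with MiddleManagement between Worker and Boss, the top sees a DeathPact by default, but sees the original exception if every level rethrows the cause it received.

```java
// Hypothetical model of a failure travelling up a chain of watching parents.
final class BubbleUpModel {

  // Stand-in for DeathPactException: thrown by a parent that does not handle
  // the termination of a watched child.
  static final class DeathPact extends RuntimeException {
    DeathPact(String message) {
      super(message);
    }
  }

  /**
   * Returns the cause observed by the top-most parent when the worker at the bottom
   * fails with {@code original} and {@code intermediateLevels} watching actors sit
   * between the worker and the top.
   */
  static Throwable failureSeenAtTop(Throwable original, int intermediateLevels, boolean rethrowCause) {
    Throwable cause = original; // what the worker's immediate parent sees
    for (int level = 0; level < intermediateLevels; level++) {
      // Each intermediate actor fails in turn; its own parent sees that failure.
      cause = rethrowCause ? cause : new DeathPact("watched child terminated");
    }
    return cause;
  }
}
```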