Event Stream

You are viewing the documentation for the new actor APIs. To view the Pekko Classic documentation, see Classic Event Stream.

Dependency

To use Event Stream, you must have the Pekko typed actors dependency in your project.

sbt
// Pekko version shared by the dependency declaration below.
val PekkoVersion = "1.1.2"
// Adds the typed actor module, which this page requires for using Event Stream.
libraryDependencies += "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion
Maven
<!-- Scala binary version used as the suffix of the Pekko artifact ids below. -->
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<!-- Imports the Pekko BOM (version 1.1.2) into dependency management. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.1.2</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<!-- The typed actor module is declared without a version; the imported BOM above is expected to supply it. -->
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-actor-typed_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
// Scala binary version used as the suffix of the Pekko artifact names below.
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  // Pekko BOM (1.1.2) applied as a platform; the module line below declares no version of its own.
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2")

  implementation "org.apache.pekko:pekko-actor-typed_${versions.ScalaBinary}"
}

How to use

The following example demonstrates how a subscription works. Given an actor:

Scala
import org.apache.pekko.actor.typed.Behavior
import org.apache.pekko.actor.typed.eventstream.EventStream.Subscribe
import org.apache.pekko.actor.typed.scaladsl.Behaviors

/** Actor that subscribes to the system event stream and logs every [[DeadLetter]] it receives. */
object DeadLetterListener {
  sealed trait Command

  /** Wraps a [[DeadLetter]] from the event stream so that it fits this actor's protocol. */
  final case class DeadLetterWrapper(deadLetter: DeadLetter) extends Command

  def apply(): Behavior[Command] =
    Behaviors.setup[Command] { ctx =>
      // The event stream delivers DeadLetter, not Command, so subscribe through a message adapter.
      val deadLetterRef = ctx.messageAdapter[DeadLetter](DeadLetterWrapper.apply)
      ctx.system.eventStream ! Subscribe(deadLetterRef)

      Behaviors.receiveMessage { case DeadLetterWrapper(event) =>
        ctx.log.info("Dead letter received from sender ({}) to recipient ({}) with message: {}",
          event.sender.path.name, event.recipient.path.name, event.message.toString)
        Behaviors.same
      }
    }
}

  // The guardian spawns the listener and then becomes an empty behavior.
  ActorSystem(
    Behaviors.setup[Void] { ctx =>
      ctx.spawn(DeadLetterListener(), "DeadLetterListener")
      Behaviors.empty
    },
    "DeadLetterListenerSystem")
Java
import org.apache.pekko.actor.DeadLetter;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorSystem;

The actor definition:

sourceinterface Command {}

static final class DeadLetterWrapper implements Command {
  private final DeadLetter deadLetter;

  public DeadLetterWrapper(DeadLetter deadLetter) {
    this.deadLetter = deadLetter;
  }

  public DeadLetter getDeadLetter() {
    return deadLetter;
  }
}

/** Subscribes to {@link DeadLetter} events on the system event stream and logs each one. */
static class DeadLetterListenerBehavior extends AbstractBehavior<Command> {

  /**
   * @return a behavior that builds a new listener from the actor's context
   */
  public static Behavior<Command> create() {
    return Behaviors.setup(DeadLetterListenerBehavior::new);
  }

  public DeadLetterListenerBehavior(ActorContext<Command> context) {
    super(context);
    // The adapter turns DeadLetter events into DeadLetterWrapper commands for this actor.
    final ActorRef<DeadLetter> deadLetterRef =
        context.messageAdapter(DeadLetter.class, DeadLetterWrapper::new);
    final EventStream.Subscribe<DeadLetter> subscription =
        new EventStream.Subscribe<>(DeadLetter.class, deadLetterRef);
    context.getSystem().eventStream().tell(subscription);
  }

  @Override
  public Receive<Command> createReceive() {
    return newReceiveBuilder().onMessage(DeadLetterWrapper.class, this::onDeadLetter).build();
  }

  /** Logs the sender, recipient and message of the wrapped dead letter. */
  private Behavior<Command> onDeadLetter(DeadLetterWrapper wrapper) {
    final DeadLetter received = wrapper.getDeadLetter();
    final String senderName = received.sender().path().name();
    final String recipientName = received.recipient().path().name();
    getContext()
        .getLog()
        .info(
            "Dead letter received from sender ({}) to recipient ({}) with message: {}",
            senderName,
            recipientName,
            received.message().toString());
    return Behaviors.same();
  }
}

It can be used as follows:

// Start an actor system whose guardian is the dead-letter listener.
ActorSystem<Command> system =
    ActorSystem.create(DeadLetterListenerBehavior.create(), "DeadLetterListener");

It is possible to subscribe to a common superclass:

Scala
/**
 * Actor that subscribes to the common supertype [[AllDeadLetters]] on the system event stream
 * and logs each concrete kind of event it receives.
 */
object AllDeadLetterListener {
  sealed trait Command

  /** Wraps an [[AllDeadLetters]] event so that it fits this actor's protocol. */
  final case class AllDeadLettersWrapper(allDeadLetters: AllDeadLetters) extends Command

  def apply(): Behavior[Command] = {
    Behaviors.setup[Command] {
      context =>
        // Subscribing with the supertype adapter delivers every subtype matched below.
        val adapter = context.messageAdapter[AllDeadLetters](AllDeadLettersWrapper.apply)
        context.system.eventStream ! Subscribe(adapter)

        Behaviors.receiveMessage {
          case AllDeadLettersWrapper(allDeadLetters) =>
            allDeadLetters match {
              case DeadLetter(message, sender, recipient) =>
                context.log.info("DeadLetter received from sender ({}) to recipient ({}) with message: {}",
                  sender.path.name, recipient.path.name, message.toString)

              case Dropped(message, reason, sender, recipient) =>
                context.log.info("Dropped: sender ({}) to recipient ({}) with message: {}, reason: {}",
                  sender.path.name, recipient.path.name, message.toString, reason)

              case SuppressedDeadLetter(message, sender, recipient) =>
                // use trace otherwise logs will be flooded
                context.log.trace("SuppressedDeadLetter received from sender ({}) to recipient ({}) with message: {}",
                  sender.path.name, recipient.path.name, message)

              case UnhandledMessage(message, sender, recipient) =>
                context.log.info("UnhandledMessage received from sender ({}) to recipient ({}) with message: {}",
                  sender.path.name, recipient.path.name, message.toString)
            }
            Behaviors.same
        }
    }
  }
}

  // The guardian spawns the listener and then becomes an empty behavior.
  ActorSystem(
    Behaviors.setup[Void] { ctx =>
      ctx.spawn(AllDeadLetterListener(), "AllDeadLetterListener")
      Behaviors.empty
    },
    "AllDeadLetterListenerSystem")
Java
import org.apache.pekko.actor.DeadLetter;
import org.apache.pekko.actor.AllDeadLetters;
import org.apache.pekko.actor.Dropped;
import org.apache.pekko.actor.SuppressedDeadLetter;
import org.apache.pekko.actor.UnhandledMessage;

The actor definition:

sourceinterface Command {}

static final class AllDeadLettersWrapper implements Command {
  private final AllDeadLetters allDeadLetters;

  public AllDeadLettersWrapper(AllDeadLetters deadLetter) {
    this.allDeadLetters = deadLetter;
  }

  public AllDeadLetters getAllDeadLetters() {
    return allDeadLetters;
  }
}

static class AllDeadLettersListenerBehavior extends AbstractBehavior<Command> {

  public static Behavior<Command> create() {
    return Behaviors.setup(AllDeadLettersListenerBehavior::new);
  }

  public AllDeadLettersListenerBehavior(ActorContext<Command> context) {
    super(context);
    ActorRef<AllDeadLetters> messageAdapter =
        context.messageAdapter(
            AllDeadLetters.class, EventStreamSuperClassDocTest.AllDeadLettersWrapper::new);
    context
        .getSystem()
        .eventStream()
        .tell(new EventStream.Subscribe<>(AllDeadLetters.class, messageAdapter));
  }

  @Override
  public Receive<Command> createReceive() {
    return newReceiveBuilder()
        .onMessage(
            AllDeadLettersWrapper.class,
            msg -> {
              final AllDeadLetters allDeadLetters = msg.getAllDeadLetters();
              final Class<? extends AllDeadLetters> klass = msg.allDeadLetters.getClass();
              if (klass.isAssignableFrom(DeadLetter.class)) {
                final DeadLetter deadLetter = (DeadLetter) allDeadLetters;
                getContext()
                    .getLog()
                    .info(
                        "DeadLetter: sender ({}) to recipient ({}) with message: {}",
                        deadLetter.sender().path().name(),
                        deadLetter.recipient().path().name(),
                        deadLetter.message());
              } else if (klass.isAssignableFrom(Dropped.class)) {
                final Dropped dropped = (Dropped) allDeadLetters;
                getContext()
                    .getLog()
                    .info(
                        "Dropped: sender ({}) to recipient ({}) with message: {}, reason: {}",
                        dropped.sender().path().name(),
                        dropped.recipient().path().name(),
                        dropped.message(),
                        dropped.reason());
              } else if (klass.isAssignableFrom(SuppressedDeadLetter.class)) {
                final SuppressedDeadLetter suppressedDeadLetter =
                    (SuppressedDeadLetter) allDeadLetters;
                getContext()
                    .getLog()
                    .trace(
                        "SuppressedDeadLetter: sender ({}) to recipient ({}) with message: {}",
                        suppressedDeadLetter.sender().path().name(),
                        suppressedDeadLetter.recipient().path().name(),
                        suppressedDeadLetter.message());
              } else if (klass.isAssignableFrom(UnhandledMessage.class)) {
                final UnhandledMessage unhandledMessage = (UnhandledMessage) allDeadLetters;
                getContext()
                    .getLog()
                    .info(
                        "UnhandledMessage: sender ({}) to recipient ({}) with message: {}",
                        unhandledMessage.sender().path().name(),
                        unhandledMessage.recipient().path().name(),
                        unhandledMessage.message());
              }
              return Behaviors.same();
            })
        .build();
  }
}

It can be used as follows:

// Start an actor system whose guardian is the all-dead-letters listener.
ActorSystem<Command> system =
    ActorSystem.create(AllDeadLettersListenerBehavior.create(), "AllDeadLettersListener");