Event Stream

You are viewing the documentation for the new actor APIs, to view the Pekko Classic documentation, see Classic Event Stream.

Dependency

To use Event Stream, you must have Pekko typed actors dependency in your project.

sbt
Maven
Gradle
val PekkoVersion = "1.1.3"
libraryDependencies += "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.1.3</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-actor-typed_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.3")

  implementation "org.apache.pekko:pekko-actor-typed_${versions.ScalaBinary}"
}

How to use

The following example demonstrates how a subscription works. Given an actor:

Scala
Java
sourceimport org.apache.pekko.actor.typed.Behavior
import org.apache.pekko.actor.typed.eventstream.EventStream.Subscribe
import org.apache.pekko.actor.typed.scaladsl.Behaviors

object DeadLetterListener {
  sealed trait Command
  final case class DeadLetterWrapper(deadLetter: DeadLetter) extends Command

  def apply(): Behavior[Command] = {
    Behaviors.setup[Command] {
      context =>
        val adapter = context.messageAdapter[DeadLetter](DeadLetterWrapper.apply)
        context.system.eventStream ! Subscribe(adapter)

        Behaviors.receiveMessage {
          case DeadLetterWrapper(DeadLetter(message, sender, recipient)) =>
            context.log.info("Dead letter received from sender ({}) to recipient ({}) with message: {}",
              sender.path.name, recipient.path.name, message.toString)
            Behaviors.same
        }
    }
  }
}

  ActorSystem(Behaviors.setup[Void] { context =>
      context.spawn(DeadLetterListener(), "DeadLetterListener")
      Behaviors.empty
    }, "DeadLetterListenerSystem")
sourceimport org.apache.pekko.actor.DeadLetter;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorSystem;

It is possible to subscribe to common superclass:

Scala
Java
sourceobject AllDeadLetterListener {
  sealed trait Command
  final case class AllDeadLettersWrapper(allDeadLetters: AllDeadLetters) extends Command

  def apply(): Behavior[Command] = {
    Behaviors.setup[Command] {
      context =>
        val adapter = context.messageAdapter[AllDeadLetters](AllDeadLettersWrapper.apply)
        context.system.eventStream ! Subscribe(adapter)

        Behaviors.receiveMessage {
          case AllDeadLettersWrapper(allDeadLetters) =>
            allDeadLetters match {
              case DeadLetter(message, sender, recipient) =>
                context.log.info("DeadLetter received from sender ({}) to recipient ({}) with message: {}",
                  sender.path.name, recipient.path.name, message.toString)

              case Dropped(message, reason, sender, recipient) =>
                context.log.info("Dropped: sender ({}) to recipient ({}) with message: {}, reason: {}",
                  sender.path.name, recipient.path.name, message.toString, reason)

              case SuppressedDeadLetter(message, sender, recipient) =>
                // use trace otherwise logs will be flooded
                context.log.trace("SuppressedDeadLetter received from sender ({}) to recipient ({}) with message: {}",
                  sender.path.name, recipient.path.name, message)

              case UnhandledMessage(message, sender, recipient) =>
                context.log.info("UnhandledMessage received from sender ({}) to recipient ({}) with message: {}",
                  sender.path.name, recipient.path.name, message.toString)
            }
            Behaviors.same
        }
    }
  }
}

  ActorSystem(Behaviors.setup[Void] { context =>
      context.spawn(AllDeadLetterListener(), "AllDeadLetterListener")
      Behaviors.empty
    }, "AllDeadLetterListenerSystem")
sourceimport org.apache.pekko.actor.DeadLetter;
import org.apache.pekko.actor.AllDeadLetters;
import org.apache.pekko.actor.Dropped;
import org.apache.pekko.actor.SuppressedDeadLetter;
import org.apache.pekko.actor.UnhandledMessage;