Event Stream
You are viewing the documentation for the new actor APIs, to view the Pekko Classic documentation, see Classic Event Stream.
Dependency¶
To use Event Stream, you must have Pekko typed actors dependency in your project.
val PekkoVersion = "1.1.3"
libraryDependencies += "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion
<properties>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-bom_${scala.binary.version}</artifactId>
<version>1.1.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-actor-typed_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
def versions = [
ScalaBinary: "2.13"
]
dependencies {
implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.3")
implementation "org.apache.pekko:pekko-actor-typed_${versions.ScalaBinary}"
}
How to use¶
The following example demonstrates how a subscription works. Given an actor:
sourceimport org.apache.pekko.actor.typed.Behavior
import org.apache.pekko.actor.typed.eventstream.EventStream.Subscribe
import org.apache.pekko.actor.typed.scaladsl.Behaviors
object DeadLetterListener {
sealed trait Command
final case class DeadLetterWrapper(deadLetter: DeadLetter) extends Command
def apply(): Behavior[Command] = {
Behaviors.setup[Command] {
context =>
val adapter = context.messageAdapter[DeadLetter](DeadLetterWrapper.apply)
context.system.eventStream ! Subscribe(adapter)
Behaviors.receiveMessage {
case DeadLetterWrapper(DeadLetter(message, sender, recipient)) =>
context.log.info("Dead letter received from sender ({}) to recipient ({}) with message: {}",
sender.path.name, recipient.path.name, message.toString)
Behaviors.same
}
}
}
}
ActorSystem(Behaviors.setup[Void] { context =>
context.spawn(DeadLetterListener(), "DeadLetterListener")
Behaviors.empty
}, "DeadLetterListenerSystem")
sourceimport org.apache.pekko.actor.DeadLetter;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorSystem;
It is possible to subscribe to common superclass:
sourceobject AllDeadLetterListener {
sealed trait Command
final case class AllDeadLettersWrapper(allDeadLetters: AllDeadLetters) extends Command
def apply(): Behavior[Command] = {
Behaviors.setup[Command] {
context =>
val adapter = context.messageAdapter[AllDeadLetters](AllDeadLettersWrapper.apply)
context.system.eventStream ! Subscribe(adapter)
Behaviors.receiveMessage {
case AllDeadLettersWrapper(allDeadLetters) =>
allDeadLetters match {
case DeadLetter(message, sender, recipient) =>
context.log.info("DeadLetter received from sender ({}) to recipient ({}) with message: {}",
sender.path.name, recipient.path.name, message.toString)
case Dropped(message, reason, sender, recipient) =>
context.log.info("Dropped: sender ({}) to recipient ({}) with message: {}, reason: {}",
sender.path.name, recipient.path.name, message.toString, reason)
case SuppressedDeadLetter(message, sender, recipient) =>
// use trace otherwise logs will be flooded
context.log.trace("SuppressedDeadLetter received from sender ({}) to recipient ({}) with message: {}",
sender.path.name, recipient.path.name, message)
case UnhandledMessage(message, sender, recipient) =>
context.log.info("UnhandledMessage received from sender ({}) to recipient ({}) with message: {}",
sender.path.name, recipient.path.name, message.toString)
}
Behaviors.same
}
}
}
}
ActorSystem(Behaviors.setup[Void] { context =>
context.spawn(AllDeadLetterListener(), "AllDeadLetterListener")
Behaviors.empty
}, "AllDeadLetterListenerSystem")
sourceimport org.apache.pekko.actor.DeadLetter;
import org.apache.pekko.actor.AllDeadLetters;
import org.apache.pekko.actor.Dropped;
import org.apache.pekko.actor.SuppressedDeadLetter;
import org.apache.pekko.actor.UnhandledMessage;
1.1.3