Event Stream
You are viewing the documentation for the new actor APIs. To view the Pekko Classic documentation, see Classic Event Stream.
Dependency
To use Event Stream, you must have the Pekko typed actors dependency in your project.
- sbt
// Pekko release used by the dependency declared below.
val PekkoVersion = "1.1.2"

libraryDependencies += "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<!-- The Pekko BOM is imported here; the pekko-actor-typed dependency below declares no version of its own. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.1.2</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-actor-typed_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
- Gradle
// Scala binary version appended to the Pekko artifact names below.
def versions = [
  ScalaBinary: "2.13"
]

dependencies {
  // The BOM platform carries the Pekko version; the module itself is declared without one.
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2")

  implementation "org.apache.pekko:pekko-actor-typed_${versions.ScalaBinary}"
}
How to use
The following example demonstrates how a subscription works. Given an actor:
- Scala
-
source
import org.apache.pekko.actor.DeadLetter
import org.apache.pekko.actor.typed.ActorSystem
import org.apache.pekko.actor.typed.Behavior
import org.apache.pekko.actor.typed.eventstream.EventStream.Subscribe
import org.apache.pekko.actor.typed.scaladsl.Behaviors

/** Actor that subscribes to the event stream for [[DeadLetter]] events and logs each one. */
object DeadLetterListener {

  /** Protocol accepted by the listener. */
  sealed trait Command

  /** Wraps a [[DeadLetter]] so that it fits the listener's own [[Command]] protocol. */
  final case class DeadLetterWrapper(deadLetter: DeadLetter) extends Command

  /** Creates the listener behavior: it subscribes during setup and then logs every dead letter. */
  def apply(): Behavior[Command] = {
    Behaviors.setup[Command] { context =>
      // The subscription is made with an adapter that turns each DeadLetter into a Command.
      val adapter = context.messageAdapter[DeadLetter](DeadLetterWrapper.apply)
      context.system.eventStream ! Subscribe(adapter)

      Behaviors.receiveMessage {
        case DeadLetterWrapper(DeadLetter(message, sender, recipient)) =>
          context.log.info(
            "Dead letter received from sender ({}) to recipient ({}) with message: {}",
            sender.path.name,
            recipient.path.name,
            message.toString)
          Behaviors.same
      }
    }
  }
}

// Example bootstrap: the top-level behavior only spawns the listener and handles nothing itself.
ActorSystem(
  Behaviors.setup[Void] { context =>
    context.spawn(DeadLetterListener(), "DeadLetterListener")
    Behaviors.empty
  },
  "DeadLetterListenerSystem")
- Java
-
source
import org.apache.pekko.actor.DeadLetter; import org.apache.pekko.actor.typed.ActorRef; import org.apache.pekko.actor.typed.ActorSystem;
The actor definition:
source
/** Marker interface for the messages handled by the dead-letter listener actor. */
interface Command {}
/** Carries a {@link DeadLetter} as a {@code Command} so the listener can receive it. */
static final class DeadLetterWrapper implements Command {
  private final DeadLetter deadLetter;

  /** Wraps the given dead letter. */
  public DeadLetterWrapper(DeadLetter letter) {
    deadLetter = letter;
  }

  /** Returns the wrapped dead letter. */
  public DeadLetter getDeadLetter() {
    return this.deadLetter;
  }
}
/** Behavior that subscribes to {@link DeadLetter} events on the event stream and logs each one. */
static class DeadLetterListenerBehavior extends AbstractBehavior<Command> {

  /** Factory for the behavior; the subscription is made in the constructor. */
  public static Behavior<Command> create() {
    return Behaviors.setup(DeadLetterListenerBehavior::new);
  }

  public DeadLetterListenerBehavior(ActorContext<Command> context) {
    super(context);
    // The subscriber is an adapter that wraps each DeadLetter into a DeadLetterWrapper command.
    final ActorRef<DeadLetter> deadLetterAdapter =
        context.messageAdapter(DeadLetter.class, DeadLetterWrapper::new);
    final EventStream.Subscribe<DeadLetter> subscription =
        new EventStream.Subscribe<>(DeadLetter.class, deadLetterAdapter);
    context.getSystem().eventStream().tell(subscription);
  }

  @Override
  public Receive<Command> createReceive() {
    return newReceiveBuilder().onMessage(DeadLetterWrapper.class, this::onDeadLetter).build();
  }

  /** Logs sender, recipient and payload of the wrapped dead letter and keeps the same behavior. */
  private Behavior<Command> onDeadLetter(DeadLetterWrapper wrapper) {
    final DeadLetter letter = wrapper.getDeadLetter();
    final String senderName = letter.sender().path().name();
    final String recipientName = letter.recipient().path().name();
    final String payload = letter.message().toString();
    getContext()
        .getLog()
        .info(
            "Dead letter received from sender ({}) to recipient ({}) with message: {}",
            senderName,
            recipientName,
            payload);
    return Behaviors.same();
  }
}
It can be used as follows:
sourceActorSystem<Command> system =
ActorSystem.create(DeadLetterListenerBehavior.create(), "DeadLetterListener");
It is possible to subscribe to a common superclass:
- Scala
-
source
/** Listener that subscribes to `AllDeadLetters` and logs each kind of event it receives. */
object AllDeadLetterListener {

  /** Protocol accepted by the listener. */
  sealed trait Command

  /** Wraps an `AllDeadLetters` event so that it fits the listener's own protocol. */
  final case class AllDeadLettersWrapper(allDeadLetters: AllDeadLetters) extends Command

  /** Creates the listener behavior: it subscribes during setup and then logs every event. */
  def apply(): Behavior[Command] =
    Behaviors.setup[Command] { ctx =>
      // One log statement per concrete event type.
      def report(event: AllDeadLetters): Unit = event match {
        case DeadLetter(payload, from, to) =>
          ctx.log.info(
            "DeadLetter received from sender ({}) to recipient ({}) with message: {}",
            from.path.name,
            to.path.name,
            payload.toString)
        case Dropped(payload, why, from, to) =>
          ctx.log.info(
            "Dropped: sender ({}) to recipient ({}) with message: {}, reason: {}",
            from.path.name,
            to.path.name,
            payload.toString,
            why)
        case SuppressedDeadLetter(payload, from, to) =>
          // Logged at trace level so these events do not flood the logs.
          ctx.log.trace(
            "SuppressedDeadLetter received from sender ({}) to recipient ({}) with message: {}",
            from.path.name,
            to.path.name,
            payload)
        case UnhandledMessage(payload, from, to) =>
          ctx.log.info(
            "UnhandledMessage received from sender ({}) to recipient ({}) with message: {}",
            from.path.name,
            to.path.name,
            payload.toString)
      }

      val eventAdapter = ctx.messageAdapter[AllDeadLetters](AllDeadLettersWrapper.apply)
      ctx.system.eventStream ! Subscribe(eventAdapter)

      Behaviors.receiveMessage {
        case AllDeadLettersWrapper(event) =>
          report(event)
          Behaviors.same
      }
    }
}

// Example bootstrap: the top-level behavior only spawns the listener and handles nothing itself.
ActorSystem(
  Behaviors.setup[Void] { ctx =>
    ctx.spawn(AllDeadLetterListener(), "AllDeadLetterListener")
    Behaviors.empty
  },
  "AllDeadLetterListenerSystem")
- Java
-
source
import org.apache.pekko.actor.DeadLetter; import org.apache.pekko.actor.AllDeadLetters; import org.apache.pekko.actor.Dropped; import org.apache.pekko.actor.SuppressedDeadLetter; import org.apache.pekko.actor.UnhandledMessage;
The actor definition:
source
/** Marker interface for the messages handled by the all-dead-letters listener actor. */
interface Command {}
/** Carries an {@link AllDeadLetters} event as a {@code Command} so the listener can receive it. */
static final class AllDeadLettersWrapper implements Command {
  private final AllDeadLetters allDeadLetters;

  /** Wraps the given event. */
  public AllDeadLettersWrapper(AllDeadLetters event) {
    allDeadLetters = event;
  }

  /** Returns the wrapped event. */
  public AllDeadLetters getAllDeadLetters() {
    return this.allDeadLetters;
  }
}
/**
 * Behavior that subscribes to {@link AllDeadLetters} on the event stream and logs each event
 * according to its concrete type.
 */
static class AllDeadLettersListenerBehavior extends AbstractBehavior<Command> {

  /** Factory for the behavior; the subscription is made in the constructor. */
  public static Behavior<Command> create() {
    return Behaviors.setup(AllDeadLettersListenerBehavior::new);
  }

  public AllDeadLettersListenerBehavior(ActorContext<Command> context) {
    super(context);
    // The subscriber is an adapter that wraps each event into an AllDeadLettersWrapper command.
    ActorRef<AllDeadLetters> messageAdapter =
        context.messageAdapter(AllDeadLetters.class, AllDeadLettersWrapper::new);
    context
        .getSystem()
        .eventStream()
        .tell(new EventStream.Subscribe<>(AllDeadLetters.class, messageAdapter));
  }

  @Override
  public Receive<Command> createReceive() {
    return newReceiveBuilder()
        .onMessage(
            AllDeadLettersWrapper.class,
            msg -> {
              final AllDeadLetters allDeadLetters = msg.getAllDeadLetters();
              // instanceof tests the runtime type directly; the former
              // klass.isAssignableFrom(X.class) check had receiver and argument reversed.
              if (allDeadLetters instanceof DeadLetter) {
                final DeadLetter deadLetter = (DeadLetter) allDeadLetters;
                getContext()
                    .getLog()
                    .info(
                        "DeadLetter: sender ({}) to recipient ({}) with message: {}",
                        deadLetter.sender().path().name(),
                        deadLetter.recipient().path().name(),
                        deadLetter.message());
              } else if (allDeadLetters instanceof Dropped) {
                final Dropped dropped = (Dropped) allDeadLetters;
                getContext()
                    .getLog()
                    .info(
                        "Dropped: sender ({}) to recipient ({}) with message: {}, reason: {}",
                        dropped.sender().path().name(),
                        dropped.recipient().path().name(),
                        dropped.message(),
                        dropped.reason());
              } else if (allDeadLetters instanceof SuppressedDeadLetter) {
                final SuppressedDeadLetter suppressedDeadLetter =
                    (SuppressedDeadLetter) allDeadLetters;
                // Logged at trace level, unlike the other event types.
                getContext()
                    .getLog()
                    .trace(
                        "SuppressedDeadLetter: sender ({}) to recipient ({}) with message: {}",
                        suppressedDeadLetter.sender().path().name(),
                        suppressedDeadLetter.recipient().path().name(),
                        suppressedDeadLetter.message());
              } else if (allDeadLetters instanceof UnhandledMessage) {
                final UnhandledMessage unhandledMessage = (UnhandledMessage) allDeadLetters;
                getContext()
                    .getLog()
                    .info(
                        "UnhandledMessage: sender ({}) to recipient ({}) with message: {}",
                        unhandledMessage.sender().path().name(),
                        unhandledMessage.recipient().path().name(),
                        unhandledMessage.message());
              }
              return Behaviors.same();
            })
        .build();
  }
}
It can be used as follows:
sourceActorSystem<Command> system =
ActorSystem.create(AllDeadLettersListenerBehavior.create(), "AllDeadLettersListener");
1.1.2