ActorSource.actorRefWithBackpressure
Materialize an ActorRef[T]
of the new actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.
Dependency¶
This operator is included in:
val PekkoVersion = "1.1.3"
libraryDependencies += "org.apache.pekko" %% "pekko-stream-typed" % PekkoVersion
<properties>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-bom_${scala.binary.version}</artifactId>
<version>1.1.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream-typed_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
def versions = [
ScalaBinary: "2.13"
]
dependencies {
implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.3")
implementation "org.apache.pekko:pekko-stream-typed_${versions.ScalaBinary}"
}
Signature¶
ActorSource.actorRefWithBackpressure
Description¶
Materialize an ActorRef[T]
, sending messages to it will emit them on the stream. The actor responds with the provided ack message once the element could be emitted allowing for backpressure from the source. Sending another message before the previous one has been acknowledged will fail the stream.
See also:
- ActorSource.actorRef This operator, but without backpressure control
- Source.actorRef This operator, but without backpressure control for the classic actors API
- Source.actorRefWithBackpressure This operator for the classic actors API
- Source.queue Materialize a
SourceQueue
onto which elements can be pushed for emitting from the source
Example¶
With actorRefWithBackpressure
two actors get into play:
- An actor that is materialized when the stream runs. It feeds the stream.
- An actor provided by the user. It gets the ack signal when an element is emitted into the stream.
For the ack signal we create an Emitted
object .
For “feeding” the stream we use the Event
trait .
In this example we create the stream in an actor which itself reacts on the demand of the stream and sends more messages.
sourceimport org.apache.pekko
import pekko.actor.typed.ActorRef
import pekko.stream.CompletionStrategy
import pekko.stream.scaladsl.Sink
import pekko.stream.typed.scaladsl.ActorSource
object StreamFeeder {
/** Signals that the latest element is emitted into the stream */
case object Emitted
sealed trait Event
case class Element(content: String) extends Event
case object ReachedEnd extends Event
case class FailureOccured(ex: Exception) extends Event
def apply(): Behavior[Emitted.type] =
Behaviors.setup { context =>
val streamActor = runStream(context.self)(context.system)
streamActor ! Element("first")
sender(streamActor, 0)
}
private def runStream(ackReceiver: ActorRef[Emitted.type])(implicit system: ActorSystem[_]): ActorRef[Event] = {
val source =
ActorSource.actorRefWithBackpressure[Event, Emitted.type](
// get demand signalled to this actor receiving Ack
ackTo = ackReceiver,
ackMessage = Emitted,
// complete when we send ReachedEnd
completionMatcher = {
case ReachedEnd => CompletionStrategy.draining
},
failureMatcher = {
case FailureOccured(ex) => ex
})
val streamActor: ActorRef[Event] = source
.collect {
case Element(msg) => msg
}
.to(Sink.foreach(println))
.run()
streamActor
}
private def sender(streamSource: ActorRef[Event], counter: Int): Behavior[Emitted.type] =
Behaviors.receiveMessage {
case Emitted if counter < 5 =>
streamSource ! Element(counter.toString)
sender(streamSource, counter + 1)
case _ =>
streamSource ! ReachedEnd
Behaviors.stopped
}
}
ActorSystem(StreamFeeder(), "stream-feeder")
// Will print:
// first
// 0
// 1
// 2
// 3
// 4
sourceimport org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.Receive;
import org.apache.pekko.stream.CompletionStrategy;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.typed.javadsl.ActorSource;
import java.util.Optional;
class StreamFeeder extends AbstractBehavior<StreamFeeder.Emitted> {
/** Signals that the latest element is emitted into the stream */
public enum Emitted {
INSTANCE;
}
public interface Event {}
public static class Element implements Event {
public final String content;
public Element(String content) {
this.content = content;
}
@Override
public String toString() {
return "Element(" + content + ")";
}
}
public enum ReachedEnd implements Event {
INSTANCE;
}
public static class FailureOccured implements Event {
public final Exception ex;
public FailureOccured(Exception ex) {
this.ex = ex;
}
}
public static Behavior<Emitted> create() {
return Behaviors.setup(StreamFeeder::new);
}
private int counter = 0;
private final ActorRef<Event> streamSource;
private StreamFeeder(ActorContext<Emitted> context) {
super(context);
streamSource = runStream(context.getSelf(), context.getSystem());
streamSource.tell(new Element("first"));
}
@Override
public Receive<Emitted> createReceive() {
return newReceiveBuilder().onMessage(Emitted.class, this::onEmitted).build();
}
private static ActorRef<Event> runStream(ActorRef<Emitted> ackReceiver, ActorSystem<?> system) {
Source<Event, ActorRef<Event>> source =
ActorSource.actorRefWithBackpressure(
ackReceiver,
Emitted.INSTANCE,
// complete when we send ReachedEnd
(msg) -> {
if (msg == ReachedEnd.INSTANCE) return Optional.of(CompletionStrategy.draining());
else return Optional.empty();
},
(msg) -> {
if (msg instanceof FailureOccured) return Optional.of(((FailureOccured) msg).ex);
else return Optional.empty();
});
return source.to(Sink.foreach(System.out::println)).run(system);
}
private Behavior<Emitted> onEmitted(Emitted message) {
if (counter < 5) {
streamSource.tell(new Element(String.valueOf(counter)));
counter++;
return this;
} else {
streamSource.tell(ReachedEnd.INSTANCE);
return Behaviors.stopped();
}
}
}
ActorSystem<StreamFeeder.Emitted> system =
ActorSystem.create(StreamFeeder.create(), "stream-feeder");
// will print:
// Element(first)
// Element(0)
// Element(1)
// Element(2)
// Element(3)
// Element(4)
Reactive Streams semantics¶
emits when a message is sent to the materialized ActorRef[T]
it is emitted as soon as there is demand from downstream
completes when the passed completion matcher returns a CompletionStrategy