ActorSource.actorRefWithBackpressure
Materialize an ActorRef<T>
ActorRef[T]
of the new actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.
Dependency
This operator is included in:
- sbt
val PekkoVersion = "1.0.3" libraryDependencies += "org.apache.pekko" %% "pekko-stream-typed" % PekkoVersion
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-bom_${scala.binary.version}</artifactId> <version>1.0.3</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-stream-typed_${scala.binary.version}</artifactId> </dependency> </dependencies>
- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.0.3") implementation "org.apache.pekko:pekko-stream-typed_${versions.ScalaBinary}" }
Signature
ActorSource.actorRefWithBackpressure
ActorSource.actorRefWithBackpressure
Description
Materialize an ActorRef<T>
ActorRef[T]
, sending messages to it will emit them on the stream. The actor responds with the provided ack message once the element could be emitted allowing for backpressure from the source. Sending another message before the previous one has been acknowledged will fail the stream.
See also:
- ActorSource.actorRef This operator, but without backpressure control
- Source.actorRef This operator, but without backpressure control for the classic actors API
- Source.actorRefWithBackpressure This operator for the classic actors API
- Source.queue Materialize a
SourceQueue
onto which elements can be pushed for emitting from the source
Example
With actorRefWithBackpressure
two actors get into play:
- An actor that is materialized when the stream runs. It feeds the stream.
- An actor provided by the user. It gets the ack signal when an element is emitted into the stream.
For the ack signal we create an Emitted
objectempty Emitted
class.
For “feeding” the stream we use the Event
traitinterface.
In this example we create the stream in an actor which itself reacts on the demand of the stream and sends more messages.
- Scala
-
source
import org.apache.pekko import pekko.actor.typed.ActorRef import pekko.stream.CompletionStrategy import pekko.stream.scaladsl.Sink import pekko.stream.typed.scaladsl.ActorSource object StreamFeeder { /** Signals that the latest element is emitted into the stream */ case object Emitted sealed trait Event case class Element(content: String) extends Event case object ReachedEnd extends Event case class FailureOccured(ex: Exception) extends Event def apply(): Behavior[Emitted.type] = Behaviors.setup { context => val streamActor = runStream(context.self)(context.system) streamActor ! Element("first") sender(streamActor, 0) } private def runStream(ackReceiver: ActorRef[Emitted.type])(implicit system: ActorSystem[_]): ActorRef[Event] = { val source = ActorSource.actorRefWithBackpressure[Event, Emitted.type]( // get demand signalled to this actor receiving Ack ackTo = ackReceiver, ackMessage = Emitted, // complete when we send ReachedEnd completionMatcher = { case ReachedEnd => CompletionStrategy.draining }, failureMatcher = { case FailureOccured(ex) => ex }) val streamActor: ActorRef[Event] = source .collect { case Element(msg) => msg } .to(Sink.foreach(println)) .run() streamActor } private def sender(streamSource: ActorRef[Event], counter: Int): Behavior[Emitted.type] = Behaviors.receiveMessage { case Emitted if counter < 5 => streamSource ! Element(counter.toString) sender(streamSource, counter + 1) case _ => streamSource ! ReachedEnd Behaviors.stopped } } ActorSystem(StreamFeeder(), "stream-feeder") // Will print: // first // 0 // 1 // 2 // 3 // 4
- Java
-
source
import org.apache.pekko.actor.typed.ActorRef; import org.apache.pekko.actor.typed.ActorSystem; import org.apache.pekko.actor.typed.Behavior; import org.apache.pekko.actor.typed.javadsl.AbstractBehavior; import org.apache.pekko.actor.typed.javadsl.ActorContext; import org.apache.pekko.actor.typed.javadsl.Behaviors; import org.apache.pekko.actor.typed.javadsl.Receive; import org.apache.pekko.stream.CompletionStrategy; import org.apache.pekko.stream.javadsl.Sink; import org.apache.pekko.stream.javadsl.Source; import org.apache.pekko.stream.typed.javadsl.ActorSource; import java.util.Optional; class StreamFeeder extends AbstractBehavior<StreamFeeder.Emitted> { /** Signals that the latest element is emitted into the stream */ public enum Emitted { INSTANCE; } public interface Event {} public static class Element implements Event { public final String content; public Element(String content) { this.content = content; } @Override public String toString() { return "Element(" + content + ")"; } } public enum ReachedEnd implements Event { INSTANCE; } public static class FailureOccured implements Event { public final Exception ex; public FailureOccured(Exception ex) { this.ex = ex; } } public static Behavior<Emitted> create() { return Behaviors.setup(StreamFeeder::new); } private int counter = 0; private final ActorRef<Event> streamSource; private StreamFeeder(ActorContext<Emitted> context) { super(context); streamSource = runStream(context.getSelf(), context.getSystem()); streamSource.tell(new Element("first")); } @Override public Receive<Emitted> createReceive() { return newReceiveBuilder().onMessage(Emitted.class, this::onEmitted).build(); } private static ActorRef<Event> runStream(ActorRef<Emitted> ackReceiver, ActorSystem<?> system) { Source<Event, ActorRef<Event>> source = ActorSource.actorRefWithBackpressure( ackReceiver, Emitted.INSTANCE, // complete when we send ReachedEnd (msg) -> { if (msg == ReachedEnd.INSTANCE) return Optional.of(CompletionStrategy.draining()); else return Optional.empty(); }, (msg) -> { if (msg instanceof FailureOccured) return Optional.of(((FailureOccured) msg).ex); else return Optional.empty(); }); return source.to(Sink.foreach(System.out::println)).run(system); } private Behavior<Emitted> onEmitted(Emitted message) { if (counter < 5) { streamSource.tell(new Element(String.valueOf(counter))); counter++; return this; } else { streamSource.tell(ReachedEnd.INSTANCE); return Behaviors.stopped(); } } } ActorSystem<StreamFeeder.Emitted> system = ActorSystem.create(StreamFeeder.create(), "stream-feeder"); // will print: // Element(first) // Element(0) // Element(1) // Element(2) // Element(3) // Element(4)
Reactive Streams semantics
emits when a message is sent to the materialized ActorRef[T]
ActorRef<T>
it is emitted as soon as there is demand from downstream
completes when the passed completion matcher returns a CompletionStrategy