ActorSink.actorRefWithBackpressure
Sends the elements of the stream to the given ActorRef<T> (Java) or ActorRef[T] (Scala) of the new actors API with backpressure, so that the actor can signal demand when it is ready to receive more elements.
Dependency
This operator is included in:
- sbt
val PekkoVersion = "1.1.2+24-bcd44ee3-SNAPSHOT"
libraryDependencies += "org.apache.pekko" %% "pekko-stream-typed" % PekkoVersion
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.1.2+24-bcd44ee3-SNAPSHOT</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream-typed_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
- Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2+24-bcd44ee3-SNAPSHOT")

  implementation "org.apache.pekko:pekko-stream-typed_${versions.ScalaBinary}"
}
Signature
ActorSink.actorRefWithBackpressure
Description
Sends the elements of the stream to the given ActorRef<T> (Java) or ActorRef[T] (Scala) with backpressure, so that the actor can signal demand when it is ready to receive more elements. There is also a variant that takes no concrete acknowledgement message and instead accepts any message as the acknowledgement.
See also:
- ActorSink.actorRef: Send elements to an actor of the new actors API, without considering backpressure
- Sink.actorRef: Send elements to an actor of the classic actors API, without considering backpressure
- Sink.actorRefWithBackpressure: The corresponding operator for the classic actors API
Examples
- Scala
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.typed.ActorRef
import pekko.stream.scaladsl.{ Sink, Source }
import pekko.stream.typed.scaladsl.ActorSink

// Acknowledgement the actor sends back to signal demand
trait Ack
object Ack extends Ack

// Messages the sink sends to the actor
trait Protocol
case class Init(ackTo: ActorRef[Ack]) extends Protocol
case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Throwable) extends Protocol

// targetActor() is a placeholder for however the receiving actor is obtained
val actor: ActorRef[Protocol] = targetActor()

val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure(
  ref = actor,
  messageAdapter = (responseActorRef: ActorRef[Ack], element) => Message(responseActorRef, element),
  onInitMessage = (responseActorRef: ActorRef[Ack]) => Init(responseActorRef),
  ackMessage = Ack,
  onCompleteMessage = Complete,
  onFailureMessage = exception => Fail(exception))

Source.single("msg1").runWith(sink)
- Java
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.typed.javadsl.ActorSink;

enum Ack {
  INSTANCE;
}

interface Protocol {}

class Init implements Protocol {
  private final ActorRef<Ack> ack;

  public Init(ActorRef<Ack> ack) {
    this.ack = ack;
  }
}

class Message implements Protocol {
  private final ActorRef<Ack> ackTo;
  private final String msg;

  public Message(ActorRef<Ack> ackTo, String msg) {
    this.ackTo = ackTo;
    this.msg = msg;
  }
}

class Complete implements Protocol {}

class Fail implements Protocol {
  private final Throwable ex;

  public Fail(Throwable ex) {
    this.ex = ex;
  }
}

final ActorRef<Protocol> actorRef = // spawned actor
final Complete completeMessage = new Complete();

final Sink<String, NotUsed> sink =
    ActorSink.actorRefWithBackpressure(
        actorRef,
        (responseActorRef, element) -> new Message(responseActorRef, element),
        (responseActorRef) -> new Init(responseActorRef),
        Ack.INSTANCE,
        completeMessage,
        (exception) -> new Fail(exception));

Source.single("msg1").runWith(sink, system);
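The examples above leave the receiving actor unspecified, yet the stream only makes progress while that actor replies with acknowledgements. The following is a minimal sketch of such an actor, not part of the original examples. The class name AckingActor is hypothetical, and the protocol classes are redeclared with package-private fields (an assumption made only so the handlers can read the reply-to references). The actor acknowledges the init message and every element, and stops on completion or failure.

```java
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.Behaviors;

// Hypothetical receiving actor for the sink; names and field visibility are illustrative.
public class AckingActor {

  enum Ack { INSTANCE }

  interface Protocol {}

  static final class Init implements Protocol {
    final ActorRef<Ack> ackTo;
    Init(ActorRef<Ack> ackTo) { this.ackTo = ackTo; }
  }

  static final class Message implements Protocol {
    final ActorRef<Ack> ackTo;
    final String msg;
    Message(ActorRef<Ack> ackTo, String msg) {
      this.ackTo = ackTo;
      this.msg = msg;
    }
  }

  static final class Complete implements Protocol {}

  static final class Fail implements Protocol {
    final Throwable ex;
    Fail(Throwable ex) { this.ex = ex; }
  }

  static Behavior<Protocol> create() {
    return Behaviors.setup(context ->
        Behaviors.receive(Protocol.class)
            // First message from the sink: reply so that the first element is sent.
            .onMessage(Init.class, init -> {
              init.ackTo.tell(Ack.INSTANCE);
              return Behaviors.same();
            })
            // One stream element: process it, then acknowledge to signal demand for the next.
            .onMessage(Message.class, message -> {
              context.getLog().info("Received element: {}", message.msg);
              message.ackTo.tell(Ack.INSTANCE);
              return Behaviors.same();
            })
            // Upstream completed: nothing more will arrive, so stop.
            .onMessage(Complete.class, complete -> Behaviors.stopped())
            // Upstream failed: log the cause and stop.
            .onMessage(Fail.class, fail -> {
              context.getLog().error("Stream failed", fail.ex);
              return Behaviors.stopped();
            })
            .build());
  }
}
```

If the actor delays or omits the acknowledgement in the Message handler, the sink sends no further element, which is how the actor controls the rate of the stream.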
Reactive Streams semantics
cancels when the actor terminates
backpressures when the actor acknowledgement has not arrived