ActorSink.actorRefWithBackpressure
Sends the elements of the stream to the given ActorRef<T>ActorRef[T] of the new actors API with backpressure, to be able to signal demand when the actor is ready to receive more elements.
Dependency
This operator is included in:
- sbt
- val PekkoVersion = "1.1.5" libraryDependencies += "org.apache.pekko" %% "pekko-stream-typed" % PekkoVersion
- Maven
- <properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-bom_${scala.binary.version}</artifactId> <version>1.1.5</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-stream-typed_${scala.binary.version}</artifactId> </dependency> </dependencies>
- Gradle
- def versions = [ ScalaBinary: "2.13" ] dependencies { implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.5") implementation "org.apache.pekko:pekko-stream-typed_${versions.ScalaBinary}" }
Signature
ActorSink.actorRefWithBackpressureActorSink.actorRefWithBackpressure
Description
Sends the elements of the stream to the given ActorRef<T>ActorRef[T] with backpressure, to be able to signal demand when the actor is ready to receive more elements. There is also a variant without a concrete acknowledge message accepting any message as such.
See also:
- ActorSink.actorRefSend elements to an actor of the new actors API, without considering backpressure
- Sink.actorRefSend elements to an actor of the classic actors API, without considering backpressure
- Sink.actorRefWithBackpressueThe corresponding operator for the classic actors API
Examples
- Scala
- 
  source import org.apache.pekko import pekko.actor.typed.ActorRef import pekko.stream.scaladsl.{ Sink, Source } import pekko.stream.typed.scaladsl.ActorSink trait Ack object Ack extends Ack trait Protocol case class Init(ackTo: ActorRef[Ack]) extends Protocol case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol case object Complete extends Protocol case class Fail(ex: Throwable) extends Protocol val actor: ActorRef[Protocol] = targetActor() val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure( ref = actor, messageAdapter = (responseActorRef: ActorRef[Ack], element) => Message(responseActorRef, element), onInitMessage = (responseActorRef: ActorRef[Ack]) => Init(responseActorRef), ackMessage = Ack, onCompleteMessage = Complete, onFailureMessage = exception => Fail(exception)) Source.single("msg1").runWith(sink)
- Java
- 
  source import org.apache.pekko.NotUsed; import org.apache.pekko.actor.typed.ActorRef; import org.apache.pekko.actor.typed.ActorSystem; import org.apache.pekko.stream.javadsl.Sink; import org.apache.pekko.stream.javadsl.Source; import org.apache.pekko.stream.typed.javadsl.ActorSink; enum Ack { INSTANCE; } interface Protocol {} class Init implements Protocol { private final ActorRef<Ack> ack; public Init(ActorRef<Ack> ack) { this.ack = ack; } } class Message implements Protocol { private final ActorRef<Ack> ackTo; private final String msg; public Message(ActorRef<Ack> ackTo, String msg) { this.ackTo = ackTo; this.msg = msg; } } class Complete implements Protocol {} class Fail implements Protocol { private final Throwable ex; public Fail(Throwable ex) { this.ex = ex; } } final ActorRef<Protocol> actorRef = // spawned actor final Complete completeMessage = new Complete(); final Sink<String, NotUsed> sink = ActorSink.actorRefWithBackpressure( actorRef, (responseActorRef, element) -> new Message(responseActorRef, element), (responseActorRef) -> new Init(responseActorRef), Ack.INSTANCE, completeMessage, (exception) -> new Fail(exception)); Source.single("msg1").runWith(sink, system);
Reactive Streams semantics
cancels when the actor terminates
backpressures when the actor acknowledgement has not arrived
1.1.5