ActorSink.actorRefWithBackpressure

Sends the elements of the stream to the given ActorRef<T> (Java) / ActorRef[T] (Scala) of the new actors API with backpressure, so that the actor can signal demand when it is ready to receive more elements.

Actor interop operators

Dependency

This operator is included in:

sbt
val PekkoVersion = "1.1.2+24-bcd44ee3-SNAPSHOT"
libraryDependencies += "org.apache.pekko" %% "pekko-stream-typed" % PekkoVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.1.2+24-bcd44ee3-SNAPSHOT</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream-typed_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2+24-bcd44ee3-SNAPSHOT")

  implementation "org.apache.pekko:pekko-stream-typed_${versions.ScalaBinary}"
}

Signature

ActorSink.actorRefWithBackpressure

Description

Sends the elements of the stream to the given ActorRef<T> (Java) / ActorRef[T] (Scala) with backpressure, so that the actor can signal demand when it is ready to receive more elements. There is also a variant without a concrete acknowledge message, which accepts any message as the acknowledgement.

See also:

Examples

Scala
import org.apache.pekko
import pekko.actor.typed.ActorRef
import pekko.stream.scaladsl.{ Sink, Source }
import pekko.stream.typed.scaladsl.ActorSink

/** Acknowledgement type handed to the sink as `ackMessage`.
  *
  * Sealed because the singleton below is its only instance.
  */
sealed trait Ack
object Ack extends Ack

/** Messages the sink sends to the target actor.
  *
  * Sealed so that a `match` on `Protocol` in the receiving actor is checked for exhaustiveness.
  */
sealed trait Protocol

/** Built by the sink's `onInitMessage`; `ackTo` is the reference to acknowledge to. */
final case class Init(ackTo: ActorRef[Ack]) extends Protocol

/** Built by the sink's `messageAdapter` for each stream element `msg`; `ackTo` is the reference to acknowledge to. */
final case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol

/** Passed to the sink as `onCompleteMessage`. */
case object Complete extends Protocol

/** Built by the sink's `onFailureMessage` from the exception `ex`. */
final case class Fail(ex: Throwable) extends Protocol

// `NotUsed` appears in the sink's type below but is not among the snippet's imports;
// bring it into scope here so the example compiles on its own.
import org.apache.pekko.NotUsed

// The actor that receives the stream elements (created outside this snippet).
val actor: ActorRef[Protocol] = targetActor()

// Each function below wraps what the sink hands it into one of the `Protocol` messages.
// The sink backpressures while the actor's acknowledgement has not arrived.
val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure(
  ref = actor,
  messageAdapter = (responseActorRef: ActorRef[Ack], element) => Message(responseActorRef, element),
  onInitMessage = (responseActorRef: ActorRef[Ack]) => Init(responseActorRef),
  ackMessage = Ack,
  onCompleteMessage = Complete,
  onFailureMessage = exception => Fail(exception))

// Run a one-element stream into the actor-backed sink.
Source.single("msg1").runWith(sink)
Java
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.typed.javadsl.ActorSink;

/** Acknowledgement type; its single constant is the ack message handed to the sink. */
enum Ack {
  INSTANCE;
}

/** Marker interface for the messages the sink sends to the target actor. */
interface Protocol {}

/**
 * Message created by the sink's init function.
 *
 * <p>The field is named {@code ackTo} to match {@link Message}, which holds the same kind of
 * reference.
 */
class Init implements Protocol {
  // Reference to acknowledge to, as handed over by the sink.
  private final ActorRef<Ack> ackTo;

  /**
   * @param ackTo reference to acknowledge to
   */
  public Init(ActorRef<Ack> ackTo) {
    this.ackTo = ackTo;
  }
}

/** Message created by the sink for each stream element. */
class Message implements Protocol {
  // Reference to acknowledge to, as handed over by the sink.
  private final ActorRef<Ack> ackTo;
  // The stream element being delivered.
  private final String msg;

  /**
   * @param ackTo reference to acknowledge to
   * @param msg the stream element
   */
  public Message(ActorRef<Ack> ackTo, String msg) {
    this.ackTo = ackTo;
    this.msg = msg;
  }
}

/** Message handed to the sink as its completion message. */
class Complete implements Protocol {}

/** Message created by the sink's failure function; wraps the exception it was given. */
class Fail implements Protocol {
  private final Throwable ex;

  /**
   * @param ex the exception passed to the failure function
   */
  public Fail(Throwable ex) {
    this.ex = ex;
  }
}

  // Reference to the actor that receives the stream elements. The initializer is elided
  // in this snippet and must be supplied by the surrounding code.
  final ActorRef<Protocol> actorRef = // spawned actor

  // Instance passed to the sink as the completion message.
  final Complete completeMessage = new Complete();

  // The arguments mirror the Protocol messages: a per-element adapter, an init-message
  // factory, the ack value, the completion message and a failure-message factory.
  final Sink<String, NotUsed> sink =
      ActorSink.actorRefWithBackpressure(
          actorRef,
          (responseActorRef, element) -> new Message(responseActorRef, element),
          (responseActorRef) -> new Init(responseActorRef),
          Ack.INSTANCE,
          completeMessage,
          (exception) -> new Fail(exception));

  // Run a one-element stream into the sink. `system` is defined outside this snippet,
  // presumably the ActorSystem imported above (TODO confirm).
  Source.single("msg1").runWith(sink, system);

Reactive Streams semantics

cancels when the actor terminates

backpressures when the actor acknowledgement has not arrived