ActorSource.actorRefWithBackpressure

Materialize an ActorRef[T] (Scala) / ActorRef<T> (Java) of the new actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.

Actor interop operators

Dependency

This operator is included in:

sbt
val PekkoVersion = "1.1.2+29-e21fa9eb-SNAPSHOT"
libraryDependencies += "org.apache.pekko" %% "pekko-stream-typed" % PekkoVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.1.2+29-e21fa9eb-SNAPSHOT</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream-typed_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2+29-e21fa9eb-SNAPSHOT")

  implementation "org.apache.pekko:pekko-stream-typed_${versions.ScalaBinary}"
}

Signature

ActorSource.actorRefWithBackpressure (Scala API) / ActorSource.actorRefWithBackpressure (Java API)

Description

Materialize an ActorRef[T] (Scala) / ActorRef<T> (Java); sending messages to it will emit them on the stream. The actor responds with the provided ack message once the element could be emitted, allowing for backpressure from the source. Sending another message before the previous one has been acknowledged will fail the stream.

See also:

Example

With actorRefWithBackpressure two actors come into play:

  1. An actor that is materialized when the stream runs. It feeds the stream.
  2. An actor provided by the user. It gets the ack signal when an element is emitted into the stream.

For the ack signal we create an Emitted object (Scala) or an Emitted enum with a single instance (Java).

For “feeding” the stream we use the Event trait (Scala) or the Event interface (Java).

In this example we create the stream in an actor which itself reacts to the demand of the stream and sends more messages.

Scala
import org.apache.pekko
import pekko.actor.typed.ActorRef
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.Behaviors
import pekko.stream.CompletionStrategy
import pekko.stream.scaladsl.Sink
import pekko.stream.typed.scaladsl.ActorSource

/**
 * Example actor that feeds a stream created with `ActorSource.actorRefWithBackpressure`.
 *
 * The actor starts the stream, pushes a first element into it and afterwards sends
 * exactly one further element per [[StreamFeeder.Emitted]] acknowledgement it receives.
 */
object StreamFeeder {

  /** Acknowledgement sent back to this actor once the latest element has been emitted into the stream. */
  case object Emitted

  /** Messages accepted by the actor that is materialized by the stream source. */
  sealed trait Event

  /** A payload that should be emitted into the stream. */
  case class Element(content: String) extends Event

  /** Asks the source to complete the stream. */
  case object ReachedEnd extends Event

  /** Asks the source to fail the stream with `ex`. */
  case class FailureOccured(ex: Exception) extends Event

  /**
   * Initial behavior: runs the stream with this actor as the ack receiver, sends the
   * first element and then waits for acknowledgements.
   */
  def apply(): Behavior[Emitted.type] =
    Behaviors.setup { ctx =>
      val feed = runStream(ctx.self)(ctx.system)
      feed ! Element("first")
      sender(feed, 0)
    }

  /**
   * Builds and runs the stream, printing the content of every [[Element]].
   *
   * @param ackReceiver actor that receives [[Emitted]] for each element emitted into the stream
   * @return the materialized actor; messages sent to it are fed into the stream
   */
  private def runStream(ackReceiver: ActorRef[Emitted.type])(implicit system: ActorSystem[_]): ActorRef[Event] = {
    // complete (draining) when we send ReachedEnd
    val completeOnEnd: PartialFunction[Event, CompletionStrategy] = {
      case ReachedEnd => CompletionStrategy.draining
    }
    // fail the stream with the exception carried by FailureOccured
    val failOnError: PartialFunction[Event, Throwable] = {
      case FailureOccured(cause) => cause
    }

    ActorSource
      .actorRefWithBackpressure[Event, Emitted.type](
        // demand is signalled to this actor by sending it the ack message
        ackTo = ackReceiver,
        ackMessage = Emitted,
        completionMatcher = completeOnEnd,
        failureMatcher = failOnError)
      .collect { case Element(text) => text }
      .to(Sink.foreach(println))
      .run()
  }

  /**
   * Behavior that answers each acknowledgement with the next element.
   *
   * @param streamSource the materialized stream actor to feed
   * @param counter      the value to send next; once it reaches 5 the stream is completed
   */
  private def sender(streamSource: ActorRef[Event], counter: Int): Behavior[Emitted.type] =
    // Emitted is the only message this behavior can receive, so only the counter decides.
    Behaviors.receiveMessage { _ =>
      if (counter >= 5) {
        streamSource ! ReachedEnd
        Behaviors.stopped
      } else {
        streamSource ! Element(counter.toString)
        sender(streamSource, counter + 1)
      }
    }
}

// Start an actor system named "stream-feeder" with StreamFeeder() as its top-level behavior.
// StreamFeeder sends "first" on setup and then one counter value (0 to 4) per acknowledgement.
ActorSystem(StreamFeeder(), "stream-feeder")

// Will print:
// first
// 0
// 1
// 2
// 3
// 4
Java
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.Receive;
import org.apache.pekko.stream.CompletionStrategy;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.typed.javadsl.ActorSource;

import java.util.Optional;

/**
 * Example actor that feeds a stream created with {@code ActorSource.actorRefWithBackpressure}.
 *
 * <p>On construction it runs the stream and sends a first element. After that it sends one
 * further element for every {@link Emitted} acknowledgement it receives.
 */
class StreamFeeder extends AbstractBehavior<StreamFeeder.Emitted> {
  /** Acknowledgement sent to this actor once the latest element has been emitted into the stream. */
  public enum Emitted {
    INSTANCE;
  }

  /** Messages accepted by the actor that is materialized by the stream source. */
  public interface Event {}

  /** A payload that should be emitted into the stream. */
  public static class Element implements Event {
    public final String content;

    public Element(String content) {
      this.content = content;
    }

    @Override
    public String toString() {
      return "Element(" + content + ")";
    }
  }

  /** Asks the source to complete the stream. */
  public enum ReachedEnd implements Event {
    INSTANCE;
  }

  /** Asks the source to fail the stream with the wrapped exception. */
  public static class FailureOccured implements Event {
    public final Exception ex;

    public FailureOccured(Exception ex) {
      this.ex = ex;
    }
  }

  /** Creates the behavior; the stream is started when the actor is set up. */
  public static Behavior<Emitted> create() {
    return Behaviors.setup(context -> new StreamFeeder(context));
  }

  /** Materialized stream actor; everything told to it is fed into the stream. */
  private final ActorRef<Event> streamSource;

  /** Value carried by the next element sent in response to an acknowledgement. */
  private int nextValue = 0;

  private StreamFeeder(ActorContext<Emitted> context) {
    super(context);
    streamSource = runStream(context.getSelf(), context.getSystem());
    streamSource.tell(new Element("first"));
  }

  @Override
  public Receive<Emitted> createReceive() {
    return newReceiveBuilder().onMessage(Emitted.class, ack -> onEmitted(ack)).build();
  }

  /**
   * Builds and runs the stream, printing every element that passes through it.
   *
   * @param ackReceiver actor that receives {@link Emitted} for each element emitted
   * @param system actor system used to run the stream
   * @return the materialized actor that feeds the stream
   */
  private static ActorRef<Event> runStream(ActorRef<Emitted> ackReceiver, ActorSystem<?> system) {
    Source<Event, ActorRef<Event>> source =
        ActorSource.actorRefWithBackpressure(
            ackReceiver,
            Emitted.INSTANCE,
            StreamFeeder::completionFor,
            StreamFeeder::failureFor);

    return source.to(Sink.foreach(System.out::println)).run(system);
  }

  /** Completion matcher: complete (draining) when we send ReachedEnd. */
  private static Optional<CompletionStrategy> completionFor(Event msg) {
    if (msg == ReachedEnd.INSTANCE) {
      return Optional.of(CompletionStrategy.draining());
    }
    return Optional.empty();
  }

  /** Failure matcher: fail the stream with the exception carried by FailureOccured. */
  private static Optional<Throwable> failureFor(Event msg) {
    if (msg instanceof FailureOccured) {
      return Optional.of(((FailureOccured) msg).ex);
    }
    return Optional.empty();
  }

  /** Answers an acknowledgement with the next element, or completes the stream after five. */
  private Behavior<Emitted> onEmitted(Emitted message) {
    if (nextValue >= 5) {
      streamSource.tell(ReachedEnd.INSTANCE);
      return Behaviors.stopped();
    }
    streamSource.tell(new Element(String.valueOf(nextValue)));
    nextValue++;
    return this;
  }
}
    // Start an actor system named "stream-feeder" with StreamFeeder as its top-level behavior.
    // The sink prints the Event objects themselves, so the output uses Element.toString().
    ActorSystem<StreamFeeder.Emitted> system =
        ActorSystem.create(StreamFeeder.create(), "stream-feeder");

    // will print:
    // Element(first)
    // Element(0)
    // Element(1)
    // Element(2)
    // Element(3)
    // Element(4)

Reactive Streams semantics

emits when a message has been sent to the materialized ActorRef[T] (Scala) / ActorRef<T> (Java) and there is demand from downstream

completes when the passed completion matcher returns a CompletionStrategy