Actors interop

Dependency

To use Pekko Streams, add the module to your project:

sbt
val PekkoVersion = "1.1.2"
libraryDependencies += "org.apache.pekko" %% "pekko-stream" % PekkoVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.1.2</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2")

  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}"
}

Overview

There are various use cases where it might be reasonable to use actors and streams together:

  • when integrating existing API’s that might be streams- or actors-based.
  • when there is any mutable state that should be shared across multiple streams.
  • when there is any mutable state or logic that can be influenced ‘from outside’ while the stream is running.

For piping the elements of a stream as messages to an ordinary actor you can use ask in a mapAsync or use Sink.actorRefWithBackpressure.

Messages can be sent to a stream with Source.queue or via the ActorRef that is materialized by Source.actorRef.

Additionally you can use ActorSource.actorRef, ActorSource.actorRefWithBackpressure, ActorSink.actorRef and ActorSink.actorRefWithBackpressure shown below.

ask

A nice way to delegate some processing of elements in a stream to an actor is to use ask. The back-pressure of the stream is maintained by the FutureCompletionStage of the ask and the mailbox of the actor will not be filled with more messages than the given parallelism of the ask operator (similarly to how the mapAsync operator works).

Scala
sourceimplicit val askTimeout: Timeout = 5.seconds
val words: Source[String, NotUsed] =
  Source(List("hello", "hi"))

words
  .ask[String](parallelism = 5)(ref)
  // continue processing of the replies from the actor
  .map(_.toLowerCase)
  .runWith(Sink.ignore)
Java
sourceSource<String, NotUsed> words = Source.from(Arrays.asList("hello", "hi"));
Timeout askTimeout = Timeout.apply(5, TimeUnit.SECONDS);

words
    .ask(5, ref, String.class, askTimeout)
    // continue processing of the replies from the actor
    .map(elem -> elem.toLowerCase())
    .runWith(Sink.ignore(), system);

Note that the messages received in the actor will be in the same order as the stream elements, i.e. the parallelism does not change the ordering of the messages. There is a performance advantage of using parallelism > 1 even though the actor will only process one message at a time because then there is already a message in the mailbox when the actor has completed previous message.

The actor must reply to the sender()getSender() for each message from the stream. That reply will complete the FutureCompletionStage of the ask and it will be the element that is emitted downstream.

In case the target actor is stopped, the operator will fail with an AskStageTargetActorTerminatedException

Scala
sourceclass Translator extends Actor {
  def receive = {
    case word: String =>
      // ... process message
      val reply = word.toUpperCase
      sender() ! reply // reply to the ask
  }
}
Java
sourcestatic class Translator extends AbstractActor {
  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            String.class,
            word -> {
              // ... process message
              String reply = word.toUpperCase();
              // reply to the ask
              getSender().tell(reply, getSelf());
            })
        .build();
  }
}

The stream can be completed with failure by sending org.apache.pekko.actor.Status.Failure as reply from the actor.

If the ask fails due to timeout the stream will be completed with TimeoutException failure. If that is not desired outcome you can use recover on the ask FutureCompletionStage, or use the other “restart” operators to restart it.

If you don’t care about the reply values and only use them as back-pressure signals you can use Sink.ignore after the ask operator and then actor is effectively a sink of the stream.

Note that while you may implement the same concept using mapAsync, that style would not be aware of the actor terminating.

If you are intending to ask multiple actors by using Actor routers, then you should use mapAsyncUnordered and perform the ask manually in there, as the ordering of the replies is not important, since multiple actors are being asked concurrently to begin with, and no single actor is the one to be watched by the operator.

Sink.actorRefWithBackpressure

The sink sends the elements of the stream to the given ActorRef that sends back back-pressure signal. First element is always onInitMessage, then stream is waiting for the given acknowledgement message from the given actor which means that it is ready to process elements. It also requires the given acknowledgement message after each stream element to make back-pressure work.

If the target actor terminates the stream will be cancelled. When the stream is completed successfully the given onCompleteMessage will be sent to the destination actor. When the stream is completed with failure a org.apache.pekko.actor.Status.Failure message will be sent to the destination actor.

Scala
sourceval words: Source[String, NotUsed] =
  Source(List("hello", "hi"))

// sent from actor to stream to "ack" processing of given element
val AckMessage = AckingReceiver.Ack

// sent from stream to actor to indicate start, end or failure of stream:
val InitMessage = AckingReceiver.StreamInitialized
val OnCompleteMessage = AckingReceiver.StreamCompleted
val onErrorMessage = (ex: Throwable) => AckingReceiver.StreamFailure(ex)

val probe = TestProbe()
val receiver = system.actorOf(Props(new AckingReceiver(probe.ref)))
val sink = Sink.actorRefWithBackpressure(
  receiver,
  onInitMessage = InitMessage,
  ackMessage = AckMessage,
  onCompleteMessage = OnCompleteMessage,
  onFailureMessage = onErrorMessage)

words.map(_.toLowerCase).runWith(sink)

probe.expectMsg("Stream initialized!")
probe.expectMsg("hello")
probe.expectMsg("hi")
probe.expectMsg("Stream completed!")
Java
sourceSource<String, NotUsed> words = Source.from(Arrays.asList("hello", "hi"));

final TestKit probe = new TestKit(system);

ActorRef receiver = system.actorOf(Props.create(AckingReceiver.class, probe.getRef()));

Sink<String, NotUsed> sink =
    Sink.<String>actorRefWithBackpressure(
        receiver,
        new StreamInitialized(),
        Ack.INSTANCE,
        new StreamCompleted(),
        ex -> new StreamFailure(ex));

words.map(el -> el.toLowerCase()).runWith(sink, system);

probe.expectMsg("Stream initialized");
probe.expectMsg("hello");
probe.expectMsg("hi");
probe.expectMsg("Stream completed");

The receiving actor would then need to be implemented similar to the following:

Scala
sourceobject AckingReceiver {
  case object Ack

  case object StreamInitialized
  case object StreamCompleted
  final case class StreamFailure(ex: Throwable)
}

class AckingReceiver(probe: ActorRef) extends Actor with ActorLogging {
  import AckingReceiver._

  def receive: Receive = {
    case StreamInitialized =>
      log.info("Stream initialized!")
      probe ! "Stream initialized!"
      sender() ! Ack // ack to allow the stream to proceed sending more elements

    case el: String =>
      log.info("Received element: {}", el)
      probe ! el
      sender() ! Ack // ack to allow the stream to proceed sending more elements

    case StreamCompleted =>
      log.info("Stream completed!")
      probe ! "Stream completed!"
    case StreamFailure(ex) =>
      log.error(ex, "Stream failed!")
  }
}
Java
sourceenum Ack {
  INSTANCE;
}

static class StreamInitialized {}

static class StreamCompleted {}

static class StreamFailure {
  private final Throwable cause;

  public StreamFailure(Throwable cause) {
    this.cause = cause;
  }

  public Throwable getCause() {
    return cause;
  }
}

static class AckingReceiver extends AbstractLoggingActor {

  private final ActorRef probe;

  public AckingReceiver(ActorRef probe) {
    this.probe = probe;
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            StreamInitialized.class,
            init -> {
              log().info("Stream initialized");
              probe.tell("Stream initialized", getSelf());
              sender().tell(Ack.INSTANCE, self());
            })
        .match(
            String.class,
            element -> {
              log().info("Received element: {}", element);
              probe.tell(element, getSelf());
              sender().tell(Ack.INSTANCE, self());
            })
        .match(
            StreamCompleted.class,
            completed -> {
              log().info("Stream completed");
              probe.tell("Stream completed", getSelf());
            })
        .match(
            StreamFailure.class,
            failed -> {
              log().error(failed.getCause(), "Stream failed!");
              probe.tell("Stream failed!", getSelf());
            })
        .build();
  }
}

Note that replying to the sender of the elements (the “stream”) is required as lack of those ack signals would be interpreted as back-pressure (as intended), and no new elements will be sent into the actor until it acknowledges some elements. Handling the other signals while is not required, however is a good practice, to see the state of the stream’s lifecycle in the connected actor as well. Technically it is also possible to use multiple sinks targeting the same actor, however it is not common practice to do so, and one should rather investigate using a Merge operator for this purpose.

Note

Using Sink.actorRef or ordinary tell from a map or foreach operator means that there is no back-pressure signal from the destination actor, i.e. if the actor is not consuming the messages fast enough the mailbox of the actor will grow, unless you use a bounded mailbox with zero mailbox-push-timeout-time or use a rate limiting operator in front. It’s often better to use Sink.actorRefWithBackpressure or ask in mapAsync, though.

Source.queue

Source.queue is an improvement over Sink.actorRef, since it can provide backpressure. The offer method returns a FutureCompletionStage, which completes with the result of the enqueue operation.

Source.queue can be used for emitting elements to a stream from an actor (or from anything running outside the stream). The elements will be buffered until the stream can process them. You can offer elements to the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received.

Use overflow strategy org.apache.pekko.stream.OverflowStrategy.backpressure to avoid dropping of elements if the buffer is full, instead the returned FutureCompletionStage does not complete until there is space in the buffer and offer should not be called again until it completes.

Using Source.queue you can push elements to the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded if downstream is terminated.

You could combine it with the throttle operator is used to slow down the stream to 5 element per 3 seconds and other patterns.

SourceQueue.offer returns Future[QueueOfferResult]CompletionStage<QueueOfferResult> which completes with QueueOfferResult.Enqueued if element was added to buffer or sent downstream. It completes with QueueOfferResult.Dropped if element was dropped. Can also complete with QueueOfferResult.Failure - when stream failed or QueueOfferResult.QueueClosed when downstream is completed.

Scala
sourceval bufferSize = 10
val elementsToProcess = 5

val queue = Source
  .queue[Int](bufferSize)
  .throttle(elementsToProcess, 3.second)
  .map(x => x * x)
  .toMat(Sink.foreach(x => println(s"completed $x")))(Keep.left)
  .run()

val source = Source(1 to 10)

implicit val ec = system.dispatcher
source
  .map(x => {
    queue.offer(x).map {
      case QueueOfferResult.Enqueued    => println(s"enqueued $x")
      case QueueOfferResult.Dropped     => println(s"dropped $x")
      case QueueOfferResult.Failure(ex) => println(s"Offer failed ${ex.getMessage}")
      case QueueOfferResult.QueueClosed => println("Source Queue closed")
    }
  })
  .runWith(Sink.ignore)
Java
sourceint bufferSize = 10;
int elementsToProcess = 5;

BoundedSourceQueue<Integer> sourceQueue =
    Source.<Integer>queue(bufferSize)
        .throttle(elementsToProcess, Duration.ofSeconds(3))
        .map(x -> x * x)
        .to(Sink.foreach(x -> System.out.println("got: " + x)))
        .run(system);

Source<Integer, NotUsed> source = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));

source.map(x -> sourceQueue.offer(x)).runWith(Sink.ignore(), system);

When used from an actor you typically pipe the result of the FutureCompletionStage back to the actor to continue processing.

Source.actorRef

Messages sent to the actor that is materialized by Source.actorRef will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received.

Depending on the defined OverflowStrategy it might drop elements if there is no space available in the buffer. The strategy OverflowStrategy.backpressure is not supported for this Source type, i.e. elements will be dropped if the buffer is filled by sending at a rate that is faster than the stream can consume. You should consider using Source.queue if you want a backpressured actor interface.

The stream can be completed successfully by sending any message to the actor that is handled by the completion matching function that was provided when the actor reference was created. If the returned completion strategy is org.apache.pekko.stream.CompletionStrategy.immediately the completion will be signaled immediately. If the completion strategy is org.apache.pekko.stream.CompletionStrategy.draining, already buffered elements will be processed before signaling completion. Any elements that are in the actor’s mailbox and subsequent elements sent to the actor will not be processed.

The stream can be completed with failure by sending any message to the actor that is handled by the failure matching function that was specified when the actor reference was created.

The actor will be stopped when the stream is completed, failed or cancelled from downstream. You can watch it to get notified when that happens.

Scala
sourceval bufferSize = 10

val cm: PartialFunction[Any, CompletionStrategy] = {
  case Done =>
    CompletionStrategy.immediately
}

val ref = Source
  .actorRef[Int](
    completionMatcher = cm,
    failureMatcher = PartialFunction.empty[Any, Throwable],
    bufferSize = bufferSize,
    overflowStrategy = OverflowStrategy.fail) // note: backpressure is not supported
  .map(x => x * x)
  .toMat(Sink.foreach((x: Int) => println(s"completed $x")))(Keep.left)
  .run()

ref ! 1
ref ! 2
ref ! 3
ref ! Done
Java
sourceint bufferSize = 10;

Source<Integer, ActorRef> source =
    Source.actorRef(
        elem -> {
          // complete stream immediately if we send it Done
          if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
          else return Optional.empty();
        },
        // never fail the stream because of a message
        elem -> Optional.empty(),
        bufferSize,
        OverflowStrategy.dropHead()); // note: backpressure is not supported
ActorRef actorRef =
    source
        .map(x -> x * x)
        .to(Sink.foreach(x -> System.out.println("got: " + x)))
        .run(system);

actorRef.tell(1, ActorRef.noSender());
actorRef.tell(2, ActorRef.noSender());
actorRef.tell(3, ActorRef.noSender());
actorRef.tell(
    new org.apache.pekko.actor.Status.Success(CompletionStrategy.draining()),
    ActorRef.noSender());

ActorSource.actorRef

Materialize an ActorRef<T>ActorRef[T]; sending messages to it will emit them on the stream only if they are of the same type as the stream.

ActorSource.actorRefWithBackpressure

Materialize an ActorRef<T>ActorRef[T]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.

ActorSink.actorRef

Sends the elements of the stream to the given ActorRef<T>ActorRef[T], without considering backpressure.

ActorSink.actorRefWithBackpressure

Sends the elements of the stream to the given ActorRef<T>ActorRef[T] with backpressure, to be able to signal demand when the actor is ready to receive more elements.

Topic.source

A source that will subscribe to a TopicTopic and stream messages published to the topic.

Topic.sink

A sink that will publish emitted messages to a TopicTopic.