Source.actorRef

Materialize an ActorRef of the classic actors API; sending messages to it will emit them on the stream.

Actor interop operators

Signature

Source.actorRef

Description

Materializes an ActorRef; messages sent to it are emitted on the stream. The actor contains a buffer, but because communication is one-way there is no backpressure. Overflow is handled either by dropping elements or by failing the stream; the strategy is chosen by the user.

The stream can be completed successfully by sending the actor reference an org.apache.pekko.actor.Status.Success. If the content of that message is org.apache.pekko.stream.CompletionStrategy.immediately, completion is signaled immediately. If the content is org.apache.pekko.stream.CompletionStrategy.draining (or anything else), already buffered elements are emitted before completion is signaled. Sending org.apache.pekko.actor.PoisonPill also signals completion immediately, but this behavior is deprecated and scheduled for removal. Stopping the actor with org.apache.pekko.actor.ActorSystem.stop in order to complete the stream is not supported.
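The buffering and completion rules in this description can be made concrete with a small plain-Java model. This is only a sketch under stated assumptions: it is not Pekko's implementation, every name in it (ActorRefSourceModel, Overflow, Completion, send, complete) is hypothetical, and it assumes the downstream signals no demand until completion, so every message has to be buffered.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of the rules described above. It is NOT Pekko's implementation,
// and every name in it is hypothetical.
final class ActorRefSourceModel {
  enum Overflow { DROP_HEAD, DROP_TAIL, FAIL }
  enum Completion { IMMEDIATELY, DRAINING }

  private final Deque<String> buffer = new ArrayDeque<>();
  private final List<String> emitted = new ArrayList<>();
  private final int capacity;
  private final Overflow overflow;
  private boolean completed = false;

  ActorRefSourceModel(int capacity, Overflow overflow) {
    if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
    this.capacity = capacity;
    this.overflow = overflow;
  }

  // A message arrives while downstream has signalled no demand, so it is buffered.
  void send(String message) {
    if (completed) return; // simplification: messages after completion are ignored
    if (buffer.size() == capacity) {
      switch (overflow) {
        case DROP_HEAD: buffer.removeFirst(); break; // discard the oldest element
        case DROP_TAIL: buffer.removeLast(); break;  // discard the youngest element
        // Simplification: a real stream would fail rather than throw at the sender.
        case FAIL: throw new IllegalStateException("buffer overflow");
      }
    }
    buffer.addLast(message);
  }

  // The completion message arrives.
  void complete(Completion strategy) {
    if (strategy == Completion.DRAINING) {
      emitted.addAll(buffer); // buffered elements are still delivered
    }
    buffer.clear();           // with IMMEDIATELY they are discarded
    completed = true;
  }

  // Returns a copy of everything delivered downstream so far.
  List<String> emitted() {
    return new ArrayList<>(emitted);
  }

  public static void main(String[] args) {
    ActorRefSourceModel draining = new ActorRefSourceModel(2, Overflow.DROP_HEAD);
    for (String m : List.of("a", "b", "c")) draining.send(m);
    draining.complete(Completion.DRAINING);
    System.out.println(draining.emitted()); // prints [b, c]

    ActorRefSourceModel immediate = new ActorRefSourceModel(2, Overflow.DROP_HEAD);
    for (String m : List.of("a", "b", "c")) immediate.send(m);
    immediate.complete(Completion.IMMEDIATELY);
    System.out.println(immediate.emitted()); // prints []
  }
}
```

In the model, the third message overflows a buffer of two, so the drop-head strategy discards "a". Draining then delivers the remaining "b" and "c", while immediate completion delivers nothing that was still buffered.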

Examples

Scala
import org.apache.pekko
import pekko.Done
import pekko.actor.ActorRef
import pekko.stream.OverflowStrategy
import pekko.stream.CompletionStrategy
import pekko.stream.scaladsl._

val source: Source[Any, ActorRef] = Source.actorRef(
  completionMatcher = {
    case Done =>
      // complete stream immediately if we send it Done
      CompletionStrategy.immediately
  },
  // never fail the stream because of a message
  failureMatcher = PartialFunction.empty,
  bufferSize = 100,
  overflowStrategy = OverflowStrategy.dropHead)
// running the stream requires an implicit ActorSystem in scope
val actorRef: ActorRef = source.to(Sink.foreach(println)).run()

actorRef ! "hello"
actorRef ! "hello"

// The stream completes successfully with the following message
actorRef ! Done
Java
import java.util.Optional;
import org.apache.pekko.Done;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.stream.OverflowStrategy;
import org.apache.pekko.stream.CompletionStrategy;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

int bufferSize = 100;
Source<Object, ActorRef> source =
    Source.actorRef(
        elem -> {
          // complete stream immediately if we send it Done
          if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
          else return Optional.empty();
        },
        // never fail the stream because of a message
        elem -> Optional.empty(),
        bufferSize,
        OverflowStrategy.dropHead());

// `system` is assumed to be an ActorSystem that is already in scope
ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(system);
actorRef.tell("hello", ActorRef.noSender());
actorRef.tell("hello", ActorRef.noSender());

// The stream completes successfully with the following message
actorRef.tell(Done.done(), ActorRef.noSender());

Reactive Streams semantics

emits when there is demand and there are messages in the buffer or a message is sent to the ActorRef

completes when the actor is stopped by sending it a particular message as described above