ActorSource.actorRef

Materialize an ActorRef<T>ActorRef[T] of the new actors API; sending messages to it will emit them on the stream only if they are of the same type as the stream.

Actor interop operators

Dependency

This operator is included in:

sbt
val PekkoVersion = "1.1.2+29-e21fa9eb-SNAPSHOT"
libraryDependencies += "org.apache.pekko" %% "pekko-stream-typed" % PekkoVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.1.2+29-e21fa9eb-SNAPSHOT</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream-typed_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2+29-e21fa9eb-SNAPSHOT")

  implementation "org.apache.pekko:pekko-stream-typed_${versions.ScalaBinary}"
}

Signature

ActorSource.actorRefActorSource.actorRef

Description

Materialize an ActorRef<T>ActorRef[T] which only accepts messages that are of the same type as the stream.

See also:

Examples

Scala
sourceimport org.apache.pekko
import pekko.actor.typed.ActorRef
import pekko.stream.OverflowStrategy
import pekko.stream.scaladsl.{ Sink, Source }
import pekko.stream.typed.scaladsl.ActorSource

trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Exception) extends Protocol

val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](completionMatcher = {
    case Complete =>
  },
  failureMatcher = {
    case Fail(ex) => ex
  }, bufferSize = 8, overflowStrategy = OverflowStrategy.fail)

val ref = source
  .collect {
    case Message(msg) => msg
  }
  .to(Sink.foreach(println))
  .run()

ref ! Message("msg1")
// ref ! "msg2" Does not compile
Java
sourceimport org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.japi.JavaPartialFunction;
import org.apache.pekko.stream.OverflowStrategy;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.typed.javadsl.ActorSource;

import java.util.Optional;

interface Protocol {}

class Message implements Protocol {
  private final String msg;

  public Message(String msg) {
    this.msg = msg;
  }
}

class Complete implements Protocol {}

class Fail implements Protocol {
  private final Exception ex;

  public Fail(Exception ex) {
    this.ex = ex;
  }
}

  final Source<Protocol, ActorRef<Protocol>> source =
      ActorSource.actorRef(
          (m) -> m instanceof Complete,
          (m) -> (m instanceof Fail) ? Optional.of(((Fail) m).ex) : Optional.empty(),
          8,
          OverflowStrategy.fail());

  final ActorRef<Protocol> ref =
      source
          .collect(
              new JavaPartialFunction<Protocol, String>() {
                public String apply(Protocol p, boolean isCheck) {
                  if (p instanceof Message) {
                    return ((Message) p).msg;
                  } else {
                    throw noMatch();
                  }
                }
              })
          .to(Sink.foreach(System.out::println))
          .run(system);

  ref.tell(new Message("msg1"));
  // ref.tell("msg2"); Does not compile