ActorSource.actorRef
Materialize an ActorRef<T>
ActorRef[T]
of the new actors API; sending messages to it will emit them on the stream only if they are of the same type as the stream.
Dependency
This operator is included in:
- sbt
val PekkoVersion = "1.1.2+29-e21fa9eb-SNAPSHOT" libraryDependencies += "org.apache.pekko" %% "pekko-stream-typed" % PekkoVersion
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-bom_${scala.binary.version}</artifactId> <version>1.1.2+29-e21fa9eb-SNAPSHOT</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-stream-typed_${scala.binary.version}</artifactId> </dependency> </dependencies>
- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2+29-e21fa9eb-SNAPSHOT") implementation "org.apache.pekko:pekko-stream-typed_${versions.ScalaBinary}" }
Signature
ActorSource.actorRef
ActorSource.actorRef
Description
Materialize an ActorRef<T>
ActorRef[T]
which only accepts messages that are of the same type as the stream.
See also:
- ActorSource.actorRefWithBackpressure This operator, but with backpressure control
- Source.actorRef The corresponding operator for the classic actors API
- Source.actorRefWithBackpressure The operator for the classic actors API with backpressure control
- Source.queue Materialize a
SourceQueue
onto which elements can be pushed for emitting from the source
Examples
- Scala
-
source
import org.apache.pekko import pekko.actor.typed.ActorRef import pekko.stream.OverflowStrategy import pekko.stream.scaladsl.{ Sink, Source } import pekko.stream.typed.scaladsl.ActorSource trait Protocol case class Message(msg: String) extends Protocol case object Complete extends Protocol case class Fail(ex: Exception) extends Protocol val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](completionMatcher = { case Complete => }, failureMatcher = { case Fail(ex) => ex }, bufferSize = 8, overflowStrategy = OverflowStrategy.fail) val ref = source .collect { case Message(msg) => msg } .to(Sink.foreach(println)) .run() ref ! Message("msg1") // ref ! "msg2" Does not compile
- Java
-
source
import org.apache.pekko.actor.typed.ActorRef; import org.apache.pekko.actor.typed.ActorSystem; import org.apache.pekko.japi.JavaPartialFunction; import org.apache.pekko.stream.OverflowStrategy; import org.apache.pekko.stream.javadsl.Sink; import org.apache.pekko.stream.javadsl.Source; import org.apache.pekko.stream.typed.javadsl.ActorSource; import java.util.Optional; interface Protocol {} class Message implements Protocol { private final String msg; public Message(String msg) { this.msg = msg; } } class Complete implements Protocol {} class Fail implements Protocol { private final Exception ex; public Fail(Exception ex) { this.ex = ex; } } final Source<Protocol, ActorRef<Protocol>> source = ActorSource.actorRef( (m) -> m instanceof Complete, (m) -> (m instanceof Fail) ? Optional.of(((Fail) m).ex) : Optional.empty(), 8, OverflowStrategy.fail()); final ActorRef<Protocol> ref = source .collect( new JavaPartialFunction<Protocol, String>() { public String apply(Protocol p, boolean isCheck) { if (p instanceof Message) { return ((Message) p).msg; } else { throw noMatch(); } } }) .to(Sink.foreach(System.out::println)) .run(system); ref.tell(new Message("msg1")); // ref.tell("msg2"); Does not compile
1.1.2+29-e21fa9eb*