ActorSource.actorRef
Materialize an ActorRef[T]
of the new actors API; sending messages to it will emit them on the stream only if they are of the same type as the stream.
Dependency¶
This operator is included in:
val PekkoVersion = "1.0.3"
libraryDependencies += "org.apache.pekko" %% "pekko-stream-typed" % PekkoVersion
<properties>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-bom_${scala.binary.version}</artifactId>
<version>1.0.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream-typed_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
def versions = [
ScalaBinary: "2.13"
]
dependencies {
implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.0.3")
implementation "org.apache.pekko:pekko-stream-typed_${versions.ScalaBinary}"
}
Signature¶
Description¶
Materialize an ActorRef[T]
which only accepts messages that are of the same type as the stream.
See also:
- ActorSource.actorRefWithBackpressure This operator, but with backpressure control
- Source.actorRef The corresponding operator for the classic actors API
- Source.actorRefWithBackpressure The operator for the classic actors API with backpressure control
- Source.queue Materialize a
SourceQueue
onto which elements can be pushed for emitting from the source
Examples¶
sourceimport org.apache.pekko
import pekko.actor.typed.ActorRef
import pekko.stream.OverflowStrategy
import pekko.stream.scaladsl.{ Sink, Source }
import pekko.stream.typed.scaladsl.ActorSource
trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Exception) extends Protocol
val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](completionMatcher = {
case Complete =>
},
failureMatcher = {
case Fail(ex) => ex
}, bufferSize = 8, overflowStrategy = OverflowStrategy.fail)
val ref = source
.collect {
case Message(msg) => msg
}
.to(Sink.foreach(println))
.run()
ref ! Message("msg1")
// ref ! "msg2" Does not compile
sourceimport org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.japi.JavaPartialFunction;
import org.apache.pekko.stream.OverflowStrategy;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.typed.javadsl.ActorSource;
import java.util.Optional;
interface Protocol {}
class Message implements Protocol {
private final String msg;
public Message(String msg) {
this.msg = msg;
}
}
class Complete implements Protocol {}
class Fail implements Protocol {
private final Exception ex;
public Fail(Exception ex) {
this.ex = ex;
}
}
final Source<Protocol, ActorRef<Protocol>> source =
ActorSource.actorRef(
(m) -> m instanceof Complete,
(m) -> (m instanceof Fail) ? Optional.of(((Fail) m).ex) : Optional.empty(),
8,
OverflowStrategy.fail());
final ActorRef<Protocol> ref =
source
.collect(
new JavaPartialFunction<Protocol, String>() {
public String apply(Protocol p, boolean isCheck) {
if (p instanceof Message) {
return ((Message) p).msg;
} else {
throw noMatch();
}
}
})
.to(Sink.foreach(System.out::println))
.run(system);
ref.tell(new Message("msg1"));
// ref.tell("msg2"); Does not compile
1.0.3