ActorFlow.askWithStatus
Use the Ask pattern to send each stream element as an ask to the target actor (of the new actors API) and expect a reply of type StatusReply[T] (Scala) or StatusReply<T> (Java), where the T is unwrapped and emitted downstream.
Dependency
This operator is included in:
- sbt
  val PekkoVersion = "1.1.2+24-bcd44ee3-SNAPSHOT"
  libraryDependencies += "org.apache.pekko" %% "pekko-stream-typed" % PekkoVersion
- Maven
  <properties>
    <pekko.version>1.1.2+24-bcd44ee3-SNAPSHOT</pekko.version>
    <scala.binary.version>2.13</scala.binary.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-stream-typed_${scala.binary.version}</artifactId>
      <version>${pekko.version}</version>
    </dependency>
  </dependencies>
- Gradle
  def versions = [
    PekkoVersion: "1.1.2+24-bcd44ee3-SNAPSHOT",
    ScalaBinary: "2.13"
  ]
  dependencies {
    implementation "org.apache.pekko:pekko-stream-typed_${versions.ScalaBinary}:${versions.PekkoVersion}"
  }
Signature
ActorFlow.askWithStatus
- Scala: askWithStatus[I, Q, A](ref: org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage: (I, org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]]) => Q)(implicit timeout: org.apache.pekko.util.Timeout): org.apache.pekko.stream.scaladsl.Flow[I, A, org.apache.pekko.NotUsed]
- Java: askWithStatus[I, Q, A]: org.apache.pekko.stream.javadsl.Flow[I, A, org.apache.pekko.NotUsed]
ActorFlow.askWithStatus
- Scala: askWithStatus[I, Q, A](makeMessage: (I, org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]]) => Q)(implicit timeout: org.apache.pekko.util.Timeout): org.apache.pekko.stream.scaladsl.Flow[I, A, org.apache.pekko.NotUsed]
- Java: askWithStatus[I, Q, A]: org.apache.pekko.stream.javadsl.Flow[I, A, org.apache.pekko.NotUsed]
Description
Use the Ask pattern to send a request-reply message to the target ref actor when you expect the reply to be an org.apache.pekko.pattern.StatusReply. If any of the asks times out, the stream fails with an AskTimeoutException.
The askWithStatus operator requires:
- the actor ref,
- a makeMessage function to create the message sent to the actor from the incoming element and the actor ref accepting the actor’s reply message,
- a timeout.
Examples
The ActorFlow.askWithStatus operator sends a message to the actor. The actor expects AskingWithStatus messages, which contain the actor ref for replies of type StatusReply[String] (Scala) or StatusReply<String> (Java). When the actor for replies receives a reply, the ActorFlow.askWithStatus stream stage emits the reply and the map extracts the message String.
- Scala
  import org.apache.pekko
  import pekko.NotUsed
  import pekko.actor.typed.ActorRef
  import pekko.actor.typed.scaladsl.Behaviors
  import pekko.pattern.StatusReply
  import pekko.stream.scaladsl.{ Flow, Sink, Source }
  import pekko.stream.typed.scaladsl.ActorFlow
  import pekko.util.Timeout

  import scala.collection.immutable
  import scala.concurrent.Future
  import scala.concurrent.duration._

  final case class Asking(s: String, replyTo: ActorRef[Reply])
  final case class Reply(msg: String)
  final case class AskingWithStatus(s: String, replyTo: ActorRef[StatusReply[String]])

  // `spawn` and an implicit actor system are assumed to come from the
  // enclosing scope (for example an actor test kit)
  val ref = spawn(Behaviors.receiveMessage[Asking] { asking =>
    asking.replyTo ! Reply(asking.s + "!!!")
    Behaviors.same
  })

  implicit val timeout: Timeout = 1.second

  val askFlow: Flow[String, Reply, NotUsed] =
    ActorFlow.ask(ref)(Asking.apply)

  // explicit creation of the sent message
  val askFlowExplicit: Flow[String, Reply, NotUsed] =
    ActorFlow.ask(ref)(makeMessage = (el, replyTo: ActorRef[Reply]) => Asking(el, replyTo))

  val in: Future[immutable.Seq[String]] =
    Source(1 to 50).map(_.toString).via(askFlow).map(_.msg).runWith(Sink.seq)
- Java
  import java.time.Duration;

  import org.apache.pekko.NotUsed;
  import org.apache.pekko.actor.typed.ActorRef;
  import org.apache.pekko.actor.typed.ActorSystem;
  import org.apache.pekko.pattern.StatusReply;
  import org.apache.pekko.stream.javadsl.Flow;
  import org.apache.pekko.stream.javadsl.Sink;
  import org.apache.pekko.stream.javadsl.Source;
  import org.apache.pekko.stream.typed.javadsl.ActorFlow;

  class Asking {
    final String payload;
    final ActorRef<Reply> replyTo;

    public Asking(String payload, ActorRef<Reply> replyTo) {
      this.payload = payload;
      this.replyTo = replyTo;
    }
  }

  static class AskingWithStatus {
    final String payload;
    final ActorRef<StatusReply<String>> replyTo;

    public AskingWithStatus(String payload, ActorRef<StatusReply<String>> replyTo) {
      this.payload = payload;
      this.replyTo = replyTo;
    }
  }

  class Reply {
    public final String msg;

    public Reply(String msg) {
      this.msg = msg;
    }
  }

  final ActorRef<Asking> actorRef = // ???
  final ActorRef<AskingWithStatus> actorWithStatusRef = // ???
  Duration timeout = Duration.ofSeconds(1);

  // method reference notation
  Flow<String, Reply, NotUsed> askFlow = ActorFlow.ask(actorRef, timeout, Asking::new);

  // explicit creation of the sent message
  Flow<String, Reply, NotUsed> askFlowExplicit =
      ActorFlow.ask(actorRef, timeout, (msg, replyTo) -> new Asking(msg, replyTo));

  Flow<String, String, NotUsed> askFlowExplicitWithStatus =
      ActorFlow.askWithStatus(
          actorWithStatusRef, timeout, (msg, replyTo) -> new AskingWithStatus(msg, replyTo));

  // `system` is assumed to be an ActorSystem available in the enclosing scope
  Source.repeat("hello").via(askFlow).map(reply -> reply.msg).runWith(Sink.seq(), system);
Reactive Streams semantics
emits when the futures (in submission order) created by the ask pattern internally are completed
backpressures when the number of futures reaches the configured parallelism and the downstream backpressures
completes when upstream completes and all futures have been completed and all elements have been emitted
fails when the passed-in actor terminates, or when any of the askWithStatus calls exceeds its timeout
cancels when downstream cancels
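The "emits" rule above says replies are emitted in submission order, not in completion order. The sketch below illustrates that rule with plain JDK classes only. It is a simplified model under stated assumptions, not Pekko's implementation: CompletableFuture stands in for the futures that the ask pattern creates internally, and the names OrderedEmissionSketch and emitInSubmissionOrder are hypothetical, introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper class, not part of Pekko.
class OrderedEmissionSketch {

    // Waits for each pending reply in the order it was submitted, so a reply
    // that completes early is still held back behind earlier, slower replies.
    static List<String> emitInSubmissionOrder(List<CompletableFuture<String>> pending) {
        List<String> emitted = new ArrayList<>();
        for (CompletableFuture<String> reply : pending) {
            emitted.add(reply.join()); // blocks until this particular reply is done
        }
        return emitted;
    }

    public static void main(String[] args) {
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        CompletableFuture<String> third = new CompletableFuture<>();
        List<CompletableFuture<String>> pending = List.of(first, second, third);

        // The replies arrive in reverse order.
        third.complete("c");
        second.complete("b");
        first.complete("a");

        System.out.println(emitInSubmissionOrder(pending)); // prints [a, b, c]
    }
}
```

Even though the third reply completes first, the output keeps the order in which the asks were submitted, which is the behavior the "emits" line describes for elements flowing through the operator.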