Source.actorRefWithBackpressure
Materialize an ActorRef of the classic actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide backpressure from the source.
Signature
Source.actorRefWithBackpressure
Description
Materialize an ActorRef; sending messages to it will emit them on the stream. The actor responds with the provided ack message once the element could be emitted, allowing for backpressure from the source. Sending another message before the previous one has been acknowledged will fail the stream.
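The acknowledgement handshake described above can be modeled without Pekko. The following is a minimal, standalone sketch of the protocol only: `AckGate`, `offer`, and `request` are hypothetical names invented for illustration, not part of the Pekko API, and the real operator runs asynchronously inside a stream stage rather than through direct method calls.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the ack handshake; NOT Pekko code. All names here are hypothetical.
final class AckGate {
    private final Object ackMessage;
    private final List<Object> emitted = new ArrayList<>(); // elements that reached "downstream"
    private final List<Object> acks = new ArrayList<>();    // acks sent back to the sender
    private Object pending = null;  // element received but not yet emitted
    private int demand = 0;         // outstanding downstream demand
    private boolean failed = false;

    AckGate(Object ackMessage) {
        this.ackMessage = ackMessage;
    }

    // A sender delivers one element to the source.
    void offer(Object element) {
        if (failed) return;
        if (pending != null) {
            // The previous element has not been acknowledged yet: fail.
            failed = true;
            return;
        }
        pending = element;
        tryEmit();
    }

    // Downstream signals demand for n more elements.
    void request(int n) {
        if (failed) return;
        demand += n;
        tryEmit();
    }

    private void tryEmit() {
        if (pending != null && demand > 0) {
            emitted.add(pending);
            pending = null;
            demand--;
            acks.add(ackMessage); // the ack is sent only after the element was emitted
        }
    }

    List<Object> emitted() { return emitted; }
    List<Object> acks() { return acks; }
    boolean isFailed() { return failed; }

    public static void main(String[] args) {
        AckGate gate = new AckGate("ack");
        gate.offer("hello");                 // no demand yet: element is held, no ack
        System.out.println(gate.acks());     // prints []
        gate.request(1);                     // demand arrives: element emitted, ack sent
        System.out.println(gate.acks());     // prints [ack]
        gate.offer("a");
        gate.offer("b");                     // sent before "a" was acknowledged
        System.out.println(gate.isFailed()); // prints true
    }
}
```

In this model a well-behaved sender waits for each ack before calling `offer` again, which is what limits its rate to the pace of downstream demand.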
See also:
- Source.actorRef: this operator without backpressure control
- ActorSource.actorRef: the operator for the new actors API without backpressure control
- ActorSource.actorRefWithBackpressure: the corresponding operator for the new actors API
- Source.queue: materialize a SourceQueue onto which elements can be pushed for emitting from the source
Examples
- Scala

    import org.apache.pekko
    import pekko.actor.Status.Success
    import pekko.actor.ActorRef
    import pekko.stream.CompletionStrategy
    import pekko.stream.scaladsl._
    import pekko.testkit.TestProbe

    // Assumes an implicit ActorSystem is in scope.
    val probe = TestProbe()

    val source: Source[String, ActorRef] = Source.actorRefWithBackpressure[String](
      ackMessage = "ack",
      // complete when we send pekko.actor.Status.Success
      completionMatcher = {
        case _: Success => CompletionStrategy.immediately
      },
      // do not fail on any message
      failureMatcher = PartialFunction.empty)
    val actorRef: ActorRef = source.to(Sink.foreach(println)).run()

    probe.send(actorRef, "hello")
    probe.expectMsg("ack")
    probe.send(actorRef, "hello")
    probe.expectMsg("ack")

    // The stream completes successfully with the following message
    actorRef ! Success(())

- Java

    import java.util.Optional;
    import org.apache.pekko.actor.ActorRef;
    import org.apache.pekko.stream.CompletionStrategy;
    import org.apache.pekko.stream.javadsl.Sink;
    import org.apache.pekko.stream.javadsl.Source;
    import org.apache.pekko.testkit.TestProbe;

    // Assumes an ActorSystem `system` and a TestProbe `probe` are already in scope.
    Source<String, ActorRef> source =
        Source.<String>actorRefWithBackpressure(
            "ack",
            // complete when we send "complete"
            o -> {
              // compare by value, not by reference
              if ("complete".equals(o)) return Optional.of(CompletionStrategy.draining());
              else return Optional.empty();
            },
            // do not fail on any message
            o -> Optional.empty());

    ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(system);
    probe.send(actorRef, "hello");
    probe.expectMsg("ack");
    probe.send(actorRef, "hello");
    probe.expectMsg("ack");

    // The stream completes successfully with the following message
    actorRef.tell("complete", ActorRef.noSender());
Reactive Streams semantics
emits when there is demand and there are messages in the buffer or a message is sent to the ActorRef
completes when the passed completion matcher returns a CompletionStrategy or fails if the passed failure matcher returns an exception
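To make the role of the two matchers concrete, here is a small standalone sketch of how an incoming message might be classified as a completion signal, a failure signal, or an ordinary element. It is not Pekko code: `MessageClassifier`, `Outcome`, and `classify` are hypothetical names, a plain `String` stands in for `CompletionStrategy`, and the order in which the matchers are consulted is an assumption of this sketch rather than documented behavior.

```java
import java.util.Optional;
import java.util.function.Function;

// Toy model of how incoming messages are classified; NOT Pekko code.
// MessageClassifier, Outcome, and classify are hypothetical names.
final class MessageClassifier {
    enum Outcome { COMPLETE, FAIL, EMIT }

    // Stand-ins for the two matchers: a present Optional means "matched".
    private final Function<Object, Optional<String>> completionMatcher;
    private final Function<Object, Optional<Throwable>> failureMatcher;

    MessageClassifier(
            Function<Object, Optional<String>> completionMatcher,
            Function<Object, Optional<Throwable>> failureMatcher) {
        this.completionMatcher = completionMatcher;
        this.failureMatcher = failureMatcher;
    }

    // Assumption: completion is checked before failure; the real operator may differ.
    Outcome classify(Object message) {
        if (completionMatcher.apply(message).isPresent()) return Outcome.COMPLETE;
        if (failureMatcher.apply(message).isPresent()) return Outcome.FAIL;
        return Outcome.EMIT; // anything unmatched is treated as a stream element
    }

    public static void main(String[] args) {
        MessageClassifier classifier = new MessageClassifier(
            // complete when the message is the string "complete"
            m -> "complete".equals(m) ? Optional.of("draining") : Optional.empty(),
            // fail when the message is itself a Throwable
            m -> m instanceof Throwable ? Optional.of((Throwable) m) : Optional.empty());

        System.out.println(classifier.classify("hello"));                      // prints EMIT
        System.out.println(classifier.classify("complete"));                   // prints COMPLETE
        System.out.println(classifier.classify(new RuntimeException("boom"))); // prints FAIL
    }
}
```

With a failure matcher that always returns an empty `Optional`, as in the Java example above, no message can ever fail the stream through the matcher.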