Sink.actorRefWithBackpressure
Send the elements from the stream to an ActorRef
(of the classic actors API) which must then acknowledge reception after completing a message, to provide back pressure onto the sink.
Signature¶
Description¶
Send the elements from the stream to an ActorRef
which must then acknowledge reception after completing a message, to provide back pressure onto the sink. There is also a variant without a concrete acknowledge message accepting any message as such.
See also:
Sink.actorRef
Send elements to an actor, without considering backpressureActorSink.actorRef
The corresponding operator for the new actors APIActorSink.actorRefWithBackpressure
Send elements to an actor of the new actors API supporting backpressure
Example¶
Actor to be interacted with:
sourceobject AckingReceiver {
case object Ack
case object StreamInitialized
case object StreamCompleted
final case class StreamFailure(ex: Throwable)
}
class AckingReceiver(probe: ActorRef) extends Actor with ActorLogging {
import AckingReceiver._
def receive: Receive = {
case StreamInitialized =>
log.info("Stream initialized!")
probe ! "Stream initialized!"
sender() ! Ack // ack to allow the stream to proceed sending more elements
case el: String =>
log.info("Received element: {}", el)
probe ! el
sender() ! Ack // ack to allow the stream to proceed sending more elements
case StreamCompleted =>
log.info("Stream completed!")
probe ! "Stream completed!"
case StreamFailure(ex) =>
log.error(ex, "Stream failed!")
}
}
sourceenum Ack {
INSTANCE;
}
static class StreamInitialized {}
static class StreamCompleted {}
static class StreamFailure {
private final Throwable cause;
public StreamFailure(Throwable cause) {
this.cause = cause;
}
public Throwable getCause() {
return cause;
}
}
static class AckingReceiver extends AbstractLoggingActor {
private final ActorRef probe;
public AckingReceiver(ActorRef probe) {
this.probe = probe;
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
StreamInitialized.class,
init -> {
log().info("Stream initialized");
probe.tell("Stream initialized", getSelf());
sender().tell(Ack.INSTANCE, self());
})
.match(
String.class,
element -> {
log().info("Received element: {}", element);
probe.tell(element, getSelf());
sender().tell(Ack.INSTANCE, self());
})
.match(
StreamCompleted.class,
completed -> {
log().info("Stream completed");
probe.tell("Stream completed", getSelf());
})
.match(
StreamFailure.class,
failed -> {
log().error(failed.getCause(), "Stream failed!");
probe.tell("Stream failed!", getSelf());
})
.build();
}
}
Using the actorRefWithBackpressure
operator with the above actor:
sourceval words: Source[String, NotUsed] =
Source(List("hello", "hi"))
// sent from actor to stream to "ack" processing of given element
val AckMessage = AckingReceiver.Ack
// sent from stream to actor to indicate start, end or failure of stream:
val InitMessage = AckingReceiver.StreamInitialized
val OnCompleteMessage = AckingReceiver.StreamCompleted
val onErrorMessage = (ex: Throwable) => AckingReceiver.StreamFailure(ex)
val probe = TestProbe()
val receiver = system.actorOf(Props(new AckingReceiver(probe.ref)))
val sink = Sink.actorRefWithBackpressure(
receiver,
onInitMessage = InitMessage,
ackMessage = AckMessage,
onCompleteMessage = OnCompleteMessage,
onFailureMessage = onErrorMessage)
words.map(_.toLowerCase).runWith(sink)
probe.expectMsg("Stream initialized!")
probe.expectMsg("hello")
probe.expectMsg("hi")
probe.expectMsg("Stream completed!")
sourceSource<String, NotUsed> words = Source.from(Arrays.asList("hello", "hi"));
final TestKit probe = new TestKit(system);
ActorRef receiver = system.actorOf(Props.create(AckingReceiver.class, probe.getRef()));
Sink<String, NotUsed> sink =
Sink.<String>actorRefWithBackpressure(
receiver,
new StreamInitialized(),
Ack.INSTANCE,
new StreamCompleted(),
ex -> new StreamFailure(ex));
words.map(el -> el.toLowerCase()).runWith(sink, system);
probe.expectMsg("Stream initialized");
probe.expectMsg("hello");
probe.expectMsg("hi");
probe.expectMsg("Stream completed");
Reactive Streams semantics¶
cancels when the actor terminates
backpressures when the actor acknowledgement has not arrived
1.0.3