Class ProducerController
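Point-to-point reliable delivery between a single producer actor sending messages and a single
consumer actor receiving the messages. Used together with ConsumerController.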
The producer actor will start the flow by sending a ProducerController.Start message to
the ProducerController. The ActorRef in the Start message is typically constructed
as a message adapter to map the ProducerController.RequestNext to the protocol of the
producer actor.
For the ProducerController to know where to send the messages it must be connected with the
ConsumerController. You do this with ProducerController.RegisterConsumer or
ConsumerController.RegisterToProducerController messages.
The ProducerController sends RequestNext to the producer, which is then allowed to send one
message to the ProducerController via the sendNextTo in the RequestNext. Thereafter the
producer will receive a new RequestNext when it's allowed to send one more message.
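The one-message-per-RequestNext rule above can be illustrated with a small plain-Java model. This is only a sketch of the contract, not Pekko code: HandshakeModel and OneShotPermit are hypothetical names, and the permit's send method merely stands in for telling a message to sendNextTo.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: a new permit is handed out only after the previous one was used. */
final class HandshakeModel {
    private final List<String> accepted = new ArrayList<>();
    private OneShotPermit current = new OneShotPermit(accepted);

    /** Plays the role of receiving a RequestNext. */
    OneShotPermit requestNext() {
        if (current.isUsed()) {
            current = new OneShotPermit(accepted);
        }
        return current;
    }

    List<String> accepted() {
        return accepted;
    }

    public static void main(String[] args) {
        HandshakeModel model = new HandshakeModel();
        model.requestNext().send("first");
        model.requestNext().send("second");
        System.out.println(model.accepted());
    }
}

/** Hypothetical stand-in for a RequestNext: valid for exactly one send. */
final class OneShotPermit {
    private final List<String> sink;
    private boolean used = false;

    OneShotPermit(List<String> sink) {
        this.sink = sink;
    }

    /** Plays the role of telling one message to sendNextTo. */
    void send(String message) {
        if (used) {
            throw new IllegalStateException("only one message may be sent per permit");
        }
        used = true;
        sink.add(message);
    }

    boolean isUsed() {
        return used;
    }
}
```

In this model a second send on the same permit fails fast, which mirrors the rule that the producer must wait for a new RequestNext before sending again.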
The producer and ProducerController actors are supposed to be local so that these messages are
fast and not lost. This is enforced by a runtime check.
Many unconfirmed messages can be in flight between the ProducerController and ConsumerController.
The flow control is driven by the consumer side, which means that the ProducerController will
not send faster than the demand requested by the ConsumerController.
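As a rough illustration of consumer-driven flow control, the following plain-Java sketch holds messages back until the consumer side has requested them. It is a simplified model under stated assumptions, not the Pekko implementation: DemandWindowModel and its methods are hypothetical, and demand is modeled as "send up to this sequence number".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of demand-driven sending; not the real ProducerController. */
final class DemandWindowModel {
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();
    private long nextSeqNr = 1;
    private long requestedUpToSeqNr = 0;

    /** Producer side: queue a message; it is sent only if demand allows. */
    void offer(String message) {
        pending.add(message);
        drain();
    }

    /** Consumer side: raise demand so sequence numbers up to seqNr may be sent. */
    void requestUpTo(long seqNr) {
        requestedUpToSeqNr = Math.max(requestedUpToSeqNr, seqNr);
        drain();
    }

    /** Sends queued messages while the next sequence number is within demand. */
    private void drain() {
        while (!pending.isEmpty() && nextSeqNr <= requestedUpToSeqNr) {
            delivered.add(nextSeqNr + ":" + pending.poll());
            nextSeqNr++;
        }
    }

    List<String> delivered() {
        return delivered;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        DemandWindowModel model = new DemandWindowModel();
        model.offer("a");
        model.offer("b");
        model.offer("c");
        model.requestUpTo(2);
        System.out.println(model.delivered() + " pending=" + model.pendingCount());
    }
}
```

Nothing leaves the model before requestUpTo is called, and raising the demand releases only as many messages as the new limit covers.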
Lost messages are detected, resent and deduplicated if needed. This is also driven by the consumer side,
which means that the ProducerController will not push resends unless requested by the
ConsumerController.
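One common way to detect loss and duplicates on the receiving side is to track the next expected sequence number. The sketch below assumes that approach; SequenceTracker and its Outcome values are hypothetical names, and the real ConsumerController may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical consumer-side tracker: detects gaps, requests resends, drops duplicates. */
final class SequenceTracker {
    enum Outcome { DELIVERED, DUPLICATE, GAP_RESEND_REQUESTED }

    private long expectedSeqNr = 1;
    private final List<String> delivered = new ArrayList<>();
    private final List<Long> resendRequests = new ArrayList<>();

    Outcome receive(long seqNr, String payload) {
        if (seqNr < expectedSeqNr) {
            // Already delivered earlier: drop it.
            return Outcome.DUPLICATE;
        }
        if (seqNr > expectedSeqNr) {
            // A message is missing: ask for a resend starting at the first missing
            // sequence number and discard this out-of-order message.
            resendRequests.add(expectedSeqNr);
            return Outcome.GAP_RESEND_REQUESTED;
        }
        delivered.add(payload);
        expectedSeqNr++;
        return Outcome.DELIVERED;
    }

    List<String> delivered() {
        return delivered;
    }

    List<Long> resendRequests() {
        return resendRequests;
    }

    public static void main(String[] args) {
        SequenceTracker tracker = new SequenceTracker();
        System.out.println(tracker.receive(1, "a"));
        System.out.println(tracker.receive(3, "c"));
        System.out.println(tracker.receive(2, "b"));
        System.out.println(tracker.receive(3, "c"));
        System.out.println(tracker.receive(2, "b"));
    }
}
```

The resend request originates from the receiver, which matches the statement that the sending side does not push resends on its own.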
Until sent messages have been confirmed the ProducerController keeps them in memory to be able to
resend them. If the JVM of the ProducerController crashes those unconfirmed messages are lost.
To make sure the messages can be delivered also in that scenario the ProducerController can be
used with a DurableProducerQueue. Then the unconfirmed messages are stored in a durable way so
that they can be redelivered when the producer is started again. An implementation of the
DurableProducerQueue is provided by EventSourcedProducerQueue in pekko-persistence-typed.
Instead of using tell with the sendNextTo in the RequestNext the producer can use context.ask
with the askNextTo in the RequestNext. The difference is that a reply is sent back when the
message has been handled. If a DurableProducerQueue is used then the reply is sent when the message
has been stored successfully, but it might not have been processed by the consumer yet. Otherwise the
reply is sent after the consumer has processed and confirmed the message.
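The difference in reply timing can be modeled with a future that is completed at one of two points. This is a hypothetical sketch, not Pekko code: AskReplyModel and its callbacks are invented for illustration, and representing the reply as a sequence number is an assumption of the model.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of when the reply to an ask is completed. */
final class AskReplyModel {
    private final boolean durableQueueInUse;
    private final CompletableFuture<Long> reply = new CompletableFuture<>();

    AskReplyModel(boolean durableQueueInUse) {
        this.durableQueueInUse = durableQueueInUse;
    }

    /** Called when the message has been written to durable storage. */
    void onStored(long seqNr) {
        if (durableQueueInUse) {
            reply.complete(seqNr);
        }
    }

    /** Called when the consumer has processed and confirmed the message. */
    void onConfirmedByConsumer(long seqNr) {
        if (!durableQueueInUse) {
            reply.complete(seqNr);
        }
    }

    boolean replied() {
        return reply.isDone();
    }

    public static void main(String[] args) {
        AskReplyModel durable = new AskReplyModel(true);
        durable.onStored(1L);
        System.out.println("durable, replied after store: " + durable.replied());

        AskReplyModel inMemory = new AskReplyModel(false);
        System.out.println("in-memory, replied before confirm: " + inMemory.replied());
        inMemory.onConfirmedByConsumer(1L);
        System.out.println("in-memory, replied after confirm: " + inMemory.replied());
    }
}
```

With a durable queue, a completed reply therefore only says that the message is safely stored, not that the consumer has seen it.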
If the consumer crashes a new ConsumerController can be connected to the original ProducerController
without restarting it. The ProducerController will then redeliver all unconfirmed messages.
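Keeping unconfirmed messages for redelivery can be sketched as a buffer ordered by sequence number. The model below is an assumption-laden illustration in plain Java, not the Pekko implementation: UnconfirmedBufferModel is a hypothetical name, and confirmation is modeled as cumulative (confirming a sequence number also confirms all lower ones).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical model of the in-memory buffer of sent but unconfirmed messages. */
final class UnconfirmedBufferModel {
    private final TreeMap<Long, String> unconfirmed = new TreeMap<>();
    private long nextSeqNr = 1;

    /** Sends a message and keeps it until it has been confirmed. */
    long send(String payload) {
        long seqNr = nextSeqNr++;
        unconfirmed.put(seqNr, payload);
        return seqNr;
    }

    /** Cumulative confirmation: drops every message with a sequence number <= seqNr. */
    void confirmUpTo(long seqNr) {
        unconfirmed.headMap(seqNr, true).clear();
    }

    /** What a newly connected consumer would be sent again, in sequence order. */
    List<String> unconfirmedInOrder() {
        return new ArrayList<>(unconfirmed.values());
    }

    public static void main(String[] args) {
        UnconfirmedBufferModel buffer = new UnconfirmedBufferModel();
        buffer.send("a");
        buffer.send("b");
        buffer.send("c");
        buffer.confirmUpTo(2);
        System.out.println(buffer.unconfirmedInOrder());
    }
}
```

Because this buffer lives only in memory, its contents disappear if the process crashes, which is the gap a durable queue is meant to close.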
It's also possible to use the ProducerController and ConsumerController without resending
lost messages, but the flow control is still used. This can for example be useful when both consumer and
producer are known to be located in the same local ActorSystem. This can be more efficient since messages
don't have to be kept in memory in the ProducerController until they have been
confirmed, but the drawback is that lost messages will not be delivered. See configuration
only-flow-control of the ConsumerController.
The producerId is used in logging and included as MDC entry with key "producerId". It's propagated
to the ConsumerController and is useful for correlating log messages. It can be any String but it's
recommended to use a unique identifier representing the producer.
If the DurableProducerQueue is defined it is created as a child actor of the ProducerController actor.
It will use the same dispatcher as the parent ProducerController.
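Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static interface | ProducerController.Command<A> |
static final class | ProducerController.MessageWithConfirmation<A> | For sending confirmation message back to the producer when the message has been confirmed.
static class | ProducerController.MessageWithConfirmation$ |
static final class | ProducerController.RegisterConsumer<A> | Register the given consumerController to the ProducerController.
static class | ProducerController.RegisterConsumer$ |
static final class | ProducerController.RequestNext<A> | The ProducerController sends RequestNext to the producer when it is allowed to send one message via the sendNextTo or askNextTo.
static class | ProducerController.RequestNext$ |
static final class | ProducerController.Settings |
static class | ProducerController.Settings$ |
static final class | ProducerController.Start<A> | Initial message from the producer actor.
static class | ProducerController.Start$ |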
Constructor Summary
Constructors
ProducerController()
Method Summary
Modifier and Type | Method | Description
static <A> Behavior<ProducerController.Command<A>> | apply(String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$2) |
static <A> Behavior<ProducerController.Command<A>> | apply(String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, scala.reflect.ClassTag<A> evidence$1) |
static <A> Behavior<ProducerController.Command<A>> | create(Class<A> messageClass, String producerId, Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior) | Java API
static <A> Behavior<ProducerController.Command<A>> | create(Class<A> messageClass, String producerId, Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings) | Java API
static <A> Class<ProducerController.RequestNext<A>> | requestNextClass() | Java API: The generic Class type for ProducerController.RequestNext that can be used when creating a messageAdapter for Class<RequestNext<A>>.
Constructor Details
ProducerController
public ProducerController()
Method Details
requestNextClass
public static <A> Class<ProducerController.RequestNext<A>> requestNextClass()
Java API: The generic Class type for ProducerController.RequestNext that can be used when creating a messageAdapter for Class<RequestNext<A>>.
apply
public static <A> Behavior<ProducerController.Command<A>> apply(String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, scala.reflect.ClassTag<A> evidence$1)
apply
public static <A> Behavior<ProducerController.Command<A>> apply(String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$2)
create
public static <A> Behavior<ProducerController.Command<A>> create(Class<A> messageClass, String producerId, Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior)
Java API
create
public static <A> Behavior<ProducerController.Command<A>> create(Class<A> messageClass, String producerId, Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings)
Java API