Class ProducerControllerImpl$
==== Design notes ====
The producer will start the flow by sending a ProducerController.Start message to the ProducerController with
message adapter reference to convert ProducerController.RequestNext message.
The ProducerController sends RequestNext to the producer, which is then allowed to send one message to
the ProducerController.
The producer and ProducerController are supposed to be local so that these messages are fast and not lost.
The ProducerController sends the first message to the ConsumerController without waiting for
a Request from the ConsumerController. The main reason for this is that when used with
Cluster Sharding the first message will typically create the ConsumerController. It's
also a way to connect the ProducerController and ConsumerController in a dynamic way, for
example when the ProducerController is replaced.
When the first message is received by the ConsumerController it sends back the initial Request,
with demand of how many messages it can accept.
Apart from the first message the ProducerController will not send more messages than requested
by the ConsumerController.
When there is demand from the consumer side the ProducerController sends RequestNext to the
actual producer, which is then allowed to send one more message.
Each message is wrapped by the ProducerController in ConsumerController.SequencedMessage with
a monotonically increasing sequence number without gaps, starting at 1.
In other words, the "request" protocol to the application producer and consumer is one-by-one, but
between the ProducerController and ConsumerController it's window of messages in flight.
The Request message also contains a confirmedSeqNr that is the acknowledgement
from the consumer that it has received and processed all messages up to that sequence number.
The ConsumerController will send ProducerControllerImpl.Resend if a lost message is detected
and then the ProducerController will resend all messages from that sequence number. The producer keeps
unconfirmed messages in a buffer to be able to resend them. The buffer size is limited
by the request window size.
The resending is optional, and the ConsumerController can be started with resendLost=false
to ignore lost messages, and then the ProducerController will not buffer unconfirmed messages.
In that mode it provides only flow control but no reliable delivery.
-
Field Summary
FieldsModifier and TypeFieldDescriptionstatic final ProducerControllerImpl$Static reference to the singleton instance of this Scala object. -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescription<A> Behavior<ProducerController.Command<A>>apply(String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.Function1<ConsumerController.SequencedMessage<A>, scala.runtime.BoxedUnit> send, scala.reflect.ClassTag<A> evidence$2) For customsendfunction.<A> Behavior<ProducerController.Command<A>>apply(String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$1) <A> scala.collection.immutable.Seq<org.apache.pekko.actor.typed.delivery.internal.ChunkedMessage>createChunks(A m, int chunkSize, Serialization serialization) voidenforceLocalProducer(ActorRef<?> ref)
-
Field Details
-
MODULE$
Static reference to the singleton instance of this Scala object.
-
-
Constructor Details
-
ProducerControllerImpl$
public ProducerControllerImpl$()
-
-
Method Details
-
apply
public <A> Behavior<ProducerController.Command<A>> apply(String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$1) -
apply
public <A> Behavior<ProducerController.Command<A>> apply(String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.Function1<ConsumerController.SequencedMessage<A>, scala.runtime.BoxedUnit> send, scala.reflect.ClassTag<A> evidence$2) For customsendfunction. For example used with Sharding where the message must be wrapped inShardingEnvelope(SequencedMessage(msg)). -
enforceLocalProducer
-
createChunks
public <A> scala.collection.immutable.Seq<org.apache.pekko.actor.typed.delivery.internal.ChunkedMessage> createChunks(A m, int chunkSize, Serialization serialization)
-