Class ProducerControllerImpl$
- java.lang.Object
-
- org.apache.pekko.actor.typed.delivery.internal.ProducerControllerImpl$
-
public class ProducerControllerImpl$ extends java.lang.Object

INTERNAL API

==== Design notes ====

The producer will start the flow by sending a ProducerController.Start message to the ProducerController with a message adapter reference to convert ProducerController.RequestNext messages. The ProducerController sends RequestNext to the producer, which is then allowed to send one message to the ProducerController.

The producer and ProducerController are supposed to be local so that these messages are fast and not lost.

The ProducerController sends the first message to the ConsumerController without waiting for a Request from the ConsumerController. The main reason for this is that when used with Cluster Sharding the first message will typically create the ConsumerController. It's also a way to connect the ProducerController and ConsumerController in a dynamic way, for example when the ProducerController is replaced.

When the first message is received by the ConsumerController it sends back the initial Request, with demand of how many messages it can accept.

Apart from the first message, the ProducerController will not send more messages than requested by the ConsumerController.

When there is demand from the consumer side, the ProducerController sends RequestNext to the actual producer, which is then allowed to send one more message.

Each message is wrapped by the ProducerController in ConsumerController.SequencedMessage with a monotonically increasing sequence number without gaps, starting at 1.

In other words, the "request" protocol to the application producer and consumer is one-by-one, but between the ProducerController and ConsumerController it's a window of messages in flight.

The Request message also contains a confirmedSeqNr that is the acknowledgement from the consumer that it has received and processed all messages up to that sequence number.

The ConsumerController will send ProducerControllerImpl.Resend if a lost message is detected, and then the ProducerController will resend all messages from that sequence number. The producer keeps unconfirmed messages in a buffer to be able to resend them. The buffer size is limited by the request window size.

The resending is optional, and the ConsumerController can be started with resendLost=false to ignore lost messages, and then the ProducerController will not buffer unconfirmed messages. In that mode it provides only flow control but no reliable delivery.
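The window, acknowledgement and resend rules in the design notes can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions, not the Pekko implementation: the class `ProducerWindowSketch`, its methods, the `requestUpTo` parameter and the `wire` list are hypothetical names introduced for illustration, and messages are plain strings rather than `SequencedMessage` instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Illustrative model only; names and structure are assumptions, not Pekko API.
final class ProducerWindowSketch {
    private long nextSeqNr = 1;       // sequence numbers start at 1, without gaps
    private long confirmedSeqNr = 0;  // highest seqNr acknowledged by the consumer
    private long requestedSeqNr = 1;  // the first message may be sent without a Request
    private final TreeMap<Long, String> unconfirmed = new TreeMap<>(); // resend buffer
    final List<String> wire = new ArrayList<>(); // everything "sent" to the consumer

    /** True when the producer may be told RequestNext (one more message allowed). */
    boolean hasDemand() {
        return nextSeqNr <= requestedSeqNr;
    }

    /** The producer hands over one message; it is tagged with the next seqNr. */
    void send(String payload) {
        if (!hasDemand()) {
            throw new IllegalStateException("no demand from the consumer side");
        }
        long seqNr = nextSeqNr++;
        unconfirmed.put(seqNr, payload); // kept until confirmed, so it can be resent
        wire.add(seqNr + ":" + payload);
    }

    /** Models a Request: acknowledges up to `confirmed`, allows sending up to `requestUpTo`. */
    void onRequest(long confirmed, long requestUpTo) {
        confirmedSeqNr = Math.max(confirmedSeqNr, confirmed);
        requestedSeqNr = Math.max(requestedSeqNr, requestUpTo);
        unconfirmed.headMap(confirmedSeqNr, true).clear(); // drop confirmed messages
    }

    /** Models a Resend: retransmit every buffered message from `fromSeqNr` onwards. */
    void onResend(long fromSeqNr) {
        unconfirmed.tailMap(fromSeqNr, true)
                .forEach((seqNr, payload) -> wire.add(seqNr + ":" + payload));
    }

    int bufferedCount() {
        return unconfirmed.size();
    }
}
```

In this model the buffer can never grow beyond the gap between the confirmed and the requested sequence number, which mirrors the statement that the buffer size is limited by the request window size.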
-
-
Field Summary
Fields
Modifier and Type | Field | Description
static ProducerControllerImpl$ | MODULE$ | Static reference to the singleton instance of this Scala object.
-
Constructor Summary
Constructors
Constructor | Description
ProducerControllerImpl$() |
-
Method Summary
Modifier and Type | Method | Description
<A> Behavior<ProducerController.Command<A>> | apply(java.lang.String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.Function1<ConsumerController.SequencedMessage<A>,scala.runtime.BoxedUnit> send, scala.reflect.ClassTag<A> evidence$2) | For custom send function.
<A> Behavior<ProducerController.Command<A>> | apply(java.lang.String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$1) |
<A> scala.collection.immutable.Seq<org.apache.pekko.actor.typed.delivery.internal.ChunkedMessage> | createChunks(A m, int chunkSize, Serialization serialization) |
void | enforceLocalProducer(ActorRef<?> ref) |
-
-
-
Field Detail
-
MODULE$
public static final ProducerControllerImpl$ MODULE$
Static reference to the singleton instance of this Scala object.
-
-
Method Detail
-
apply
public <A> Behavior<ProducerController.Command<A>> apply(java.lang.String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$1)
-
apply
public <A> Behavior<ProducerController.Command<A>> apply(java.lang.String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.Function1<ConsumerController.SequencedMessage<A>,scala.runtime.BoxedUnit> send, scala.reflect.ClassTag<A> evidence$2)
For a custom send function. For example, used with Sharding, where the message must be wrapped in ShardingEnvelope(SequencedMessage(msg)).
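The idea of a custom send function that wraps each sequenced message in an envelope can be sketched in plain Java. This is a minimal illustration under stated assumptions: the nested `SequencedMessage` and `ShardingEnvelope` classes below are simplified, hypothetical stand-ins and not the Pekko types of the same name, `shardingSend` is a made-up helper, and `java.util.function.Consumer` stands in for the `scala.Function1<..., BoxedUnit>` parameter.

```java
import java.util.function.Consumer;

// Illustrative stand-ins only; these are NOT the Pekko classes.
final class CustomSendSketch {

    /** Simplified stand-in for a sequenced message. */
    static final class SequencedMessage<A> {
        final String producerId;
        final long seqNr;
        final A message;

        SequencedMessage(String producerId, long seqNr, A message) {
            this.producerId = producerId;
            this.seqNr = seqNr;
            this.message = message;
        }
    }

    /** Simplified stand-in for an envelope that routes a message to an entity. */
    static final class ShardingEnvelope<M> {
        final String entityId;
        final M message;

        ShardingEnvelope(String entityId, M message) {
            this.entityId = entityId;
            this.message = message;
        }
    }

    /**
     * Builds a send function that wraps every sequenced message in an envelope
     * addressed to entityId and hands it to the (hypothetical) region.
     */
    static <A> Consumer<SequencedMessage<A>> shardingSend(
            String entityId,
            Consumer<ShardingEnvelope<SequencedMessage<A>>> region) {
        return seqMsg -> region.accept(new ShardingEnvelope<>(entityId, seqMsg));
    }
}
```

The point of the design is that the caller of `send` does not need to know how the message reaches the consumer; only the send function knows about the envelope.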
-
enforceLocalProducer
public void enforceLocalProducer(ActorRef<?> ref)
-
createChunks
public <A> scala.collection.immutable.Seq<org.apache.pekko.actor.typed.delivery.internal.ChunkedMessage> createChunks(A m, int chunkSize, Serialization serialization)
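The signature suggests that a message is turned into a sequence of chunks of at most chunkSize each. The page does not describe how, so the following is only a sketch of one plausible approach, assuming the message has already been serialized to a byte array. The `Chunk` class, its `first`/`last` flags and this `createChunks(byte[], int)` helper are hypothetical and are not the `ChunkedMessage` type or the method documented above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only; not the Pekko implementation.
final class ChunkSketch {

    /** Hypothetical chunk: a slice of the serialized bytes plus position flags. */
    static final class Chunk {
        final byte[] bytes;
        final boolean first;
        final boolean last;

        Chunk(byte[] bytes, boolean first, boolean last) {
            this.bytes = bytes;
            this.first = first;
            this.last = last;
        }
    }

    /** Splits already-serialized bytes into chunks of at most chunkSize bytes. */
    static List<Chunk> createChunks(byte[] serialized, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be > 0");
        }
        List<Chunk> chunks = new ArrayList<>();
        int offset = 0;
        do {
            int end = Math.min(offset + chunkSize, serialized.length);
            chunks.add(new Chunk(
                    Arrays.copyOfRange(serialized, offset, end),
                    offset == 0,                 // first chunk starts at offset 0
                    end == serialized.length));  // last chunk reaches the end
            offset = end;
        } while (offset < serialized.length);
        return chunks;
    }
}
```

The do-while loop means that an empty payload still produces a single chunk that is marked as both first and last, so a receiver in this model always sees at least one chunk per message.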
-
-