Class ShardingProducerController$
- java.lang.Object
-
- org.apache.pekko.cluster.sharding.typed.delivery.ShardingProducerController$
-
public class ShardingProducerController$ extends java.lang.ObjectReliable delivery between a producer actor sending messages to sharded consumer actors receiving the messages.The
ShardingProducerControllershould be used together withShardingConsumerController.A producer can send messages via a
ShardingProducerControllerto anyShardingConsumerControlleridentified by anentityId. A singleShardingProducerControllerperActorSystem(node) can be shared for sending to all entities of a certain entity type. No explicit registration is needed between theShardingConsumerControllerandShardingProducerController.The producer actor will start the flow by sending a
ShardingProducerController.Startmessage to theShardingProducerController. TheActorRefin theStartmessage is typically constructed as a message adapter to map theShardingProducerController.RequestNextto the protocol of the producer actor.The
ShardingProducerControllersendsRequestNextto the producer, which is then allowed to send one message to theShardingProducerControllervia thesendNextToin theRequestNext. Thereafter the producer will receive a newRequestNextwhen it's allowed to send one more message.In the
RequestNextmessage there is information about which entities that have demand. It is allowed to send to a newentityIdthat is not included in theRequestNext.entitiesWithDemand. If sending to an entity that doesn't have demand the message will be buffered. This support for buffering means that it is even allowed to send several messages in response to oneRequestNextbut it's recommended to only send one message and wait for nextRequestNextbefore sending more messages.The producer and
ShardingProducerControlleractors are supposed to be local so that these messages are fast and not lost. This is enforced by a runtime check.There will be one
ShardingConsumerControllerfor each entity. Many unconfirmed messages can be in flight between theShardingProducerControllerand eachShardingConsumerController. The flow control is driven by the consumer side, which means that theShardingProducerControllerwill not send faster than the demand requested by the consumers.Lost messages are detected, resent and deduplicated if needed. This is also driven by the consumer side, which means that the
ShardingProducerControllerwill not push resends unless requested by theShardingConsumerController.Until sent messages have been confirmed the
ShardingProducerControllerkeeps them in memory to be able to resend them. If the JVM of theShardingProducerControllercrashes those unconfirmed messages are lost. To make sure the messages can be delivered also in that scenario theShardingProducerControllercan be used with apekko.actor.typed.delivery.DurableProducerQueue. Then the unconfirmed messages are stored in a durable way so that they can be redelivered when the producer is started again. An implementation of theDurableProducerQueueis provided byEventSourcedProducerQueueinpekko-persistence-typed.Instead of using
tellwith thesendNextToin theRequestNextthe producer can usecontext.askwith theaskNextToin theRequestNext. The difference is that a reply is sent back when the message has been handled. If aDurableProducerQueueis used then the reply is sent when the message has been stored successfully, but it might not have been processed by the consumer yet. Otherwise the reply is sent after the consumer has processed and confirmed the message.It's also possible to use the
ShardingProducerControllerandShardingConsumerControllerwithout resending lost messages, but the flow control is still used. This can be more efficient since messages don't have to be kept in memory in theProducerControlleruntil they have been confirmed, but the drawback is that lost messages will not be delivered. See configurationonly-flow-controlof theShardingConsumerController.The
producerIdis used in logging and included as MDC entry with key"producerId". It's propagated to theConsumerControllerand is useful for correlating log messages. It can be anyStringbut it's recommended to use a unique identifier of representing the producer.If the
DurableProducerQueueis defined it is created as a child actor of theShardingProducerControlleractor.ProducerControlleractors are created for each destination entity. Those child actors use the same dispatcher as the parentShardingProducerController.
-
-
Field Summary
Fields Modifier and Type Field Description static ShardingProducerController$MODULE$Static reference to the singleton instance of this Scala object.
-
Constructor Summary
Constructors Constructor Description ShardingProducerController$()
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description <A> Behavior<ShardingProducerController.Command<A>>apply(java.lang.String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ShardingProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$2)<A> Behavior<ShardingProducerController.Command<A>>apply(java.lang.String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, scala.reflect.ClassTag<A> evidence$1)<A> Behavior<ShardingProducerController.Command<A>>create(java.lang.Class<A> messageClass, java.lang.String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior)Java API<A> Behavior<ShardingProducerController.Command<A>>create(java.lang.Class<A> messageClass, java.lang.String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ShardingProducerController.Settings settings)Java API<A> java.lang.Class<ShardingProducerController.RequestNext<A>>requestNextClass()Java API: The genericClasstype forShardingProducerController.RequestNextthat can be used when creating amessageAdapterforClass.>
-
-
-
Field Detail
-
MODULE$
public static final ShardingProducerController$ MODULE$
Static reference to the singleton instance of this Scala object.
-
-
Method Detail
-
requestNextClass
public <A> java.lang.Class<ShardingProducerController.RequestNext<A>> requestNextClass()
Java API: The genericClasstype forShardingProducerController.RequestNextthat can be used when creating amessageAdapterforClass.>
-
apply
public <A> Behavior<ShardingProducerController.Command<A>> apply(java.lang.String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, scala.reflect.ClassTag<A> evidence$1)
-
apply
public <A> Behavior<ShardingProducerController.Command<A>> apply(java.lang.String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ShardingProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$2)
-
create
public <A> Behavior<ShardingProducerController.Command<A>> create(java.lang.Class<A> messageClass, java.lang.String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior)
Java API
-
create
public <A> Behavior<ShardingProducerController.Command<A>> create(java.lang.Class<A> messageClass, java.lang.String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ShardingProducerController.Settings settings)
Java API
-
-