Class ShardingProducerController
The ShardingProducerController should be used together with ShardingConsumerController.
A producer can send messages via a ShardingProducerController to any ShardingConsumerController identified by an entityId. A single ShardingProducerController per ActorSystem (node) can be shared for sending to all entities of a certain entity type. No explicit registration is needed between the ShardingConsumerController and ShardingProducerController.
The producer actor starts the flow by sending a ShardingProducerController.Start message to the ShardingProducerController. The ActorRef in the Start message is typically constructed as a message adapter to map the ShardingProducerController.RequestNext to the protocol of the producer actor.
The ShardingProducerController sends RequestNext to the producer, which is then allowed to send one message to the ShardingProducerController via the sendNextTo in the RequestNext. Thereafter the producer will receive a new RequestNext when it's allowed to send one more message.
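The handshake described above can be sketched as a simplified plain-Java model. This is not the Pekko API: the classes RequestNextHandshake, Controller, RequestNext and WrappedRequestNext are hypothetical stand-ins, and actor messaging is replaced by direct method calls. It only illustrates the shape of the exchange: the adapter wraps each RequestNext into the producer's own protocol, and the producer sends exactly one message per RequestNext.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified plain-Java model of the Start/RequestNext handshake.
// Not the Pekko API: every class and method name here is a hypothetical stand-in.
class RequestNextHandshake {

    /** Stand-in for RequestNext: carries the reference used to send one message. */
    static final class RequestNext {
        final Consumer<String> sendNextTo;
        RequestNext(Consumer<String> sendNextTo) { this.sendNextTo = sendNextTo; }
    }

    /** Message in the producer's own protocol that wraps a RequestNext. */
    static final class WrappedRequestNext {
        final RequestNext next;
        WrappedRequestNext(RequestNext next) { this.next = next; }
    }

    /** Stand-in for the controller: after start it issues one RequestNext at a time. */
    static final class Controller {
        final List<String> accepted = new ArrayList<>();
        private Consumer<RequestNext> producerRef;

        void start(Consumer<RequestNext> producerRef) {
            this.producerRef = producerRef;
            requestNext();
        }

        private void requestNext() {
            producerRef.accept(new RequestNext(message -> {
                accepted.add(message);
                requestNext(); // a new RequestNext follows each accepted message
            }));
        }
    }

    /** Runs the handshake until the producer has nothing more to send. */
    static List<String> run(List<String> toSend) {
        Iterator<String> pending = toSend.iterator();
        // Producer: on each WrappedRequestNext, send exactly one message (if any is left).
        Consumer<WrappedRequestNext> producer = wrapped -> {
            if (pending.hasNext()) {
                wrapped.next.sendNextTo.accept(pending.next());
            }
        };
        // Message adapter: maps the controller's RequestNext to the producer's protocol.
        Function<RequestNext, WrappedRequestNext> adapter = WrappedRequestNext::new;
        Controller controller = new Controller();
        controller.start(next -> producer.accept(adapter.apply(next)));
        return controller.accepted;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b", "c")));
    }
}
```

In a real producer actor the adapter would typically be created with the actor context's message adapter facility and passed in the Start message; the model replaces that with a plain Function.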
The RequestNext message includes information about which entities have demand. It is allowed to send to a new entityId that is not included in the RequestNext.entitiesWithDemand. A message sent to an entity that doesn't have demand is buffered. This support for buffering means that it is even allowed to send several messages in response to one RequestNext, but it's recommended to send only one message and wait for the next RequestNext before sending more messages.
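The buffering rule above can be illustrated with a small plain-Java model. It is a sketch under stated assumptions, not the controller's actual implementation: DemandBufferModel and all of its methods are hypothetical, demand is tracked as a simple counter per entity, and buffered messages are drained in FIFO order as soon as demand arrives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of per-entity demand and buffering. Not the Pekko API.
class DemandBufferModel {
    private final Map<String, Integer> demand = new HashMap<>();         // entityId -> open demand
    private final Map<String, Deque<String>> buffered = new HashMap<>(); // entityId -> waiting messages
    final List<String> delivered = new ArrayList<>();

    /** Entities that currently have demand (compare RequestNext.entitiesWithDemand). */
    Set<String> entitiesWithDemand() {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Integer> entry : demand.entrySet()) {
            if (entry.getValue() > 0) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    /** Consumer side requests n more messages; buffered messages are drained first. */
    void addDemand(String entityId, int n) {
        demand.merge(entityId, n, Integer::sum);
        Deque<String> queue = buffered.get(entityId);
        while (queue != null && !queue.isEmpty() && demand.get(entityId) > 0) {
            deliver(entityId, queue.poll());
        }
    }

    /** Producer sends a message: delivered if the entity has demand, buffered otherwise. */
    void send(String entityId, String message) {
        if (demand.getOrDefault(entityId, 0) > 0) {
            deliver(entityId, message);
        } else {
            buffered.computeIfAbsent(entityId, id -> new ArrayDeque<>()).add(message);
        }
    }

    int bufferedCount(String entityId) {
        Deque<String> queue = buffered.get(entityId);
        return queue == null ? 0 : queue.size();
    }

    private void deliver(String entityId, String message) {
        demand.merge(entityId, -1, Integer::sum); // each delivery uses up one unit of demand
        delivered.add(entityId + ":" + message);
    }
}
```

Sending to an entity that has never been seen before simply creates a buffer for it, which mirrors the statement that a new entityId outside entitiesWithDemand is allowed.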
The producer and ShardingProducerController actors are supposed to be local so that these messages are fast and not lost. This is enforced by a runtime check.
There will be one ShardingConsumerController for each entity. Many unconfirmed messages can be in flight between the ShardingProducerController and each ShardingConsumerController. The flow control is driven by the consumer side, which means that the ShardingProducerController will not send faster than the demand requested by the consumers.
Lost messages are detected, resent and deduplicated if needed. This is also driven by the consumer side, which means that the ShardingProducerController will not push resends unless requested by the ShardingConsumerController.
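One common way to detect loss and duplicates on the receiving side is to compare sequence numbers. The sketch below is a plain-Java model under that assumption; ConsumerSideModel and its members are hypothetical, and the real protocol between the controllers carries more state than shown here. It illustrates the consumer-driven part: a gap leads to a resend request from the consumer, and an already seen sequence number is dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of consumer-driven loss detection and deduplication.
// Not the Pekko API; the real protocol is more involved.
class ConsumerSideModel {
    private long expectedSeqNr = 1;
    final List<String> processed = new ArrayList<>();
    final List<Long> resendRequests = new ArrayList<>();

    /** Handles one incoming message tagged with its sequence number. */
    void receive(long seqNr, String message) {
        if (seqNr == expectedSeqNr) {
            processed.add(message);            // in order: process and advance
            expectedSeqNr++;
        } else if (seqNr > expectedSeqNr) {
            resendRequests.add(expectedSeqNr); // gap: ask for a resend from the expected number
        }
        // seqNr < expectedSeqNr: duplicate of an already processed message, dropped
    }
}
```

For example, receiving sequence numbers 1, 3, 2, 3, 3 in this model processes three messages in order and records a single resend request starting at sequence number 2.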
Until sent messages have been confirmed, the ShardingProducerController keeps them in memory to be able to resend them. If the JVM of the ShardingProducerController crashes, those unconfirmed messages are lost. To make sure the messages can be delivered in that scenario as well, the ShardingProducerController can be used with a pekko.actor.typed.delivery.DurableProducerQueue. The unconfirmed messages are then stored in a durable way so that they can be redelivered when the producer is started again. An implementation of the DurableProducerQueue is provided by EventSourcedProducerQueue in pekko-persistence-typed.
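The difference between keeping unconfirmed messages only in memory and also storing them durably can be shown with a plain-Java model. This is a sketch, not the DurableProducerQueue protocol: UnconfirmedMessagesModel, DurableStore and Producer are hypothetical names, and the "durable" store is just an object that outlives the producer instance to simulate a restart.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of unconfirmed-message tracking with an optional durable store.
// Not the Pekko API.
class UnconfirmedMessagesModel {

    /** Stand-in for durable storage; it survives a simulated producer crash. */
    static final class DurableStore {
        final TreeMap<Long, String> unconfirmed = new TreeMap<>();
        long highestSeqNr = 0;
    }

    static final class Producer {
        private final TreeMap<Long, String> unconfirmed = new TreeMap<>();
        private final DurableStore store; // null means in-memory only
        private long nextSeqNr = 1;

        Producer(DurableStore store) {
            this.store = store;
            if (store != null) {
                // On (re)start, reload whatever had not been confirmed before the crash.
                unconfirmed.putAll(store.unconfirmed);
                nextSeqNr = store.highestSeqNr + 1;
            }
        }

        /** Records a sent message as unconfirmed and returns its sequence number. */
        long send(String message) {
            long seqNr = nextSeqNr++;
            unconfirmed.put(seqNr, message);
            if (store != null) {
                store.unconfirmed.put(seqNr, message);
                store.highestSeqNr = seqNr;
            }
            return seqNr;
        }

        /** Drops all messages up to and including the confirmed sequence number. */
        void confirm(long upToSeqNr) {
            unconfirmed.headMap(upToSeqNr, true).clear();
            if (store != null) {
                store.unconfirmed.headMap(upToSeqNr, true).clear();
            }
        }

        /** Messages that would have to be redelivered, in sequence order. */
        List<String> toRedeliver() {
            return new ArrayList<>(unconfirmed.values());
        }
    }
}
```

Creating a second Producer with the same DurableStore plays the role of a restart: it sees the messages that were sent but not confirmed, whereas a Producer created with null starts empty.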
Instead of using tell with the sendNextTo in the RequestNext, the producer can use context.ask with the askNextTo in the RequestNext. The difference is that a reply is sent back when the message has been handled. If a DurableProducerQueue is used, the reply is sent when the message has been stored successfully, but it might not have been processed by the consumer yet. Otherwise the reply is sent after the consumer has processed and confirmed the message.
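The two reply timings can be contrasted in a small plain-Java model. It is only a sketch: AskReplyModel is a hypothetical class, CompletableFuture stands in for the ask reply, and the strings "stored" and "confirmed" are illustrative markers rather than the actual reply type.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of when the ask reply is completed. Not the Pekko API.
class AskReplyModel {
    private final boolean durableQueue;
    private final Deque<CompletableFuture<String>> awaitingConfirmation = new ArrayDeque<>();

    AskReplyModel(boolean durableQueue) {
        this.durableQueue = durableQueue;
    }

    /** Models asking via askNextTo: returns the future reply. */
    CompletableFuture<String> ask(String message) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (durableQueue) {
            reply.complete("stored");        // replied once the message is stored
        } else {
            awaitingConfirmation.add(reply); // replied only after consumer confirmation
        }
        return reply;
    }

    /** Models the consumer confirming the oldest outstanding message. */
    void consumerConfirmedNext() {
        CompletableFuture<String> reply = awaitingConfirmation.poll();
        if (reply != null) {
            reply.complete("confirmed");
        }
    }
}
```

With the durable variant the reply only says the message is safely stored, so a producer that needs to know the consumer has processed it cannot rely on that reply alone.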
It's also possible to use the ShardingProducerController and ShardingConsumerController without resending lost messages, but the flow control is still used. This can be more efficient since messages don't have to be kept in memory in the ProducerController until they have been confirmed, but the drawback is that lost messages will not be delivered. See configuration only-flow-control of the ShardingConsumerController.
The producerId is used in logging and included as an MDC entry with key "producerId". It's propagated to the ConsumerController and is useful for correlating log messages. It can be any String, but it's recommended to use a unique identifier representing the producer.
If the DurableProducerQueue is defined, it is created as a child actor of the ShardingProducerController actor. ProducerController actors are created for each destination entity. Those child actors use the same dispatcher as the parent ShardingProducerController.
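Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static interface | ShardingProducerController.Command<A> |
static final class | ShardingProducerController.MessageWithConfirmation<A> | For sending confirmation message back to the producer when the message has been confirmed.
static class | ShardingProducerController.MessageWithConfirmation$ |
static final class | ShardingProducerController.RequestNext<A> | The ProducerController sends RequestNext to the producer when it is allowed to send one message via the sendNextTo or askNextTo.
static class | ShardingProducerController.RequestNext$ |
static final class | ShardingProducerController.Settings |
static class | ShardingProducerController.Settings$ |
static final class | ShardingProducerController.Start<A> | Initial message from the producer actor.
static class | ShardingProducerController.Start$ |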
Constructor Summary
Constructors
Constructor | Description
ShardingProducerController() |
Method Summary
Modifier and Type | Method | Description
static <A> Behavior<ShardingProducerController.Command<A>> | apply(String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ShardingProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$2) |
static <A> Behavior<ShardingProducerController.Command<A>> | apply(String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, scala.reflect.ClassTag<A> evidence$1) |
static <A> Behavior<ShardingProducerController.Command<A>> | create(Class<A> messageClass, String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior) | Java API
static <A> Behavior<ShardingProducerController.Command<A>> | create(Class<A> messageClass, String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ShardingProducerController.Settings settings) | Java API
static <A> Class<ShardingProducerController.RequestNext<A>> | requestNextClass() | Java API: The generic Class type for ShardingProducerController.RequestNext that can be used when creating a messageAdapter for Class<RequestNext<A>>.
Constructor Details
ShardingProducerController
public ShardingProducerController()
Method Details
requestNextClass
public static <A> Class<ShardingProducerController.RequestNext<A>> requestNextClass()
Java API: The generic Class type for ShardingProducerController.RequestNext that can be used when creating a messageAdapter for Class<RequestNext<A>>.
apply
public static <A> Behavior<ShardingProducerController.Command<A>> apply(String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, scala.reflect.ClassTag<A> evidence$1)
apply
public static <A> Behavior<ShardingProducerController.Command<A>> apply(String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ShardingProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$2)
create
public static <A> Behavior<ShardingProducerController.Command<A>> create(Class<A> messageClass, String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior)
Java API
create
public static <A> Behavior<ShardingProducerController.Command<A>> create(Class<A> messageClass, String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ShardingProducerController.Settings settings)
Java API