Class ProducerController
- java.lang.Object
-
- org.apache.pekko.actor.typed.delivery.ProducerController
-
public class ProducerController extends java.lang.ObjectPoint-to-point reliable delivery between a single producer actor sending messages and a single consumer actor receiving the messages. Used together withConsumerController.The producer actor will start the flow by sending a
ProducerController.Startmessage to theProducerController. TheActorRefin theStartmessage is typically constructed as a message adapter to map theProducerController.RequestNextto the protocol of the producer actor.For the
ProducerControllerto know where to send the messages it must be connected with theConsumerController. You do this is withProducerController.RegisterConsumerorConsumerController.RegisterToProducerControllermessages.The
ProducerControllersendsRequestNextto the producer, which is then allowed to send one message to theProducerControllervia thesendNextToin theRequestNext. Thereafter the producer will receive a newRequestNextwhen it's allowed to send one more message.The producer and
ProducerControlleractors are supposed to be local so that these messages are fast and not lost. This is enforced by a runtime check.Many unconfirmed messages can be in flight between the
ProducerControllerandConsumerController. The flow control is driven by the consumer side, which means that theProducerControllerwill not send faster than the demand requested by theConsumerController.Lost messages are detected, resent and deduplicated if needed. This is also driven by the consumer side, which means that the
ProducerControllerwill not push resends unless requested by theConsumerController.Until sent messages have been confirmed the
ProducerControllerkeeps them in memory to be able to resend them. If the JVM of theProducerControllercrashes those unconfirmed messages are lost. To make sure the messages can be delivered also in that scenario theProducerControllercan be used with aDurableProducerQueue. Then the unconfirmed messages are stored in a durable way so that they can be redelivered when the producer is started again. An implementation of theDurableProducerQueueis provided byEventSourcedProducerQueueinpekko-persistence-typed.Instead of using
tellwith thesendNextToin theRequestNextthe producer can usecontext.askwith theaskNextToin theRequestNext. The difference is that a reply is sent back when the message has been handled. If aDurableProducerQueueis used then the reply is sent when the message has been stored successfully, but it might not have been processed by the consumer yet. Otherwise the reply is sent after the consumer has processed and confirmed the message.If the consumer crashes a new
ConsumerControllercan be connected to the originalProducerConsumerwithout restarting it. TheProducerConsumerwill then redeliver all unconfirmed messages.It's also possible to use the
ProducerControllerandConsumerControllerwithout resending lost messages, but the flow control is still used. This can for example be useful when both consumer and producer are know to be located in the same localActorSystem. This can be more efficient since messages don't have to be kept in memory in theProducerControlleruntil they have been confirmed, but the drawback is that lost messages will not be delivered. See configurationonly-flow-controlof theConsumerController.The
producerIdis used in logging and included as MDC entry with key"producerId". It's propagated to theConsumerControllerand is useful for correlating log messages. It can be anyStringbut it's recommended to use a unique identifier of representing the producer.If the
DurableProducerQueueis defined it is created as a child actor of theProducerControlleractor. It will use the same dispatcher as the parentProducerController.
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static interfaceProducerController.Command<A>static classProducerController.MessageWithConfirmation<A>For sending confirmation message back to the producer when the message has been confirmed.static classProducerController.MessageWithConfirmation$static classProducerController.RegisterConsumer<A>Register the givenconsumerControllerto theProducerController.static classProducerController.RegisterConsumer$static classProducerController.RequestNext<A>TheProducerControllersendsRequestNextto the producer when it is allowed to send one message via thesendNextTooraskNextTo.static classProducerController.RequestNext$static classProducerController.Settingsstatic classProducerController.Settings$static classProducerController.Start<A>Initial message from the producer actor.static classProducerController.Start$
-
Constructor Summary
Constructors Constructor Description ProducerController()
-
Method Summary
All Methods Static Methods Concrete Methods Modifier and Type Method Description static <A> Behavior<ProducerController.Command<A>>apply(java.lang.String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$2)static <A> Behavior<ProducerController.Command<A>>apply(java.lang.String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, scala.reflect.ClassTag<A> evidence$1)static <A> Behavior<ProducerController.Command<A>>create(java.lang.Class<A> messageClass, java.lang.String producerId, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior)Java APIstatic <A> Behavior<ProducerController.Command<A>>create(java.lang.Class<A> messageClass, java.lang.String producerId, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings)Java APIstatic <A> java.lang.Class<ProducerController.RequestNext<A>>requestNextClass()Java API: The genericClasstype forProducerController.RequestNextthat can be used when creating amessageAdapterforClass.>
-
-
-
Method Detail
-
requestNextClass
public static <A> java.lang.Class<ProducerController.RequestNext<A>> requestNextClass()
Java API: The genericClasstype forProducerController.RequestNextthat can be used when creating amessageAdapterforClass.>
-
apply
public static <A> Behavior<ProducerController.Command<A>> apply(java.lang.String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, scala.reflect.ClassTag<A> evidence$1)
-
apply
public static <A> Behavior<ProducerController.Command<A>> apply(java.lang.String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$2)
-
create
public static <A> Behavior<ProducerController.Command<A>> create(java.lang.Class<A> messageClass, java.lang.String producerId, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior)
Java API
-
create
public static <A> Behavior<ProducerController.Command<A>> create(java.lang.Class<A> messageClass, java.lang.String producerId, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings)
Java API
-
-