Class Replicator

  • All Implemented Interfaces:
    Actor, ActorLogging

    public final class Replicator
    extends java.lang.Object
    implements Actor, ActorLogging
    A replicated in-memory data store supporting low latency and high availability requirements.

    The Replicator actor takes care of direct replication and gossip based dissemination of Conflict Free Replicated Data Types (CRDTs) to replicas in the the cluster. The data types must be convergent CRDTs and implement ReplicatedData, i.e. they provide a monotonic merge function and the state changes always converge.

    You can use your own custom ReplicatedData or DeltaReplicatedData types, and several types are provided by this package, such as:

    For good introduction to the CRDT subject watch the Eventually Consistent Data Structures talk by Sean Cribbs and and the talk by Mark Shapiro and read the excellent paper A comprehensive study of Convergent and Commutative Replicated Data Types by Mark Shapiro et. al.

    The Replicator actor must be started on each node in the cluster, or group of nodes tagged with a specific role. It communicates with other Replicator instances with the same path (without address) that are running on other nodes . For convenience it can be used with the DistributedData extension but it can also be started as an ordinary actor using the Replicator.props. If it is started as an ordinary actor it is important that it is given the same name, started on same path, on all nodes.

    Delta State Replicated Data Types are supported. delta-CRDT is a way to reduce the need for sending the full state for updates. For example adding element 'c' and 'd' to set {'a', 'b'} would result in sending the delta {'c', 'd'} and merge that with the state on the receiving side, resulting in set {'a', 'b', 'c', 'd'}.

    The protocol for replicating the deltas supports causal consistency if the data type is marked with RequiresCausalDeliveryOfDeltas. Otherwise it is only eventually consistent. Without causal consistency it means that if elements 'c' and 'd' are added in two separate Update operations these deltas may occasionally be propagated to nodes in different order than the causal order of the updates. For this example it can result in that set {'a', 'b', 'd'} can be seen before element 'c' is seen. Eventually it will be {'a', 'b', 'c', 'd'}.

    == Update ==

    To modify and replicate a ReplicatedData value you send a Replicator.Update message to the local Replicator. The current data value for the key of the Update is passed as parameter to the modify function of the Update. The function is supposed to return the new value of the data, which will then be replicated according to the given consistency level.

    The modify function is called by the Replicator actor and must therefore be a pure function that only uses the data parameter and stable fields from enclosing scope. It must for example not access sender() reference of an enclosing actor.

    Update is intended to only be sent from an actor running in same local ActorSystem as the Replicator, because the modify function is typically not serializable.

    You supply a write consistency level which has the following meaning:

    • WriteLocal the value will immediately only be written to the local replica, and later disseminated with gossip
    • WriteTo(n) the value will immediately be written to at least n replicas, including the local replica
    • WriteMajority the value will immediately be written to a majority of replicas, i.e. at least N/2 + 1 replicas, where N is the number of nodes in the cluster (or cluster role group)
    • WriteAll the value will immediately be written to all nodes in the cluster (or all nodes in the cluster role group)

    As reply of the Update a Replicator.UpdateSuccess is sent to the sender of the Update if the value was successfully replicated according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.UpdateFailure subclass is sent back. Note that a Replicator.UpdateTimeout reply does not mean that the update completely failed or was rolled back. It may still have been replicated to some nodes, and will eventually be replicated to all nodes with the gossip protocol.

    You will always see your own writes. For example if you send two Update messages changing the value of the same key, the modify function of the second message will see the change that was performed by the first Update message.

    In the Update message you can pass an optional request context, which the Replicator does not care about, but is included in the reply messages. This is a convenient way to pass contextual information (e.g. original sender) without having to use ask or local correlation data structures.

    == Get ==

    To retrieve the current value of a data you send Replicator.Get message to the Replicator. You supply a consistency level which has the following meaning:

    • ReadLocal the value will only be read from the local replica
    • ReadFrom(n) the value will be read and merged from n replicas, including the local replica
    • ReadMajority the value will be read and merged from a majority of replicas, i.e. at least N/2 + 1 replicas, where N is the number of nodes in the cluster (or cluster role group)
    • ReadAll the value will be read and merged from all nodes in the cluster (or all nodes in the cluster role group)

    As reply of the Get a Replicator.GetSuccess is sent to the sender of the Get if the value was successfully retrieved according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.GetFailure is sent. If the key does not exist the reply will be Replicator.NotFound.

    You will always read your own writes. For example if you send a Update message followed by a Get of the same key the Get will retrieve the change that was performed by the preceding Update message. However, the order of the reply messages are not defined, i.e. in the previous example you may receive the GetSuccess before the UpdateSuccess.

    In the Get message you can pass an optional request context in the same way as for the Update message, described above. For example the original sender can be passed and replied to after receiving and transforming GetSuccess.

    == Subscribe ==

    You may also register interest in change notifications by sending Replicator.Subscribe message to the Replicator. It will send Replicator.Changed messages to the registered subscriber when the data for the subscribed key is updated. Subscribers will be notified periodically with the configured notify-subscribers-interval, and it is also possible to send an explicit Replicator.FlushChanges message to the Replicator to notify the subscribers immediately.

    The subscriber is automatically removed if the subscriber is terminated. A subscriber can also be deregistered with the Replicator.Unsubscribe message.

    == Delete ==

    A data entry can be deleted by sending a Replicator.Delete message to the local local Replicator. As reply of the Delete a Replicator.DeleteSuccess is sent to the sender of the Delete if the value was successfully deleted according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.ReplicationDeleteFailure is sent. Note that ReplicationDeleteFailure does not mean that the delete completely failed or was rolled back. It may still have been replicated to some nodes, and may eventually be replicated to all nodes.

    A deleted key cannot be reused again, but it is still recommended to delete unused data entries because that reduces the replication overhead when new nodes join the cluster. Subsequent Delete, Update and Get requests will be replied with Replicator.DataDeleted, Replicator.UpdateDataDeleted and Replicator.GetDataDeleted respectively. Subscribers will receive Replicator.Deleted.

    In the Delete message you can pass an optional request context in the same way as for the Update message, described above. For example the original sender can be passed and replied to after receiving and transforming DeleteSuccess.

    == CRDT Garbage ==

    One thing that can be problematic with CRDTs is that some data types accumulate history (garbage). For example a GCounter keeps track of one counter per node. If a GCounter has been updated from one node it will associate the identifier of that node forever. That can become a problem for long running systems with many cluster nodes being added and removed. To solve this problem the Replicator performs pruning of data associated with nodes that have been removed from the cluster. Data types that need pruning have to implement RemovedNodePruning. The pruning consists of several steps:

    1. When a node is removed from the cluster it is first important that all updates that were done by that node are disseminated to all other nodes. The pruning will not start before the maxPruningDissemination duration has elapsed. The time measurement is stopped when any replica is unreachable, but it's still recommended to configure this with certain margin. It should be in the magnitude of minutes.
    2. The nodes are ordered by their address and the node ordered first is called leader. The leader initiates the pruning by adding a PruningInitialized marker in the data envelope. This is gossiped to all other nodes and they mark it as seen when they receive it.
    3. When the leader sees that all other nodes have seen the PruningInitialized marker the leader performs the pruning and changes the marker to PruningPerformed so that nobody else will redo the pruning. The data envelope with this pruning state is a CRDT itself. The pruning is typically performed by "moving" the part of the data associated with the removed node to the leader node. For example, a GCounter is a Map with the node as key and the counts done by that node as value. When pruning the value of the removed node is moved to the entry owned by the leader node. See RemovedNodePruning.prune(org.apache.pekko.cluster.UniqueAddress, org.apache.pekko.cluster.UniqueAddress).
    4. Thereafter the data is always cleared from parts associated with the removed node so that it does not come back when merging. See RemovedNodePruning.pruningCleanup(org.apache.pekko.cluster.UniqueAddress)
    5. After another maxPruningDissemination duration after pruning the last entry from the removed node the PruningPerformed markers in the data envelope are collapsed into a single tombstone entry, for efficiency. Clients may continue to use old data and therefore all data are always cleared from parts associated with tombstoned nodes.
    • Method Detail

      • DefaultMajorityMinCap

        public static int DefaultMajorityMinCap()
      • context

        public ActorContext context()
        Description copied from interface: Actor
        Scala API: Stores the context for this actor, including self, and sender. It is implicit to support operations such as forward.

        WARNING: Only valid within the Actor itself, so do not close over it and publish it to other threads!

        pekko.actor.ActorContext is the Scala API. getContext returns a pekko.actor.AbstractActor.ActorContext, which is the Java API of the actor context.

        Specified by:
        context in interface Actor
      • self

        public final ActorRef self()
        Description copied from interface: Actor
        The 'self' field holds the ActorRef for this actor.

        Can be used to send messages to itself:
         self ! message
         
        Specified by:
        self in interface Actor
      • org$apache$pekko$actor$Actor$_setter_$context_$eq

        protected void org$apache$pekko$actor$Actor$_setter_$context_$eq​(ActorContext x$1)
        Description copied from interface: Actor
        Scala API: Stores the context for this actor, including self, and sender. It is implicit to support operations such as forward.

        WARNING: Only valid within the Actor itself, so do not close over it and publish it to other threads!

        pekko.actor.ActorContext is the Scala API. getContext returns a pekko.actor.AbstractActor.ActorContext, which is the Java API of the actor context.

        Specified by:
        org$apache$pekko$actor$Actor$_setter_$context_$eq in interface Actor
      • org$apache$pekko$actor$Actor$_setter_$self_$eq

        protected final void org$apache$pekko$actor$Actor$_setter_$self_$eq​(ActorRef x$1)
        Description copied from interface: Actor
        The 'self' field holds the ActorRef for this actor.

        Can be used to send messages to itself:
         self ! message
         
        Specified by:
        org$apache$pekko$actor$Actor$_setter_$self_$eq in interface Actor
      • cluster

        public Cluster cluster()
      • selfAddress

        public Address selfAddress()
      • selfFromSystemUid

        public scala.Some<java.lang.Object> selfFromSystemUid()
      • pruningTask

        public scala.Option<Cancellable> pruningTask()
      • maxPruningDisseminationNanos

        public long maxPruningDisseminationNanos()
      • hasDurableKeys

        public boolean hasDurableKeys()
      • durable

        public scala.collection.immutable.Set<java.lang.String> durable()
      • durableWildcards

        public scala.collection.immutable.Set<java.lang.String> durableWildcards()
      • durableStore

        public ActorRef durableStore()
      • deltaPropagationSelector

        public java.lang.Object deltaPropagationSelector()
      • deltaPropagationTask

        public scala.Option<Cancellable> deltaPropagationTask()
      • nodes

        public scala.collection.immutable.SortedSet<UniqueAddress> nodes()
      • nodes_$eq

        public void nodes_$eq​(scala.collection.immutable.SortedSet<UniqueAddress> x$1)
      • membersByAge

        public scala.collection.immutable.SortedSet<Member> membersByAge()
      • membersByAge_$eq

        public void membersByAge_$eq​(scala.collection.immutable.SortedSet<Member> x$1)
      • weaklyUpNodes

        public scala.collection.immutable.SortedSet<UniqueAddress> weaklyUpNodes()
      • weaklyUpNodes_$eq

        public void weaklyUpNodes_$eq​(scala.collection.immutable.SortedSet<UniqueAddress> x$1)
      • joiningNodes

        public scala.collection.immutable.SortedSet<UniqueAddress> joiningNodes()
      • joiningNodes_$eq

        public void joiningNodes_$eq​(scala.collection.immutable.SortedSet<UniqueAddress> x$1)
      • exitingNodes

        public scala.collection.immutable.SortedSet<UniqueAddress> exitingNodes()
      • exitingNodes_$eq

        public void exitingNodes_$eq​(scala.collection.immutable.SortedSet<UniqueAddress> x$1)
      • removedNodes

        public scala.collection.immutable.Map<UniqueAddress,​java.lang.Object> removedNodes()
      • removedNodes_$eq

        public void removedNodes_$eq​(scala.collection.immutable.Map<UniqueAddress,​java.lang.Object> x$1)
      • leader

        public scala.collection.immutable.TreeSet<Member> leader()
      • leader_$eq

        public void leader_$eq​(scala.collection.immutable.TreeSet<Member> x$1)
      • isLeader

        public boolean isLeader()
      • previousClockTime

        public long previousClockTime()
      • previousClockTime_$eq

        public void previousClockTime_$eq​(long x$1)
      • allReachableClockTime

        public long allReachableClockTime()
      • allReachableClockTime_$eq

        public void allReachableClockTime_$eq​(long x$1)
      • unreachable

        public scala.collection.immutable.Set<UniqueAddress> unreachable()
      • unreachable_$eq

        public void unreachable_$eq​(scala.collection.immutable.Set<UniqueAddress> x$1)
      • dataEntries

        public scala.collection.immutable.Map<java.lang.String,​scala.Tuple2<org.apache.pekko.cluster.ddata.Replicator.Internal.DataEnvelope,​ByteString>> dataEntries()
      • dataEntries_$eq

        public void dataEntries_$eq​(scala.collection.immutable.Map<java.lang.String,​scala.Tuple2<org.apache.pekko.cluster.ddata.Replicator.Internal.DataEnvelope,​ByteString>> x$1)
      • changed

        public scala.collection.immutable.Set<java.lang.String> changed()
      • changed_$eq

        public void changed_$eq​(scala.collection.immutable.Set<java.lang.String> x$1)
      • statusCount

        public long statusCount()
      • statusCount_$eq

        public void statusCount_$eq​(long x$1)
      • statusTotChunks

        public int statusTotChunks()
      • statusTotChunks_$eq

        public void statusTotChunks_$eq​(int x$1)
      • fullStateGossipEnabled

        public boolean fullStateGossipEnabled()
      • fullStateGossipEnabled_$eq

        public void fullStateGossipEnabled_$eq​(boolean x$1)
      • subscribers

        public scala.collection.mutable.HashMap<java.lang.String,​scala.collection.mutable.Set<ActorRef>> subscribers()
      • newSubscribers

        public scala.collection.mutable.HashMap<java.lang.String,​scala.collection.mutable.Set<ActorRef>> newSubscribers()
      • subscriptionKeys

        public scala.collection.immutable.Map<java.lang.String,​Key<ReplicatedData>> subscriptionKeys()
      • subscriptionKeys_$eq

        public void subscriptionKeys_$eq​(scala.collection.immutable.Map<java.lang.String,​Key<ReplicatedData>> x$1)
      • replyTo_$eq

        public void replyTo_$eq​(ActorRef x$1)
      • aroundReceive

        protected void aroundReceive​(scala.PartialFunction<java.lang.Object,​scala.runtime.BoxedUnit> rcv,
                                     java.lang.Object msg)
        Description copied from interface: Actor
        INTERNAL API.

        Can be overridden to intercept calls to this actor's current behavior.

        Specified by:
        aroundReceive in interface Actor
        Parameters:
        rcv - current behavior.
        msg - current message.
      • preStart

        public void preStart()
        Description copied from interface: Actor
        User overridable callback.

        Is called when an Actor is started. Actors are automatically started asynchronously when created. Empty default implementation.
        Specified by:
        preStart in interface Actor
      • postStop

        public void postStop()
        Description copied from interface: Actor
        User overridable callback.

        Is called asynchronously after 'actor.stop()' is invoked. Empty default implementation.
        Specified by:
        postStop in interface Actor
      • matchingRole

        public boolean matchingRole​(Member m)
      • supervisorStrategy

        public OneForOneStrategy supervisorStrategy()
        Description copied from interface: Actor
        User overridable definition the strategy to use for supervising child actors.
        Specified by:
        supervisorStrategy in interface Actor
      • receive

        public scala.PartialFunction<java.lang.Object,​scala.runtime.BoxedUnit> receive()
        Description copied from interface: Actor
        Scala API: This defines the initial actor behavior, it must return a partial function with the actor logic.
        Specified by:
        receive in interface Actor
      • load

        public scala.PartialFunction<java.lang.Object,​scala.runtime.BoxedUnit> load()
      • normalReceive

        public scala.PartialFunction<java.lang.Object,​scala.runtime.BoxedUnit> normalReceive()
      • receiveRead

        public void receiveRead​(java.lang.String key)
      • isLocalSender

        public boolean isLocalSender()
      • isDurable

        public boolean isDurable​(java.lang.String key)
      • receiveWrite

        public void receiveWrite​(java.lang.String key,
                                 org.apache.pekko.cluster.ddata.Replicator.Internal.DataEnvelope envelope)
      • writeAndStore

        public void writeAndStore​(java.lang.String key,
                                  org.apache.pekko.cluster.ddata.Replicator.Internal.DataEnvelope writeEnvelope,
                                  boolean reply)
      • write

        public scala.Option<org.apache.pekko.cluster.ddata.Replicator.Internal.DataEnvelope> write​(java.lang.String key,
                                                                                                   org.apache.pekko.cluster.ddata.Replicator.Internal.DataEnvelope writeEnvelope)
      • receiveReadRepair

        public void receiveReadRepair​(java.lang.String key,
                                      org.apache.pekko.cluster.ddata.Replicator.Internal.DataEnvelope writeEnvelope)
      • receiveGetKeyIds

        public void receiveGetKeyIds()
      • setData

        public org.apache.pekko.cluster.ddata.Replicator.Internal.DataEnvelope setData​(java.lang.String key,
                                                                                       org.apache.pekko.cluster.ddata.Replicator.Internal.DataEnvelope envelope)
      • getDigest

        public ByteString getDigest​(java.lang.String key)
      • digest

        public scala.Tuple2<ByteString,​java.lang.Object> digest​(org.apache.pekko.cluster.ddata.Replicator.Internal.DataEnvelope envelope)
        Returns:
        SHA-1 digest of the serialized data, and the size of the serialized data
      • getData

        public scala.Option<org.apache.pekko.cluster.ddata.Replicator.Internal.DataEnvelope> getData​(java.lang.String key)
      • getDeltaSeqNr

        public long getDeltaSeqNr​(java.lang.String key,
                                  UniqueAddress fromNode)
      • isNodeRemoved

        public boolean isNodeRemoved​(UniqueAddress node,
                                     scala.collection.Iterable<java.lang.String> keys)
      • receiveFlushChanges

        public void receiveFlushChanges()
      • receiveDeltaPropagationTick

        public void receiveDeltaPropagationTick()
      • receiveDeltaPropagation

        public void receiveDeltaPropagation​(UniqueAddress fromNode,
                                            boolean reply,
                                            scala.collection.immutable.Map<java.lang.String,​org.apache.pekko.cluster.ddata.Replicator.Internal.Delta> deltas)
      • receiveGossipTick

        public void receiveGossipTick()
      • selectRandomNode

        public scala.Option<UniqueAddress> selectRandomNode​(scala.collection.immutable.IndexedSeq<UniqueAddress> addresses)
      • receiveStatus

        public void receiveStatus​(scala.collection.immutable.Map<java.lang.String,​ByteString> otherDigests,
                                  int chunk,
                                  int totChunks,
                                  scala.Option<java.lang.Object> fromSystemUid)
      • receiveGossip

        public void receiveGossip​(scala.collection.immutable.Map<java.lang.String,​org.apache.pekko.cluster.ddata.Replicator.Internal.DataEnvelope> updatedData,
                                  boolean sendBack,
                                  scala.Option<java.lang.Object> fromSystemUid)
      • hasSubscriber

        public boolean hasSubscriber​(ActorRef subscriber)
      • receiveTerminated

        public void receiveTerminated​(ActorRef ref)
      • receiveMemberJoining

        public void receiveMemberJoining​(Member m)
      • receiveMemberWeaklyUp

        public void receiveMemberWeaklyUp​(Member m)
      • receiveMemberUp

        public void receiveMemberUp​(Member m)
      • receiveMemberExiting

        public void receiveMemberExiting​(Member m)
      • receiveMemberRemoved

        public void receiveMemberRemoved​(Member m)
      • receiveOtherMemberEvent

        public void receiveOtherMemberEvent​(Member m)
      • receiveUnreachable

        public void receiveUnreachable​(Member m)
      • receiveReachable

        public void receiveReachable​(Member m)
      • receiveClockTick

        public void receiveClockTick()
      • receiveRemovedNodePruningTick

        public void receiveRemovedNodePruningTick()
      • collectRemovedNodes

        public void collectRemovedNodes()
      • initRemovedNodePruning

        public void initRemovedNodePruning()
      • performRemovedNodePruning

        public void performRemovedNodePruning()
      • deleteObsoletePruningPerformed

        public void deleteObsoletePruningPerformed()
      • receiveGetReplicaCount

        public void receiveGetReplicaCount()