Classic Distributed Data

Note

Pekko Classic pertains to the original Actor APIs, which have been improved by more type safe and guided Actor APIs. Pekko Classic is still fully supported and existing applications can continue to use the classic APIs. It is also possible to use the new Actor APIs together with classic actors in the same ActorSystem, see coexistence. For new projects we recommend using the new Actor API.

For the full documentation of this feature and for new projects see Distributed Data.

Dependency

To use Pekko Distributed Data, you must add the following dependency in your project:

sbt
val PekkoVersion = "1.1.2+29-e21fa9eb-SNAPSHOT"
libraryDependencies += "org.apache.pekko" %% "pekko-distributed-data" % PekkoVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.1.2+29-e21fa9eb-SNAPSHOT</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-distributed-data_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2+29-e21fa9eb-SNAPSHOT")

  implementation "org.apache.pekko:pekko-distributed-data_${versions.ScalaBinary}"
}

Introduction

For the full documentation of this feature and for new projects see Distributed Data - Introduction.

Using the Replicator

The Replicator actor provides the API for interacting with the data. The Replicator actor must be started on each node in the cluster, or on each node of a group tagged with a specific role. It communicates with other Replicator instances that have the same path (without address) and run on other nodes. For convenience it can be used with the DistributedData extension, but it can also be started as an ordinary actor using Replicator.props. If it is started as an ordinary actor, it is important that it is given the same name, and started on the same path, on all nodes.

Cluster members with status WeaklyUp participate in Distributed Data. This means that the data will be replicated to the WeaklyUp nodes with the background gossip protocol. Note that a WeaklyUp node does not participate in any actions where the consistency mode is to read/write from all nodes or the majority of nodes, because it is not counted as part of the cluster. So 3 nodes + 5 WeaklyUp is essentially a 3 node cluster as far as consistent actions are concerned.
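The node-counting rule above can be made concrete with a little arithmetic. The following plain-Java sketch is not Pekko API: the class and method names are hypothetical, and it assumes that a majority is the usual n / 2 + 1 over the members that count for consistent reads and writes.

```java
class QuorumSketch {
    // Assumed majority rule: more than half of the counted members.
    public static int majority(int countedMembers) {
        return countedMembers / 2 + 1;
    }

    // WeaklyUp members receive gossip but are not counted for
    // majority or all-node consistency, so only the other members matter.
    public static int majorityFor(int countedMembers, int weaklyUpMembers) {
        return majority(countedMembers);
    }

    public static void main(String[] args) {
        // 3 nodes + 5 WeaklyUp behaves like a 3 node cluster.
        System.out.println(majorityFor(3, 5)); // prints 2
    }
}
```

Under these assumptions a majority write in the 3 + 5 example needs acknowledgement from 2 nodes, exactly as in a plain 3 node cluster.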

Below is an example of an actor that schedules tick messages to itself and for each tick adds or removes elements from an ORSet (observed-remove set). It also subscribes to changes of this set.

Scala
import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.duration._ // needed for 5.seconds
import org.apache.pekko
import pekko.actor.Actor
import pekko.actor.ActorLogging
import pekko.cluster.Cluster
import pekko.cluster.ddata.DistributedData
import pekko.cluster.ddata.ORSet
import pekko.cluster.ddata.ORSetKey
import pekko.cluster.ddata.Replicator
import pekko.cluster.ddata.Replicator._
import pekko.cluster.ddata.SelfUniqueAddress

object DataBot {
  private case object Tick
}

class DataBot extends Actor with ActorLogging {
  import DataBot._

  val replicator = DistributedData(context.system).replicator
  implicit val node: SelfUniqueAddress = DistributedData(context.system).selfUniqueAddress

  import context.dispatcher
  val tickTask = context.system.scheduler.scheduleWithFixedDelay(5.seconds, 5.seconds, self, Tick)

  val DataKey = ORSetKey[String]("key")

  replicator ! Subscribe(DataKey, self)

  def receive = {
    case Tick =>
      val s = ThreadLocalRandom.current().nextInt(97, 123).toChar.toString
      if (ThreadLocalRandom.current().nextBoolean()) {
        // add
        log.info("Adding: {}", s)
        replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_ :+ s)
      } else {
        // remove
        log.info("Removing: {}", s)
        replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_.remove(s))
      }

    case _: UpdateResponse[_] => // ignore
    case c @ Changed(DataKey) =>
      val data = c.get(DataKey)
      log.info("Current elements: {}", data.elements)
  }

  override def postStop(): Unit = tickTask.cancel()

}
Java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Cancellable;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.ddata.DistributedData;
import org.apache.pekko.cluster.ddata.Key;
import org.apache.pekko.cluster.ddata.ORSet;
import org.apache.pekko.cluster.ddata.ORSetKey;
import org.apache.pekko.cluster.ddata.Replicator;
import org.apache.pekko.cluster.ddata.Replicator.Changed;
import org.apache.pekko.cluster.ddata.Replicator.Subscribe;
import org.apache.pekko.cluster.ddata.Replicator.Update;
import org.apache.pekko.cluster.ddata.Replicator.UpdateResponse;
import org.apache.pekko.cluster.ddata.SelfUniqueAddress;
import org.apache.pekko.event.Logging;
import org.apache.pekko.event.LoggingAdapter;

public class DataBot extends AbstractActor {

  private static final String TICK = "tick";

  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  private final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
  private final SelfUniqueAddress node =
      DistributedData.get(getContext().getSystem()).selfUniqueAddress();

  private final Cancellable tickTask =
      getContext()
          .getSystem()
          .scheduler()
          .scheduleWithFixedDelay(
              Duration.ofSeconds(5),
              Duration.ofSeconds(5),
              getSelf(),
              TICK,
              getContext().getDispatcher(),
              getSelf());

  private final Key<ORSet<String>> dataKey = ORSetKey.create("key");

  @SuppressWarnings("unchecked")
  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(String.class, a -> a.equals(TICK), a -> receiveTick())
        .match(
            Changed.class,
            c -> c.key().equals(dataKey),
            c -> receiveChanged((Changed<ORSet<String>>) c))
        .match(UpdateResponse.class, r -> receiveUpdateResponse())
        .build();
  }

  private void receiveTick() {
    String s = String.valueOf((char) ThreadLocalRandom.current().nextInt(97, 123));
    if (ThreadLocalRandom.current().nextBoolean()) {
      // add
      log.info("Adding: {}", s);
      Update<ORSet<String>> update =
          new Update<>(dataKey, ORSet.create(), Replicator.writeLocal(), curr -> curr.add(node, s));
      replicator.tell(update, getSelf());
    } else {
      // remove
      log.info("Removing: {}", s);
      Update<ORSet<String>> update =
          new Update<>(
              dataKey, ORSet.create(), Replicator.writeLocal(), curr -> curr.remove(node, s));
      replicator.tell(update, getSelf());
    }
  }

  private void receiveChanged(Changed<ORSet<String>> c) {
    ORSet<String> data = c.dataValue();
    log.info("Current elements: {}", data.getElements());
  }

  private void receiveUpdateResponse() {
    // ignore
  }

  @Override
  public void preStart() {
    Subscribe<ORSet<String>> subscribe = new Subscribe<>(dataKey, getSelf());
    replicator.tell(subscribe, ActorRef.noSender());
  }

  @Override
  public void postStop() {
    tickTask.cancel();
  }
}

Update

For the full documentation of this feature and for new projects see Distributed Data - Update.

To modify and replicate a data value you send a Replicator.Update message to the local Replicator.

The current data value for the key of the Update is passed as parameter to the modify function of the Update. The function is supposed to return the new value of the data, which will then be replicated according to the given consistency level.

The modify function is called by the Replicator actor and must therefore be a pure function that only uses the data parameter and stable fields from the enclosing scope. It must for example not access the sender reference (sender() in Scala, getSender() in Java) of an enclosing actor.

Update is intended to only be sent from an actor running in the same local ActorSystem as the Replicator, because the modify function is typically not serializable.

Scala
implicit val node: SelfUniqueAddress = DistributedData(system).selfUniqueAddress
val replicator = DistributedData(system).replicator

val Counter1Key = PNCounterKey("counter1")
val Set1Key = GSetKey[String]("set1")
val Set2Key = ORSetKey[String]("set2")
val ActiveFlagKey = FlagKey("active")

replicator ! Update(Counter1Key, PNCounter(), WriteLocal)(_ :+ 1)

val writeTo3 = WriteTo(n = 3, timeout = 1.second)
replicator ! Update(Set1Key, GSet.empty[String], writeTo3)(_ + "hello")

val writeMajority = WriteMajority(timeout = 5.seconds)
replicator ! Update(Set2Key, ORSet.empty[String], writeMajority)(_ :+ "hello")

val writeAll = WriteAll(timeout = 5.seconds)
replicator ! Update(ActiveFlagKey, Flag.Disabled, writeAll)(_.switchOn)
Java
class DemonstrateUpdate extends AbstractActor {
  final SelfUniqueAddress node =
      DistributedData.get(getContext().getSystem()).selfUniqueAddress();
  final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();

  final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
  final Key<GSet<String>> set1Key = GSetKey.create("set1");
  final Key<ORSet<String>> set2Key = ORSetKey.create("set2");
  final Key<Flag> activeFlagKey = FlagKey.create("active");

  @Override
  public Receive createReceive() {
    ReceiveBuilder b = receiveBuilder();

    b.matchEquals(
        "demonstrate update",
        msg -> {
          replicator.tell(
              new Replicator.Update<PNCounter>(
                  counter1Key,
                  PNCounter.create(),
                  Replicator.writeLocal(),
                  curr -> curr.increment(node, 1)),
              getSelf());

          final WriteConsistency writeTo3 = new WriteTo(3, Duration.ofSeconds(1));
          replicator.tell(
              new Replicator.Update<GSet<String>>(
                  set1Key, GSet.create(), writeTo3, curr -> curr.add("hello")),
              getSelf());

          final WriteConsistency writeMajority = new WriteMajority(Duration.ofSeconds(5));
          replicator.tell(
              new Replicator.Update<ORSet<String>>(
                  set2Key, ORSet.create(), writeMajority, curr -> curr.add(node, "hello")),
              getSelf());

          final WriteConsistency writeAll = new WriteAll(Duration.ofSeconds(5));
          replicator.tell(
              new Replicator.Update<Flag>(
                  activeFlagKey, Flag.create(), writeAll, curr -> curr.switchOn()),
              getSelf());
        });
    return b.build();
  }
}

In reply to the Update, a Replicator.UpdateSuccess is sent to the sender of the Update if the value was successfully replicated according to the supplied write consistency level within the supplied timeout. Otherwise a Replicator.UpdateFailure subclass is sent back. Note that a Replicator.UpdateTimeout reply does not mean that the update completely failed or was rolled back. It may still have been replicated to some nodes, and will eventually be replicated to all nodes with the gossip protocol.

Scala
case UpdateSuccess(Counter1Key, req) => // ok
Java
b.match(
    UpdateSuccess.class,
    a -> a.key().equals(counter1Key),
    a -> {
      // ok
    });
Scala
case UpdateSuccess(Set1Key, req) => // ok
case UpdateTimeout(Set1Key, req) =>
  // write to 3 nodes failed within 1.second
Java
b.match(
        UpdateSuccess.class,
        a -> a.key().equals(set1Key),
        a -> {
          // ok
        })
    .match(
        UpdateTimeout.class,
        a -> a.key().equals(set1Key),
        a -> {
          // write to 3 nodes failed within 1.second
        });

You will always see your own writes. For example if you send two Replicator.Update messages changing the value of the same key, the modify function of the second message will see the change that was performed by the first Update message.

It is possible to abort the Update by throwing an exception when inspecting the state parameter that is passed in to the modify function. That happens before the update is performed, and a Replicator.ModifyFailure is sent back as reply.

In the Update message you can pass an optional request context, which the Replicator does not care about, but which is included in the reply messages. This is a convenient way to pass contextual information (e.g. the original sender) without having to use ask or maintain local correlation data structures.

Scala
implicit val node: SelfUniqueAddress = DistributedData(system).selfUniqueAddress
val replicator = DistributedData(system).replicator
val writeTwo = WriteTo(n = 2, timeout = 3.seconds)
val Counter1Key = PNCounterKey("counter1")

def receive: Receive = {
  case "increment" =>
    // incoming command to increase the counter
    val upd = Update(Counter1Key, PNCounter(), writeTwo, request = Some(sender()))(_ :+ 1)
    replicator ! upd

  case UpdateSuccess(Counter1Key, Some(replyTo: ActorRef)) =>
    replyTo ! "ack"
  case UpdateTimeout(Counter1Key, Some(replyTo: ActorRef)) =>
    replyTo ! "nack"
}
Java
class DemonstrateUpdateWithRequestContext extends AbstractActor {
  final SelfUniqueAddress node =
      DistributedData.get(getContext().getSystem()).selfUniqueAddress();
  final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();

  final WriteConsistency writeTwo = new WriteTo(2, Duration.ofSeconds(3));
  final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            String.class,
            a -> a.equals("increment"),
            a -> {
              // incoming command to increase the counter
              Optional<Object> reqContext = Optional.of(getSender());
              Replicator.Update<PNCounter> upd =
                  new Replicator.Update<PNCounter>(
                      counter1Key,
                      PNCounter.create(),
                      writeTwo,
                      reqContext,
                      curr -> curr.increment(node, 1));
              replicator.tell(upd, getSelf());
            })
        .match(
            UpdateSuccess.class,
            a -> a.key().equals(counter1Key),
            a -> {
              ActorRef replyTo = (ActorRef) a.getRequest().get();
              replyTo.tell("ack", getSelf());
            })
        .match(
            UpdateTimeout.class,
            a -> a.key().equals(counter1Key),
            a -> {
              ActorRef replyTo = (ActorRef) a.getRequest().get();
              replyTo.tell("nack", getSelf());
            })
        .build();
  }
}

Get

For the full documentation of this feature and for new projects see Distributed Data - Get.

To retrieve the current value of a data entry you send a Replicator.Get message to the Replicator. You supply a read consistency level with the request (ReadLocal, ReadFrom, ReadMajority or ReadAll), as in the following examples:

Scala
val replicator = DistributedData(system).replicator
val Counter1Key = PNCounterKey("counter1")
val Set1Key = GSetKey[String]("set1")
val Set2Key = ORSetKey[String]("set2")
val ActiveFlagKey = FlagKey("active")

replicator ! Get(Counter1Key, ReadLocal)

val readFrom3 = ReadFrom(n = 3, timeout = 1.second)
replicator ! Get(Set1Key, readFrom3)

val readMajority = ReadMajority(timeout = 5.seconds)
replicator ! Get(Set2Key, readMajority)

val readAll = ReadAll(timeout = 5.seconds)
replicator ! Get(ActiveFlagKey, readAll)
Java
class DemonstrateGet extends AbstractActor {
  final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();

  final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
  final Key<GSet<String>> set1Key = GSetKey.create("set1");
  final Key<ORSet<String>> set2Key = ORSetKey.create("set2");
  final Key<Flag> activeFlagKey = FlagKey.create("active");

  @Override
  public Receive createReceive() {
    ReceiveBuilder b = receiveBuilder();

    b.matchEquals(
        "demonstrate get",
        msg -> {
          replicator.tell(
              new Replicator.Get<PNCounter>(counter1Key, Replicator.readLocal()), getSelf());

          final ReadConsistency readFrom3 = new ReadFrom(3, Duration.ofSeconds(1));
          replicator.tell(new Replicator.Get<GSet<String>>(set1Key, readFrom3), getSelf());

          final ReadConsistency readMajority = new ReadMajority(Duration.ofSeconds(5));
          replicator.tell(new Replicator.Get<ORSet<String>>(set2Key, readMajority), getSelf());

          final ReadConsistency readAll = new ReadAll(Duration.ofSeconds(5));
          replicator.tell(new Replicator.Get<Flag>(activeFlagKey, readAll), getSelf());
        });
    return b.build();
  }
}

In reply to the Get, a Replicator.GetSuccess is sent to the sender of the Get if the value was successfully retrieved according to the supplied read consistency level within the supplied timeout. Otherwise a Replicator.GetFailure is sent. If the key does not exist the reply will be Replicator.NotFound.

Scala
case g @ GetSuccess(Counter1Key, req) =>
  val value = g.get(Counter1Key).value
case NotFound(Counter1Key, req) => // key counter1 does not exist
Java
b.match(
        GetSuccess.class,
        a -> a.key().equals(counter1Key),
        a -> {
          GetSuccess<PNCounter> g = a;
          BigInteger value = g.dataValue().getValue();
        })
    .match(
        NotFound.class,
        a -> a.key().equals(counter1Key),
        a -> {
          // key counter1 does not exist
        });
Scala
case g @ GetSuccess(Set1Key, req) =>
  val elements = g.get(Set1Key).elements
case GetFailure(Set1Key, req) =>
  // read from 3 nodes failed within 1.second
case NotFound(Set1Key, req) => // key set1 does not exist
Java
b.match(
        GetSuccess.class,
        a -> a.key().equals(set1Key),
        a -> {
          GetSuccess<GSet<String>> g = a;
          Set<String> value = g.dataValue().getElements();
        })
    .match(
        GetFailure.class,
        a -> a.key().equals(set1Key),
        a -> {
          // read from 3 nodes failed within 1.second
        })
    .match(
        NotFound.class,
        a -> a.key().equals(set1Key),
        a -> {
          // key set1 does not exist
        });

In the Replicator.Get message you can pass an optional request context in the same way as for the Replicator.Update message, described above. For example the original sender can be passed and replied to after receiving and transforming Replicator.GetSuccess.

Scala
implicit val node: SelfUniqueAddress = DistributedData(system).selfUniqueAddress
val replicator = DistributedData(system).replicator
val readTwo = ReadFrom(n = 2, timeout = 3.seconds)
val Counter1Key = PNCounterKey("counter1")

def receive: Receive = {
  case "get-count" =>
    // incoming request to retrieve current value of the counter
    replicator ! Get(Counter1Key, readTwo, request = Some(sender()))

  case g @ GetSuccess(Counter1Key, Some(replyTo: ActorRef)) =>
    val value = g.get(Counter1Key).value.longValue
    replyTo ! value
  case GetFailure(Counter1Key, Some(replyTo: ActorRef)) =>
    replyTo ! -1L
  case NotFound(Counter1Key, Some(replyTo: ActorRef)) =>
    replyTo ! 0L
}
Java
class DemonstrateGetWithRequestContext extends AbstractActor {
  final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();

  final ReadConsistency readTwo = new ReadFrom(2, Duration.ofSeconds(3));
  final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            String.class,
            a -> a.equals("get-count"),
            a -> {
              // incoming request to retrieve current value of the counter
              // pass the original sender as request context so the replies can use it
              Optional<Object> reqContext = Optional.of(getSender());
              replicator.tell(
                  new Replicator.Get<PNCounter>(counter1Key, readTwo, reqContext), getSelf());
            })
        .match(
            GetSuccess.class,
            a -> a.key().equals(counter1Key),
            a -> {
              ActorRef replyTo = (ActorRef) a.getRequest().get();
              GetSuccess<PNCounter> g = a;
              long value = g.dataValue().getValue().longValue();
              replyTo.tell(value, getSelf());
            })
        .match(
            GetFailure.class,
            a -> a.key().equals(counter1Key),
            a -> {
              ActorRef replyTo = (ActorRef) a.getRequest().get();
              replyTo.tell(-1L, getSelf());
            })
        .match(
            NotFound.class,
            a -> a.key().equals(counter1Key),
            a -> {
              ActorRef replyTo = (ActorRef) a.getRequest().get();
              replyTo.tell(0L, getSelf());
            })
        .build();
  }
}

Subscribe

For the full documentation of this feature and for new projects see Distributed Data - Subscribe.

You may also register interest in change notifications by sending a Replicator.Subscribe message to the Replicator. It will send Replicator.Changed messages to the registered subscriber when the data for the subscribed key is updated. Subscribers will be notified periodically with the configured notify-subscribers-interval, and it is also possible to send an explicit Replicator.FlushChanges message to the Replicator to notify the subscribers immediately.

The subscriber is automatically removed if the subscriber is terminated. A subscriber can also be deregistered with the Replicator.Unsubscribe message.

Scala
val replicator = DistributedData(system).replicator
val Counter1Key = PNCounterKey("counter1")
// subscribe to changes of the Counter1Key value
replicator ! Subscribe(Counter1Key, self)
var currentValue = BigInt(0)

def receive: Receive = {
  case c @ Changed(Counter1Key) =>
    currentValue = c.get(Counter1Key).value
  case "get-count" =>
    // incoming request to retrieve current value of the counter
    sender() ! currentValue
}
Java
class DemonstrateSubscribe extends AbstractActor {
  final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
  final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");

  BigInteger currentValue = BigInteger.valueOf(0);

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            Changed.class,
            a -> a.key().equals(counter1Key),
            a -> {
              Changed<PNCounter> g = a;
              currentValue = g.dataValue().getValue();
            })
        .match(
            String.class,
            a -> a.equals("get-count"),
            a -> {
              // incoming request to retrieve current value of the counter
              getSender().tell(currentValue, getSelf());
            })
        .build();
  }

  @Override
  public void preStart() {
    // subscribe to changes of the Counter1Key value
    replicator.tell(new Subscribe<PNCounter>(counter1Key, getSelf()), ActorRef.noSender());
  }
}

Consistency

For the full documentation of this feature and for new projects see Distributed Data Consistency.

Here is an example of using Replicator.WriteMajority and Replicator.ReadMajority:

Scala
private val timeout = 3.seconds
private val readMajority = ReadMajority(timeout)
private val writeMajority = WriteMajority(timeout)
Java
private final WriteConsistency writeMajority = new WriteMajority(Duration.ofSeconds(3));
private static final ReadConsistency readMajority = new ReadMajority(Duration.ofSeconds(3));
Scala
def receiveGetCart: Receive = {
  case GetCart =>
    replicator ! Get(DataKey, readMajority, Some(sender()))

  case g @ GetSuccess(DataKey, Some(replyTo: ActorRef)) =>
    val data = g.get(DataKey)
    val cart = Cart(data.entries.values.toSet)
    replyTo ! cart

  case NotFound(DataKey, Some(replyTo: ActorRef)) =>
    replyTo ! Cart(Set.empty)

  case GetFailure(DataKey, Some(replyTo: ActorRef)) =>
    // ReadMajority failure, try again with local read
    replicator ! Get(DataKey, ReadLocal, Some(replyTo))
}
Java
private Receive matchGetCart() {
  return receiveBuilder()
      .matchEquals(GET_CART, s -> receiveGetCart())
      .match(
          GetSuccess.class,
          this::isResponseToGetCart,
          g -> receiveGetSuccess((GetSuccess<LWWMap<String, LineItem>>) g))
      .match(
          NotFound.class,
          this::isResponseToGetCart,
          n -> receiveNotFound((NotFound<LWWMap<String, LineItem>>) n))
      .match(
          GetFailure.class,
          this::isResponseToGetCart,
          f -> receiveGetFailure((GetFailure<LWWMap<String, LineItem>>) f))
      .build();
}

private void receiveGetCart() {
  Optional<Object> ctx = Optional.of(getSender());
  replicator.tell(
      new Replicator.Get<LWWMap<String, LineItem>>(dataKey, readMajority, ctx), getSelf());
}

private boolean isResponseToGetCart(GetResponse<?> response) {
  return response.key().equals(dataKey)
      && (response.getRequest().orElse(null) instanceof ActorRef);
}

private void receiveGetSuccess(GetSuccess<LWWMap<String, LineItem>> g) {
  Set<LineItem> items = new HashSet<>(g.dataValue().getEntries().values());
  ActorRef replyTo = (ActorRef) g.getRequest().get();
  replyTo.tell(new Cart(items), getSelf());
}

private void receiveNotFound(NotFound<LWWMap<String, LineItem>> n) {
  ActorRef replyTo = (ActorRef) n.getRequest().get();
  replyTo.tell(new Cart(new HashSet<>()), getSelf());
}

private void receiveGetFailure(GetFailure<LWWMap<String, LineItem>> f) {
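  // ReadMajority failure, try again with local read.
  // Reuse the request context (the original requester); the sender of
  // this GetFailure is the Replicator, not the requester.
  Optional<Object> ctx = f.getRequest();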
  replicator.tell(
      new Replicator.Get<LWWMap<String, LineItem>>(dataKey, Replicator.readLocal(), ctx),
      getSelf());
}
Scala
def receiveAddItem: Receive = {
  case cmd @ AddItem(item) =>
    val update = Update(DataKey, LWWMap.empty[String, LineItem], writeMajority, Some(cmd)) { cart =>
      updateCart(cart, item)
    }
    replicator ! update
}
Java
private Receive matchAddItem() {
  return receiveBuilder().match(AddItem.class, this::receiveAddItem).build();
}

private void receiveAddItem(AddItem add) {
  Update<LWWMap<String, LineItem>> update =
      new Update<>(dataKey, LWWMap.create(), writeMajority, cart -> updateCart(cart, add.item));
  replicator.tell(update, getSelf());
}

In some rare cases, when performing a Replicator.Update, it is necessary to first try to fetch the latest data from other nodes. That can be done by first sending a Replicator.Get with Replicator.ReadMajority and then continuing with the Replicator.Update when the Replicator.GetSuccess, Replicator.GetFailure or Replicator.NotFound reply is received. This might be needed when you need to base a decision on the latest information or when removing entries from an ORSet or ORMap. If an entry is added to an ORSet or ORMap from one node and removed from another node, the entry will only be removed if the added entry is visible on the node where the removal is performed (hence the name observed-remove set).

The following example illustrates how to do that:

Scala
def receiveRemoveItem: Receive = {
  case cmd @ RemoveItem(productId) =>
    // Try to fetch latest from a majority of nodes first, since ORMap
    // remove must have seen the item to be able to remove it.
    replicator ! Get(DataKey, readMajority, Some(cmd))

  case GetSuccess(DataKey, Some(RemoveItem(productId))) =>
    replicator ! Update(DataKey, LWWMap(), writeMajority, None) {
      _.remove(node, productId)
    }

  case GetFailure(DataKey, Some(RemoveItem(productId))) =>
    // ReadMajority failed, fall back to best effort local value
    replicator ! Update(DataKey, LWWMap(), writeMajority, None) {
      _.remove(node, productId)
    }

  case NotFound(DataKey, Some(RemoveItem(productId))) =>
  // nothing to remove
}
Java
private void receiveRemoveItem(RemoveItem rm) {
  // Try to fetch latest from a majority of nodes first, since ORMap
  // remove must have seen the item to be able to remove it.
  Optional<Object> ctx = Optional.of(rm);
  replicator.tell(
      new Replicator.Get<LWWMap<String, LineItem>>(dataKey, readMajority, ctx), getSelf());
}

private void receiveRemoveItemGetSuccess(GetSuccess<LWWMap<String, LineItem>> g) {
  RemoveItem rm = (RemoveItem) g.getRequest().get();
  removeItem(rm.productId);
}

private void receiveRemoveItemGetFailure(GetFailure<LWWMap<String, LineItem>> f) {
  // ReadMajority failed, fall back to best effort local value
  RemoveItem rm = (RemoveItem) f.getRequest().get();
  removeItem(rm.productId);
}

private void removeItem(String productId) {
  Update<LWWMap<String, LineItem>> update =
      new Update<>(dataKey, LWWMap.create(), writeMajority, cart -> cart.remove(node, productId));
  replicator.tell(update, getSelf());
}

private boolean isResponseToRemoveItem(GetResponse<?> response) {
  return response.key().equals(dataKey)
      && (response.getRequest().orElse(null) instanceof RemoveItem);
}
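The reason for reading before removing, namely that a removal only affects additions already visible on the removing node, can be modelled in a few lines of plain Java. This is a deliberately simplified, tag-based observed-remove set written for illustration only. It is not how Pekko implements ORSet or ORMap, and every name in it is hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class ObservedRemoveSketch {
    // element -> unique tags of the add operations this replica has seen
    final Map<String, Set<String>> addTags = new HashMap<>();
    // tags of add operations that have been removed
    final Set<String> removedTags = new HashSet<>();
    final String replicaId;
    int counter = 0;

    ObservedRemoveSketch(String replicaId) {
        this.replicaId = replicaId;
    }

    void add(String element) {
        String tag = replicaId + "-" + (counter++);
        addTags.computeIfAbsent(element, k -> new HashSet<>()).add(tag);
    }

    // Only add operations observed by this replica are cancelled.
    void remove(String element) {
        removedTags.addAll(addTags.getOrDefault(element, Set.of()));
    }

    // Merge is a union of both the add tags and the removed tags.
    void mergeFrom(ObservedRemoveSketch other) {
        other.addTags.forEach(
            (element, tags) ->
                addTags.computeIfAbsent(element, k -> new HashSet<>()).addAll(tags));
        removedTags.addAll(other.removedTags);
    }

    boolean contains(String element) {
        for (String tag : addTags.getOrDefault(element, Set.of())) {
            if (!removedTags.contains(tag)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        ObservedRemoveSketch a = new ObservedRemoveSketch("a");
        ObservedRemoveSketch b = new ObservedRemoveSketch("b");
        a.add("x");
        b.remove("x"); // b has not seen the add, so nothing is removed
        a.mergeFrom(b);
        b.mergeFrom(a);
        System.out.println(a.contains("x")); // prints true
        b.remove("x"); // now b has observed the add
        a.mergeFrom(b);
        System.out.println(a.contains("x")); // prints false
    }
}
```

In this model the first remove is a no-op because replica b has not yet observed the add, which mirrors why the example above issues a majority read before the remove.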
Warning

Caveat: Even if you use WriteMajority and ReadMajority there is a small risk that you may read stale data if the cluster membership has changed between the Update and the Get. For example, in a cluster of 5 nodes an Update is written to 3 nodes: n1, n2, n3. Then 2 more nodes are added and a Get request reads from 4 nodes, which happen to be n4, n5, n6, n7. The value on n1, n2, n3 is then not seen in the response of the Get request.
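The numbers in this caveat can be checked with a short plain-Java sketch. It does not use Pekko. The class and method names are hypothetical, and it assumes a majority of n nodes is n / 2 + 1.

```java
import java.util.HashSet;
import java.util.Set;

class StaleReadSketch {
    // Assumed majority rule: more than half of the counted members.
    public static int majority(int members) {
        return members / 2 + 1;
    }

    // Two quorums are certain to share a node only when their sizes
    // add up to more than the number of members.
    public static boolean overlapGuaranteed(int writeQuorum, int readQuorum, int members) {
        return writeQuorum + readQuorum > members;
    }

    // True if at least one node that is read from holds the written value.
    public static boolean readSeesWrite(Set<String> writtenTo, Set<String> readFrom) {
        Set<String> overlap = new HashSet<>(writtenTo);
        overlap.retainAll(readFrom);
        return !overlap.isEmpty();
    }

    public static void main(String[] args) {
        int writeQuorum = majority(5); // 3, computed before the membership change
        int readQuorum = majority(7);  // 4, computed after two nodes joined

        System.out.println(overlapGuaranteed(writeQuorum, majority(5), 5)); // prints true
        System.out.println(overlapGuaranteed(writeQuorum, readQuorum, 7));  // prints false

        Set<String> writtenTo = Set.of("n1", "n2", "n3");
        Set<String> readFrom = Set.of("n4", "n5", "n6", "n7");
        System.out.println(readSeesWrite(writtenTo, readFrom)); // prints false
    }
}
```

With stable membership, 3 + 3 exceeds 5, so every majority read overlaps every majority write. After the cluster grows to 7 nodes, 3 + 4 does not exceed 7, which is why a disjoint read set such as n4 to n7 becomes possible.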

Delete

For the full documentation of this feature and for new projects see Distributed Data - Delete.

Scala
val replicator = DistributedData(system).replicator
val Counter1Key = PNCounterKey("counter1")
val Set2Key = ORSetKey[String]("set2")

replicator ! Delete(Counter1Key, WriteLocal)

val writeMajority = WriteMajority(timeout = 5.seconds)
replicator ! Delete(Set2Key, writeMajority)
Java
class DemonstrateDelete extends AbstractActor {
  final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();

  final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
  final Key<ORSet<String>> set2Key = ORSetKey.create("set2");

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals(
            "demonstrate delete",
            msg -> {
              replicator.tell(
                  new Delete<PNCounter>(counter1Key, Replicator.writeLocal()), getSelf());

              final WriteConsistency writeMajority = new WriteMajority(Duration.ofSeconds(5));
              replicator.tell(new Delete<ORSet<String>>(set2Key, writeMajority), getSelf());
            })
        .build();
  }
}
Warning

As deleted keys continue to be included in the stored data on each node as well as in gossip messages, a continuous series of updates and deletes of top-level entities will result in growing memory usage until an ActorSystem runs out of memory. To use Pekko Distributed Data where frequent adds and removes are required, you should use a fixed number of top-level data types that support both updates and removals, for example ORMap or ORSet.

Replicated data types

Pekko contains a set of useful replicated data types and it is fully possible to implement custom replicated data types. For the full documentation of this feature and for new projects see Distributed Data Replicated data types.

Delta-CRDT

For the full documentation of this feature and for new projects see Distributed Data Delta CRDT.

Custom Data Type

You can implement your own data types. For the full documentation of this feature and for new projects see Distributed Data custom data type.

Durable Storage

For the full documentation of this feature and for new projects see Durable Storage.

Limitations

For the full documentation of this feature and for new projects see Limitations.

Learn More about CRDTs

Configuration

The DistributedData extension can be configured with the following properties:

# Settings for the DistributedData extension
pekko.cluster.distributed-data {
  # Actor name of the Replicator actor, /system/ddataReplicator
  name = ddataReplicator

  # Replicas are running on members tagged with this role.
  # All members are used if undefined or empty.
  role = ""

  # How often the Replicator should send out gossip information
  gossip-interval = 2 s
  
  # How often the subscribers will be notified of changes, if any
  notify-subscribers-interval = 500 ms

  # Logging of data with payload size in bytes larger than
  # this value. Maximum detected size per key is logged once,
  # with an increase threshold of 10%.
  # It can be disabled by setting the property to off.
  log-data-size-exceeding = 10 KiB

  # Maximum number of entries to transfer in one round of gossip exchange when
  # synchronizing the replicas. Next chunk will be transferred in next round of gossip.
  # The actual number of data entries in each Gossip message is dynamically
  # adjusted to not exceed the maximum remote message size (maximum-frame-size).
  max-delta-elements = 500
  
  # The id of the dispatcher to use for Replicator actors.
  # If specified you need to define the settings of the actual dispatcher.
  use-dispatcher = "pekko.actor.internal-dispatcher"

  # How often the Replicator checks for pruning of data associated with
  # removed cluster nodes. If this is set to 'off' the pruning feature will
  # be completely disabled.
  pruning-interval = 120 s
  
  # How long it takes to spread the data to all other replica nodes.
  # This is used when initiating and completing the pruning process of data associated
  # with removed cluster nodes. The time measurement is stopped when any replica is
  # unreachable, but it's still recommended to configure this with a certain margin.
  # It should be in the magnitude of minutes even though typical dissemination time
  # is shorter (grows logarithmically with the number of nodes). There is no advantage
  # in setting this too low. Setting it to a large value will delay the pruning process.
  max-pruning-dissemination = 300 s
  
  # The markers indicating that pruning has been performed for a removed node are kept
  # for this time and thereafter removed. If an old data entry that was never pruned is
  # somehow injected and merged with existing data after this time the value will not be
  # correct. This would be possible (although unlikely) in the case of a long network
  # partition. It should be in the magnitude of hours. For durable data it is configured by
  # 'pekko.cluster.distributed-data.durable.pruning-marker-time-to-live'.
  pruning-marker-time-to-live = 6 h
  
  # Serialized Write and Read messages are cached when they are sent to 
  # several nodes. If no further activity they are removed from the cache
  # after this duration.
  serializer-cache-time-to-live = 10s

  # Update and Get operations are sent to oldest nodes first.
  # This is useful together with Cluster Singleton, which is running on oldest nodes.
  prefer-oldest = off
  
  # Settings for delta-CRDT
  delta-crdt {
    # enable or disable delta-CRDT replication
    enabled = on
    
    # Some complex deltas grow in size for each update and above this
    # threshold such deltas are discarded and sent as full state instead.
    # This is number of elements or similar size hint, not size in bytes.
    max-delta-size = 50
  }
  
  durable {
    # List of keys that are durable. Prefix matching is supported by using * at the
    # end of a key.  
    keys = []
    
    # The markers indicating that pruning has been performed for a removed node are kept
    # for this time and thereafter removed. If an old data entry that was never pruned is
    # injected and merged with existing data after this time the value will not be correct.
    # This would be possible if a replica with durable data didn't participate in the pruning
    # (e.g. it was shut down) and was later started after this time. A durable replica should
    # not be stopped for a longer time than this duration, and if it is joining again after
    # this duration its data should first be manually removed (from the lmdb directory).
    # It should be in the magnitude of days. Note that there is a corresponding setting
    # for non-durable data: 'pekko.cluster.distributed-data.pruning-marker-time-to-live'.
    pruning-marker-time-to-live = 10 d
    
    # Fully qualified class name of the durable store actor. It must be a subclass
    # of org.apache.pekko.actor.Actor and handle the protocol defined in
    # org.apache.pekko.cluster.ddata.DurableStore. The class must have a constructor with
    # a com.typesafe.config.Config parameter.
    store-actor-class = org.apache.pekko.cluster.ddata.LmdbDurableStore
    
    use-dispatcher = pekko.cluster.distributed-data.durable.pinned-store
    
    pinned-store {
      executor = thread-pool-executor
      type = PinnedDispatcher
    }
    
    # Config for the LmdbDurableStore
    lmdb {
      # Directory of LMDB file. There are two options:
      # 1. A relative or absolute path to a directory that ends with 'ddata'
      #    the full name of the directory will contain name of the ActorSystem
      #    and its remote port.
      # 2. Otherwise the path is used as is, as a relative or absolute path to
      #    a directory.
      #
      # When running in production you may want to configure this to a specific
      # path (alt 2), since the default directory contains the remote port of the
      # actor system to make the name unique. If using a dynamically assigned 
      # port (0) it will be different each time and the previously stored data 
      # will not be loaded.
      dir = "ddata"
      
      # Size in bytes of the memory mapped file.
      map-size = 100 MiB
      
      # Accumulate changes before storing improves performance with the
      # risk of losing the last writes if the JVM crashes.
      # The interval is by default set to 'off' to write each update immediately.
      # Enabling write behind by specifying a duration, e.g. 200ms, is especially 
      # efficient when performing many writes to the same key, because it is only 
      # the last value for each key that will be serialized and stored.  
      # write-behind-interval = 200 ms
      write-behind-interval = off
    }
  }
  
}
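As a sketch of how these defaults might be overridden in an application's `application.conf`, the fragment below restricts replicas to one role, marks some keys as durable, and points LMDB at a fixed directory. The setting names come from the reference configuration above; the role name, the key names and the directory path are made-up values for illustration only.

```hocon
pekko.cluster.distributed-data {
  # Hypothetical role name: only members tagged with this role run replicas.
  role = "ddata"

  durable {
    # Hypothetical key names; the trailing * enables prefix matching.
    keys = ["shopping-cart-*", "settings"]

    lmdb {
      # Hypothetical fixed path that does not end with 'ddata', so it is used
      # as is and does not depend on the remote port of the actor system.
      dir = "/var/lib/myapp/ddata-store"

      # Trades durability of the last writes for fewer disk writes.
      write-behind-interval = 200 ms
    }
  }
}
```

Only the overridden settings need to be listed; all other values fall back to the defaults shown in the reference configuration.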