Snapshotting

You are viewing the documentation for the new actor APIs. To view the Pekko Classic documentation, see Classic Pekko Persistence.

Snapshots

As you model your domain using event sourced actors, you may notice that some actors are prone to accumulating extremely long event logs and experiencing long recovery times. Sometimes the right approach is to split such an actor into a set of shorter-lived actors. However, when this is not an option, you can use snapshots to reduce recovery times drastically.

Persistent actors can save snapshots of their internal state every N events, or when a given predicate on the state, the persisted event, and its sequence number is fulfilled.

Scala
import org.apache.pekko.persistence.typed.scaladsl.Effect

EventSourcedBehavior[Command, Event, State](
  persistenceId = PersistenceId.ofUniqueId("abc"),
  emptyState = State(),
  commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
  eventHandler = (state, evt) => state) // do something based on a particular state
  .snapshotWhen {
    case (state, BookingCompleted(_), sequenceNumber) => true
    case (state, event, sequenceNumber)               => false
  }
  .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
Java
@Override // override retentionCriteria in EventSourcedBehavior
public RetentionCriteria retentionCriteria() {
  return RetentionCriteria.snapshotEvery(100, 2);
}
Scala
EventSourcedBehavior[Command, Event, State](
  persistenceId = PersistenceId.ofUniqueId("abc"),
  emptyState = State(),
  commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
  eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
  .snapshotWhen {
    case (state, BookingCompleted(_), sequenceNumber) => true
    case (state, event, sequenceNumber)               => false
  }
Java
@Override // override shouldSnapshot in EventSourcedBehavior
public boolean shouldSnapshot(State state, Event event, long sequenceNr) {
  return event instanceof BookingCompleted;
}

When a snapshot is triggered, incoming commands are stashed until the snapshot has been saved. This means that the state can safely be mutable although the serialization and storage of the state is performed asynchronously. The state instance will not be updated by new events until after the snapshot has been saved.
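To make the stashing behavior concrete, here is a minimal Java model of it. It is a sketch under simplifying assumptions, not Pekko's implementation: `SnapshotStashModel`, `startSnapshot`, `onCommand`, and `onSnapshotSaved` are hypothetical names, and commands are plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model (not Pekko's internals): commands that arrive while a
// snapshot is being saved are buffered, then processed in arrival order once
// the snapshot store has confirmed the save.
final class SnapshotStashModel {
  private final Deque<String> stash = new ArrayDeque<>();
  private final List<String> processed = new ArrayList<>();
  private boolean snapshotInProgress = false;

  // A snapshot was triggered: from now on the state must not change.
  void startSnapshot() {
    snapshotInProgress = true;
  }

  // Incoming command: stash it while a snapshot is in flight, otherwise process it.
  void onCommand(String command) {
    if (snapshotInProgress) {
      stash.addLast(command);
    } else {
      processed.add(command);
    }
  }

  // The save completed: unstash and process the buffered commands in FIFO order.
  void onSnapshotSaved() {
    snapshotInProgress = false;
    while (!stash.isEmpty()) {
      processed.add(stash.removeFirst());
    }
  }

  List<String> processed() {
    return processed;
  }

  int stashedCount() {
    return stash.size();
  }
}
```

Because nothing is processed while the snapshot is in flight, later commands cannot change the state object that is being serialized, which is why a mutable state is safe in this situation.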

During recovery, the persistent actor uses the latest saved snapshot to initialize the state. Thereafter, the events persisted after the snapshot are replayed through the event handler to bring the persistent actor to its current (i.e. latest) state.
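The recovery sequence can be sketched as a fold over the journal. This is a simplified, hypothetical model (the `RecoveryModel`, `Snapshot`, and `Journaled` types are illustrative, not Pekko API): it starts from the snapshot's state, or from the empty state if there is no snapshot, and applies the event handler only to events whose sequence number is higher than the snapshot's.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical model of recovery from a snapshot plus the remaining events.
final class RecoveryModel {

  // A stored snapshot: the state as of a given sequence number.
  static final class Snapshot<T> {
    final long sequenceNr;
    final T state;

    Snapshot(long sequenceNr, T state) {
      this.sequenceNr = sequenceNr;
      this.state = state;
    }
  }

  // A journaled event together with its sequence number.
  static final class Journaled<V> {
    final long sequenceNr;
    final V event;

    Journaled(long sequenceNr, V event) {
      this.sequenceNr = sequenceNr;
      this.event = event;
    }
  }

  static <S, E> S recover(
      S emptyState,
      Optional<Snapshot<S>> snapshot,
      List<Journaled<E>> journal,
      BiFunction<S, E, S> eventHandler) {
    // Initialize from the snapshot if one was selected, otherwise from emptyState.
    S state = snapshot.map(s -> s.state).orElse(emptyState);
    long replayAfter = snapshot.map(s -> s.sequenceNr).orElse(0L);
    // Replay only the events persisted after the snapshot.
    for (Journaled<E> j : journal) {
      if (j.sequenceNr > replayAfter) {
        state = eventHandler.apply(state, j.event);
      }
    }
    return state;
  }
}
```

With a correct event handler, recovering from a snapshot and replaying the whole journal from the empty state end in the same state; the snapshot only shortens the replay.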

If not specified, the snapshot selection criteria default to SnapshotSelectionCriteria.Latest (Scala) or SnapshotSelectionCriteria.latest() (Java), which selects the latest (youngest) snapshot. It’s possible to override the selection of which snapshot to use for recovery like this:

Scala
import org.apache.pekko.persistence.typed.SnapshotSelectionCriteria

EventSourcedBehavior[Command, Event, State](
  persistenceId = PersistenceId.ofUniqueId("abc"),
  emptyState = State(),
  commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
  eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
  .withRecovery(Recovery.withSnapshotSelectionCriteria(SnapshotSelectionCriteria.none))
Java
@Override
public Recovery recovery() {
  return Recovery.withSnapshotSelectionCriteria(SnapshotSelectionCriteria.none());
}

To disable snapshot-based recovery, applications can use SnapshotSelectionCriteria.None (Scala) or SnapshotSelectionCriteria.none() (Java). A recovery where no saved snapshot matches the specified SnapshotSelectionCriteria replays all journaled events. This can be useful if the snapshot serialization format has changed in an incompatible way. It should typically not be used when events have been deleted, because the remaining events alone cannot rebuild the full state.
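The effect of the selection criteria can be illustrated with a small model in which a criteria is reduced to an inclusive upper bound on the snapshot sequence number. That reduction is an assumption made for this sketch; `SnapshotSelectionModel`, `LATEST`, `NONE`, and `select` are hypothetical names, and Pekko's real SnapshotSelectionCriteria may carry further bounds.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical, simplified model of snapshot selection for recovery.
final class SnapshotSelectionModel {
  // Every stored snapshot matches, so the youngest one is chosen.
  static final long LATEST = Long.MAX_VALUE;
  // No stored snapshot matches (sequence numbers start at 1): replay everything.
  static final long NONE = 0L;

  // Returns the sequence number of the youngest snapshot not above maxSequenceNr.
  static OptionalLong select(List<Long> storedSnapshotSeqNrs, long maxSequenceNr) {
    return storedSnapshotSeqNrs.stream()
        .mapToLong(Long::longValue)
        .filter(seqNr -> seqNr <= maxSequenceNr)
        .max();
  }
}
```

When `select` finds no matching snapshot, recovery starts from the empty state and replays all journaled events.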

In order to use snapshots, a default snapshot store (pekko.persistence.snapshot-store.plugin) must be configured, or you can pick a snapshot store for a specific EventSourcedBehavior with withSnapshotPluginId (Scala) or by overriding snapshotPluginId (Java) in the EventSourcedBehavior.

Because some use cases may not benefit from or need snapshots, it is perfectly valid not to configure a snapshot store. However, Pekko will log a warning message when this situation is detected and then continue to operate until an actor tries to store a snapshot, at which point the operation will fail.

Snapshot failures

Saving snapshots can either succeed or fail – this information is reported back to the persistent actor via the SnapshotCompleted or SnapshotFailed signal. Snapshot failures are logged by default but do not cause the actor to stop or restart.

If there is a problem with recovering the state of the actor from the journal when the actor is started, a RecoveryFailed signal is emitted (logging the error by default), and the actor will be stopped. Note that a failure to load a snapshot is treated the same way, but you can disable snapshot loading if, for example, you know that the serialization format has changed in an incompatible way.

Optional snapshots

By default, the persistent actor will unconditionally be stopped if the snapshot can’t be loaded during recovery. It is possible to make snapshot loading optional, which can be useful when it is acceptable to ignore a snapshot in case of, for example, deserialization errors. When snapshot loading fails, the actor then recovers by replaying all events instead.

Enable this feature by setting snapshot-is-optional = true in the snapshot store configuration.

Warning

Don’t set snapshot-is-optional = true if events have been deleted, because that would result in an incorrectly recovered state if loading the snapshot fails.
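The difference between the default behavior and snapshot-is-optional = true can be modeled as follows. This is a hypothetical sketch: `OptionalSnapshotModel`, its `Outcome` values, and the use of a `Supplier` that throws to stand for a failed snapshot load are illustrative assumptions, not Pekko API.

```java
import java.util.function.Supplier;

// Hypothetical model of how snapshot-is-optional changes recovery when the
// snapshot cannot be loaded (for example, because of a deserialization error).
final class OptionalSnapshotModel {

  enum Outcome { RECOVERED_FROM_SNAPSHOT, REPLAYED_ALL_EVENTS, RECOVERY_FAILED }

  static Outcome recover(Supplier<Object> loadSnapshot, boolean snapshotIsOptional) {
    try {
      loadSnapshot.get();
      return Outcome.RECOVERED_FROM_SNAPSHOT;
    } catch (RuntimeException loadFailure) {
      // Optional: ignore the snapshot and replay the whole journal. This is only
      // correct if no events have been deleted (see the warning above).
      // Default: recovery fails, i.e. RecoveryFailed is emitted and the actor stops.
      return snapshotIsOptional ? Outcome.REPLAYED_ALL_EVENTS : Outcome.RECOVERY_FAILED;
    }
  }
}
```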

Snapshot deletion

To free up space, an event sourced actor can automatically delete older snapshots based on the given RetentionCriteria.

Scala
import org.apache.pekko.persistence.typed.scaladsl.Effect

EventSourcedBehavior[Command, Event, State](
  persistenceId = PersistenceId.ofUniqueId("abc"),
  emptyState = State(),
  commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
  eventHandler = (state, evt) => state) // do something based on a particular state
  .snapshotWhen {
    case (state, BookingCompleted(_), sequenceNumber) => true
    case (state, event, sequenceNumber)               => false
  }
  .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
Java
@Override // override retentionCriteria in EventSourcedBehavior
public RetentionCriteria retentionCriteria() {
  return RetentionCriteria.snapshotEvery(100, 2);
}
@Override // override shouldSnapshot in EventSourcedBehavior
public boolean shouldSnapshot(State state, Event event, long sequenceNr) {
  return event instanceof BookingCompleted;
}

Snapshot deletion is triggered after saving a new snapshot.

The above example saves a snapshot automatically every 100 events (numberOfEvents = 100). Snapshots with a sequence number less than the sequence number of the saved snapshot minus keepNSnapshots * numberOfEvents (2 * 100 = 200) are automatically deleted.
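The retention arithmetic can be checked with a short Java sketch. It encodes the rule exactly as stated in the paragraph above (strictly less than the threshold); `SnapshotRetentionModel` and its methods are hypothetical names, and clamping the threshold at 0 is an assumption added for readability.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of count-based snapshot retention: after a snapshot is
// saved at savedSeqNr, snapshots below savedSeqNr - keepNSnapshots * numberOfEvents
// are deleted.
final class SnapshotRetentionModel {

  static long deletionThreshold(long savedSeqNr, int numberOfEvents, int keepNSnapshots) {
    // Multiply as long to avoid int overflow; clamp at 0 (assumption).
    return Math.max(0L, savedSeqNr - (long) keepNSnapshots * numberOfEvents);
  }

  static List<Long> snapshotsToDelete(
      List<Long> storedSeqNrs, long savedSeqNr, int numberOfEvents, int keepNSnapshots) {
    long threshold = deletionThreshold(savedSeqNr, numberOfEvents, keepNSnapshots);
    List<Long> toDelete = new ArrayList<>();
    for (long seqNr : storedSeqNrs) {
      if (seqNr < threshold) {
        toDelete.add(seqNr);
      }
    }
    return toDelete;
  }
}
```

With the example settings, saving a snapshot at sequence number 400 gives a threshold of 400 - 2 * 100 = 200, so of the snapshots stored at 100, 200, 300, and 400, only the one at 100 is deleted under this rule.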

In addition, a snapshot is saved whenever the persisted event is BookingCompleted. Automatic snapshotting based on numberOfEvents can be used without specifying snapshotWhen (Scala) or shouldSnapshot (Java). Snapshots triggered by the snapshotWhen/shouldSnapshot predicate do not trigger deletion of old snapshots.
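The interplay between the two triggers can be summarized as a three-way decision taken after each persisted event. This sketch assumes that the count-based trigger fires when the sequence number is a multiple of numberOfEvents and that it takes precedence when both triggers fire on the same event; `SnapshotTriggerModel` and `Decision` are hypothetical names, not Pekko API.

```java
// Hypothetical model: only the count-based trigger leads to deletion of old
// snapshots; a snapshot requested solely by the predicate deletes nothing.
final class SnapshotTriggerModel {

  enum Decision { NO_SNAPSHOT, SNAPSHOT_WITHOUT_DELETION, SNAPSHOT_WITH_DELETION }

  static Decision decide(long sequenceNr, int numberOfEvents, boolean predicateFired) {
    // Assumed count-based rule: every numberOfEvents-th event.
    if (numberOfEvents > 0 && sequenceNr % numberOfEvents == 0) {
      return Decision.SNAPSHOT_WITH_DELETION;
    } else if (predicateFired) {
      // e.g. snapshotWhen/shouldSnapshot returned true for BookingCompleted
      return Decision.SNAPSHOT_WITHOUT_DELETION;
    } else {
      return Decision.NO_SNAPSHOT;
    }
  }
}
```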

On async deletion, either a DeleteSnapshotsCompleted or DeleteSnapshotsFailed signal is emitted. You can react to these signals with the receiveSignal handler (Scala) or by overriding signalHandler (Java). By default, successful completion is logged by the system at log level debug, failures at log level warning.

Scala
EventSourcedBehavior[Command, Event, State](
  persistenceId = PersistenceId.ofUniqueId("abc"),
  emptyState = State(),
  commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
  eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
  .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
  .receiveSignal { // optionally respond to signals
    case (state, _: SnapshotFailed)        => // react to failure
    case (state, _: DeleteSnapshotsFailed) => // react to failure
  }
Java
@Override
public SignalHandler<State> signalHandler() {
  return newSignalHandlerBuilder()
      .onSignal(
          SnapshotFailed.class,
          (state, failed) -> {
            throw new RuntimeException("TODO: add some on-snapshot-failed side-effect here");
          })
      .onSignal(
          DeleteSnapshotsFailed.class,
          (state, failed) -> {
            throw new RuntimeException(
                "TODO: add some on-delete-snapshot-failed side-effect here");
          })
      .onSignal(
          DeleteEventsFailed.class,
          (state, failed) -> {
            throw new RuntimeException(
                "TODO: add some on-delete-events-failed side-effect here");
          })
      .build();
}

Event deletion

Deleting events in Event Sourcing based applications is typically either not used at all, or used in conjunction with snapshotting. By deleting events you will lose the history of how the system changed before it reached its current state, which is one of the main reasons for using Event Sourcing in the first place.

If snapshot-based retention is enabled, then after a snapshot has been stored successfully, the events journaled by that event sourced actor can be deleted up to the sequence number of the data held by that snapshot.

To opt in, enable withDeleteEventsOnSnapshot on the RetentionCriteria. It is disabled by default.

Scala
EventSourcedBehavior[Command, Event, State](
  persistenceId = PersistenceId.ofUniqueId("abc"),
  emptyState = State(),
  commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
  eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
  .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2).withDeleteEventsOnSnapshot)
  .receiveSignal { // optionally respond to signals
    case (state, _: SnapshotFailed)        => // react to failure
    case (state, _: DeleteSnapshotsFailed) => // react to failure
    case (state, _: DeleteEventsFailed)    => // react to failure
  }
Java
@Override // override retentionCriteria in EventSourcedBehavior
public RetentionCriteria retentionCriteria() {
  return RetentionCriteria.snapshotEvery(100, 2).withDeleteEventsOnSnapshot();
}

Event deletion is triggered after saving a new snapshot. Old events are deleted before old snapshots are deleted.

On async deletion, either a DeleteEventsCompleted or DeleteEventsFailed signal is emitted. You can react to these signals with the receiveSignal handler (Scala) or by overriding signalHandler (Java). By default, successful completion is logged by the system at log level debug, failures at log level warning.

Deleting messages does not affect the highest sequence number of the journal, even if all messages have been deleted from it.
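A tiny in-memory journal shows why the highest sequence number survives deletion: it is tracked separately from the stored events. This is a hypothetical model (`JournalModel`, `persist`, and `deleteTo` are illustrative names, and a real journal plugin decides how deletion is actually stored), assuming that deletion covers events up to and including the given sequence number.

```java
import java.util.TreeMap;

// Hypothetical in-memory journal for a single persistenceId.
final class JournalModel {
  private final TreeMap<Long, String> events = new TreeMap<>();
  private long highestSequenceNr = 0L;

  // Appends an event and returns the sequence number assigned to it.
  long persist(String event) {
    highestSequenceNr += 1;
    events.put(highestSequenceNr, event);
    return highestSequenceNr;
  }

  // Removes all events with sequence number <= toSequenceNr.
  // highestSequenceNr is deliberately left untouched.
  void deleteTo(long toSequenceNr) {
    events.headMap(toSequenceNr, true).clear();
  }

  long highestSequenceNr() {
    return highestSequenceNr;
  }

  int size() {
    return events.size();
  }
}
```

After all three events of a journal are deleted, the next persisted event still receives sequence number 4, so sequence numbers are never reused.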

Note

It is up to the journal implementation whether events are actually removed from storage.