Management of a Projection

Offset management

With the ProjectionManagement API you can manage the offset of a projection.

To retrieve the latest stored offset:

Scala
import scala.concurrent.Future

import org.apache.pekko
import pekko.projection.scaladsl.ProjectionManagement
import pekko.persistence.query.Offset
import pekko.projection.ProjectionId

val projectionId = ProjectionId("shopping-carts", "carts-1")
val currentOffset: Future[Option[Offset]] = ProjectionManagement(system).getOffset[Offset](projectionId)
Java
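import java.util.Optional;
import java.util.concurrent.CompletionStage;

import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.javadsl.ProjectionManagement;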

ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");
CompletionStage<Optional<Offset>> currentOffset =
    ProjectionManagement.get(system).getOffset(projectionId);

The offset can be cleared if the projection should be completely rebuilt, starting over again from the first offset. The operation will automatically restart the projection.

Scala
val projectionId = ProjectionId("shopping-carts", "carts-1")
val done: Future[Done] = ProjectionManagement(system).clearOffset(projectionId)
Java
ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");
CompletionStage<Done> done = ProjectionManagement.get(system).clearOffset(projectionId);

The offset can also be updated, which can be useful if the projection is stuck with errors on a specific offset and should skip that offset and continue with the next one. The operation will automatically restart the projection.

Scala
import org.apache.pekko.persistence.query.Sequence
val projectionId = ProjectionId("shopping-carts", "carts-1")
val currentOffset: Future[Option[Sequence]] = ProjectionManagement(system).getOffset[Sequence](projectionId)
currentOffset.foreach {
  case Some(s) => ProjectionManagement(system).updateOffset[Sequence](projectionId, Sequence(s.value + 1))
  case None    => // already removed
}
Java
import org.apache.pekko.persistence.query.Sequence;


ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");
CompletionStage<Optional<Sequence>> currentOffset =
    ProjectionManagement.get(system).getOffset(projectionId);
currentOffset.thenAccept(
    optionalOffset -> {
      if (optionalOffset.isPresent()) {
        // Move one past the stored offset so the failing envelope is skipped.
        Sequence newOffset = new Sequence(optionalOffset.get().value() + 1);
        CompletionStage<Done> done =
            ProjectionManagement.get(system).updateOffset(projectionId, newOffset);
      }
    });
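
The Java snippet above starts the update inside thenAccept, so the caller cannot tell when, or whether, the update has completed. The following is a minimal sketch of chaining the two steps with thenCompose instead. To keep it runnable without Pekko on the classpath, OffsetAdmin, InMemoryOffsetAdmin and SkipOffset are hypothetical stand-ins (not Pekko API), offsets are plain long values and projection ids are plain strings. With the real API the same shape would presumably apply to getOffset and updateOffset on ProjectionManagement.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the two management calls used above.
interface OffsetAdmin {
  CompletionStage<Optional<Long>> getOffset(String projectionKey);

  CompletionStage<Void> updateOffset(String projectionKey, long newOffset);
}

final class SkipOffset {
  // Completes with the new offset once the update is done,
  // or with empty if no offset was stored for the key.
  static CompletionStage<Optional<Long>> skipOne(OffsetAdmin admin, String key) {
    return admin.getOffset(key).thenCompose(current -> {
      if (current.isPresent()) {
        long next = current.get() + 1;
        return admin.updateOffset(key, next).thenApply(done -> Optional.of(next));
      }
      return CompletableFuture.completedFuture(Optional.<Long>empty());
    });
  }
}

// In-memory implementation, only for trying the sketch out.
final class InMemoryOffsetAdmin implements OffsetAdmin {
  private final Map<String, Long> offsets = new ConcurrentHashMap<>();

  void store(String key, long offset) {
    offsets.put(key, offset);
  }

  @Override
  public CompletionStage<Optional<Long>> getOffset(String key) {
    return CompletableFuture.completedFuture(Optional.ofNullable(offsets.get(key)));
  }

  @Override
  public CompletionStage<Void> updateOffset(String key, long newOffset) {
    offsets.put(key, newOffset);
    return CompletableFuture.completedFuture(null);
  }
}
```

Returning the composed stage lets the caller react to a failed update, which the fire-and-forget form silently drops.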

Pause and resume

With the ProjectionManagement API you can pause and resume processing of a projection. This can be useful, for example, when performing a data migration during which projection processing must not run.

Scala
import org.apache.pekko
import pekko.Done
import pekko.projection.scaladsl.ProjectionManagement
import pekko.projection.ProjectionId

val projectionId = ProjectionId("shopping-carts", "carts-1")
val mgmt = ProjectionManagement(system)
val done = for {
  _ <- mgmt.pause(projectionId)
  _ <- someDataMigration()
  _ <- mgmt.resume(projectionId)
} yield Done
Java
ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");
ProjectionManagement mgmt = ProjectionManagement.get(system);
CompletionStage<Done> pauseDone = mgmt.pause(projectionId);
CompletionStage<Done> migrationDone = pauseDone.thenCompose(notUsed -> someDataMigration());
CompletionStage<Done> resumeDone = migrationDone.thenCompose(notUsed -> mgmt.resume(projectionId));
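
In the Java snippet above, a failed migration means resume is never called, so the projection would stay paused. The following is a minimal sketch of a helper that always attempts to resume once the pause has succeeded. PauseAround and its parameter names are hypothetical, not part of Pekko, and the suppliers are assumed to return a (possibly failed) CompletionStage rather than throw.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

final class PauseAround {
  // Runs pause, then task, then resume. Resume is attempted whether the
  // task succeeded or failed; the task's outcome is what the caller sees.
  static <T> CompletionStage<T> run(
      Supplier<CompletionStage<?>> pause,
      Supplier<CompletionStage<T>> task,
      Supplier<CompletionStage<?>> resume) {
    CompletableFuture<T> out = new CompletableFuture<>();
    pause.get().whenComplete((paused, pauseFailure) -> {
      if (pauseFailure != null) {
        // Never paused, so there is nothing to resume.
        out.completeExceptionally(pauseFailure);
        return;
      }
      task.get().whenComplete((result, taskFailure) ->
          resume.get().whenComplete((resumed, resumeFailure) -> {
            if (taskFailure != null) {
              out.completeExceptionally(taskFailure);
            } else if (resumeFailure != null) {
              out.completeExceptionally(resumeFailure);
            } else {
              out.complete(result);
            }
          }));
    });
    return out;
  }
}
```

With the calls from the snippet above, the three suppliers would presumably be () -> mgmt.pause(projectionId), () -> someDataMigration() and () -> mgmt.resume(projectionId).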

The paused/resumed state is stored, and it is read when the Projections are started, for example in case of rebalance or system restart.

To retrieve the paused state:

Scala
val projectionId = ProjectionId("shopping-carts", "carts-1")
val paused: Future[Boolean] = ProjectionManagement(system).isPaused(projectionId)
Java
ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");
CompletionStage<Boolean> paused = ProjectionManagement.get(system).isPaused(projectionId);

Status tracking

The status of a Projection can be tracked by implementing a StatusObserver and enabling it with withStatusObserver before running the Projection.

The StatusObserver is called when errors occur and envelopes are retried, or when the projection fails and is restarted. It also has callbacks for processing progress and projection lifecycle.

The intention is that the implementation of the StatusObserver maintains a view that can be accessed from an administrative UI to give an overview of the current status of the projections.
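
As an illustration of such a view, here is a minimal in-memory sketch in plain Java. ProjectionStatusView and all of its method names are hypothetical and not part of Pekko Projection. A StatusObserver implementation could delegate to something like it from its lifecycle, progress and error callbacks; how the methods map onto the actual callbacks should be checked against the StatusObserver API documentation. Projection ids are simplified to strings.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Hypothetical thread-safe view that an administrative UI could read.
final class ProjectionStatusView {
  enum State { STARTED, FAILED, STOPPED }

  // Immutable snapshot of one projection's status.
  static final class Status {
    static final Status INITIAL = new Status(State.STOPPED, 0, 0, null);

    final State state;
    final long processed;
    final long errors;
    final String lastError; // null when no error has been recorded

    Status(State state, long processed, long errors, String lastError) {
      this.state = state;
      this.processed = processed;
      this.errors = errors;
      this.lastError = lastError;
    }
  }

  private final Map<String, Status> statuses = new ConcurrentHashMap<>();

  // Atomically replaces the status for one projection.
  private void update(String projectionId, UnaryOperator<Status> change) {
    statuses.compute(
        projectionId, (id, old) -> change.apply(old == null ? Status.INITIAL : old));
  }

  void started(String projectionId) {
    update(projectionId, s -> new Status(State.STARTED, s.processed, s.errors, s.lastError));
  }

  void stopped(String projectionId) {
    update(projectionId, s -> new Status(State.STOPPED, s.processed, s.errors, s.lastError));
  }

  void failed(String projectionId, Throwable cause) {
    update(projectionId, s -> new Status(State.FAILED, s.processed, s.errors, cause.getMessage()));
  }

  void envelopeProcessed(String projectionId) {
    update(projectionId, s -> new Status(s.state, s.processed + 1, s.errors, s.lastError));
  }

  void envelopeError(String projectionId, Throwable cause) {
    update(projectionId, s -> new Status(s.state, s.processed, s.errors + 1, cause.getMessage()));
  }

  // Copy of the current statuses, safe to hand to a UI layer.
  Map<String, Status> snapshot() {
    return Map.copyOf(statuses);
  }
}
```

Keeping each Status immutable and replacing it through compute means readers of snapshot never observe a half-updated entry, even when callbacks arrive from several projection instances concurrently.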