Management of a Projection
Offset management
With the ProjectionManagement API you can manage the offset of a projection.
To retrieve the latest stored offset:
- Scala
    import org.apache.pekko
    import pekko.projection.scaladsl.ProjectionManagement
    import pekko.persistence.query.Offset
    import pekko.projection.ProjectionId

    val projectionId = ProjectionId("shopping-carts", "carts-1")
    val currentOffset: Future[Option[Offset]] =
      ProjectionManagement(system).getOffset[Offset](projectionId)
- Java
    import org.apache.pekko.persistence.query.Offset;
    import org.apache.pekko.projection.ProjectionId;
    import org.apache.pekko.projection.javadsl.ProjectionManagement;

    ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");
    CompletionStage<Optional<Offset>> currentOffset =
        ProjectionManagement.get(system).getOffset(projectionId);
The offset can be cleared if the projection should be completely rebuilt, starting over again from the first offset. The operation will automatically restart the projection.
- Scala
    val projectionId = ProjectionId("shopping-carts", "carts-1")
    val done: Future[Done] =
      ProjectionManagement(system).clearOffset(projectionId)
- Java
    ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");
    CompletionStage<Done> done =
        ProjectionManagement.get(system).clearOffset(projectionId);
The offset can also be updated, which can be useful if the projection is stuck with errors on a specific offset and should skip that offset and continue with the next one. The operation will automatically restart the projection.
- Scala
    import org.apache.pekko.persistence.query.Sequence

    val projectionId = ProjectionId("shopping-carts", "carts-1")
    val currentOffset: Future[Option[Sequence]] =
      ProjectionManagement(system).getOffset[Sequence](projectionId)
    currentOffset.foreach {
      case Some(s) =>
        ProjectionManagement(system).updateOffset[Sequence](projectionId, Sequence(s.value + 1))
      case None => // already removed
    }
- Java
    import org.apache.pekko.persistence.query.Sequence;

    ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");
    CompletionStage<Optional<Sequence>> currentOffset =
        ProjectionManagement.get(system).getOffset(projectionId);
    currentOffset.thenAccept(
        optionalOffset -> {
          if (optionalOffset.isPresent()) {
            // Move one past the stored offset so that the failing offset is skipped.
            Sequence newOffset = new Sequence(optionalOffset.get().value() + 1);
            CompletionStage<Done> done =
                ProjectionManagement.get(system).updateOffset(projectionId, newOffset);
          }
        });
Pause and resume
With the ProjectionManagement API you can pause and resume processing of a projection. This can be useful, for example, when performing a data migration during which projection processing must not run.
- Scala
    import org.apache.pekko
    import pekko.projection.scaladsl.ProjectionManagement
    import pekko.projection.ProjectionId

    val projectionId = ProjectionId("shopping-carts", "carts-1")
    val mgmt = ProjectionManagement(system)
    val done = for {
      _ <- mgmt.pause(projectionId)
      _ <- someDataMigration()
      _ <- mgmt.resume(projectionId)
    } yield Done
- Java
    ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");
    ProjectionManagement mgmt = ProjectionManagement.get(system);
    CompletionStage<Done> pauseDone = mgmt.pause(projectionId);
    CompletionStage<Done> migrationDone =
        pauseDone.thenCompose(notUsed -> someDataMigration());
    CompletionStage<Done> resumeDone =
        migrationDone.thenCompose(notUsed -> mgmt.resume(projectionId));
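In the composition above, a failed migration means the resume step is never reached, so the projection stays paused. If that is not the desired behaviour, the three steps can be chained so that resume is attempted either way. The following is a minimal sketch using only the Java standard library; `PausedMigration` and its `run` method are hypothetical names introduced for illustration and are not part of the ProjectionManagement API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of the ProjectionManagement API.
final class PausedMigration {
  private PausedMigration() {}

  /**
   * Runs pause, then the migration, then resume. Resume is attempted whether
   * the migration succeeds or fails; a migration failure is propagated after
   * the resume has completed. If pause itself fails, nothing else runs.
   */
  static <T> CompletionStage<T> run(
      Supplier<? extends CompletionStage<?>> pause,
      Supplier<? extends CompletionStage<T>> migration,
      Supplier<? extends CompletionStage<?>> resume) {
    return pause.get().thenCompose(paused ->
        migration.get()
            .handle((result, failure) -> {
              // Reached on both success and failure of the migration.
              CompletionStage<T> afterResume =
                  resume.get().thenCompose(resumed -> outcome(result, failure));
              return afterResume;
            })
            .thenCompose(stage -> stage));
  }

  // Rebuilds the migration outcome as a stage: its result, or its failure.
  private static <T> CompletionStage<T> outcome(T result, Throwable failure) {
    CompletableFuture<T> stage = new CompletableFuture<>();
    if (failure != null) {
      stage.completeExceptionally(failure);
    } else {
      stage.complete(result);
    }
    return stage;
  }
}
```

With the names from the example above, this could be called as `PausedMigration.run(() -> mgmt.pause(projectionId), () -> someDataMigration(), () -> mgmt.resume(projectionId))`. Whether resuming after a failed migration is safe depends on the migration, so treat this as one possible design rather than a recommendation.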
The paused/resumed state is stored, and it is read when the Projections are started, for example after a rebalance or a system restart.
To retrieve the paused state:
- Scala
    val projectionId = ProjectionId("shopping-carts", "carts-1")
    val paused: Future[Boolean] =
      ProjectionManagement(system).isPaused(projectionId)
- Java
    ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");
    CompletionStage<Boolean> paused =
        ProjectionManagement.get(system).isPaused(projectionId);
Status tracking
The status of a Projection can be tracked by implementing a StatusObserver and enabling it with withStatusObserver before running the Projection.
The StatusObserver is called when errors occur and envelopes are retried, or when the projection fails and is restarted. It also has callbacks for processing progress and the projection lifecycle.
The intention is that the implementation of the StatusObserver maintains a view that can be accessed from an administrative UI to give an overview of the current status of the projections.
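As an illustration of such a view, the following is a minimal sketch using only the Java standard library. It does not extend StatusObserver itself: `ProjectionStatusView`, its method names, the string projection ids and the status fields are all assumptions made for this example. A real StatusObserver implementation would forward its lifecycle and error callbacks to methods like these.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical status view for illustration; not part of Pekko Projections.
final class ProjectionStatusView {

  /** Immutable summary of one projection, suitable for rendering in a UI. */
  static final class Status {
    final String state;     // "running", "failed", "stopped" or "unknown"
    final long errorCount;  // number of errors and failures reported so far
    final String lastError; // description of the most recent one, or null

    Status(String state, long errorCount, String lastError) {
      this.state = state;
      this.errorCount = errorCount;
      this.lastError = lastError;
    }
  }

  private final ConcurrentMap<String, Status> statuses = new ConcurrentHashMap<>();

  void started(String projectionId) { update(projectionId, "running", null); }

  void stopped(String projectionId) { update(projectionId, "stopped", null); }

  void failed(String projectionId, Throwable cause) { update(projectionId, "failed", cause); }

  /** An envelope failed and may be retried; the lifecycle state is left unchanged. */
  void error(String projectionId, Throwable cause) { update(projectionId, null, cause); }

  /** Sorted copy of the current view, safe to hand to another thread. */
  Map<String, Status> snapshot() { return new TreeMap<>(statuses); }

  // Atomically replaces the entry for one projection with an updated summary.
  private void update(String projectionId, String newState, Throwable cause) {
    statuses.compute(projectionId, (id, old) -> {
      String state = newState != null ? newState : (old != null ? old.state : "unknown");
      long errors = old != null ? old.errorCount : 0;
      String lastError = old != null ? old.lastError : null;
      if (cause != null) {
        errors++;
        lastError = cause.toString();
      }
      return new Status(state, errors, lastError);
    });
  }
}
```

Because callbacks may arrive from different threads, the sketch keeps immutable `Status` values in a `ConcurrentHashMap` and replaces them atomically with `compute`; an administrative endpoint would then only need to read `snapshot()`.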