Testing
Module info
To use Pekko Persistence TestKit, add the module to your project:
- sbt
- val PekkoVersion = "1.2.1" libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-persistence-typed" % PekkoVersion, "org.apache.pekko" %% "pekko-persistence-testkit" % PekkoVersion % Test )
- Maven
- <properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-bom_${scala.binary.version}</artifactId> <version>1.2.1</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-persistence-typed_${scala.binary.version}</artifactId> </dependency> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-persistence-testkit_${scala.binary.version}</artifactId> <scope>test</scope> </dependency> </dependencies>
- Gradle
- def versions = [ ScalaBinary: "2.13" ] dependencies { implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.2.1") implementation "org.apache.pekko:pekko-persistence-typed_${versions.ScalaBinary}" testImplementation "org.apache.pekko:pekko-persistence-testkit_${versions.ScalaBinary}" }
| Project Info: Pekko Persistence Testkit | |
|---|---|
| Artifact | org.apache.pekko pekko-persistence-testkit 1.2.1 | 
| JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 OpenJDK 21 | 
| Scala versions | 2.13.16, 2.12.20, 3.3.6 | 
| JPMS module name | pekko.persistence.testkit | 
| License | |
| Home page | https://pekko.apache.org/ | 
| API documentation | |
| Forums | |
| Release notes | Release Notes | 
| Issues | Github issues | 
| Sources | https://github.com/apache/pekko | 
Unit testing
Note! The EventSourcedBehaviorTestKit is a new feature, api may have changes breaking source compatibility in future versions.
Unit testing of EventSourcedBehavior can be done with the EventSourcedBehaviorTestKitEventSourcedBehaviorTestKit. It supports running one command at a time and you can assert that the synchronously returned result is as expected. The result contains the events emitted by the command and the new state after applying the events. It also has support for verifying the reply to a command.
You need to configure the ActorSystem with the EventSourcedBehaviorTestKit.config. The configuration enables the in-memory journal and snapshot storage.
- Scala
- 
  source class AccountExampleDocSpec extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config)
- Java
- 
  source @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(EventSourcedBehaviorTestKit.config()); private EventSourcedBehaviorTestKit< AccountEntity.Command, AccountEntity.Event, AccountEntity.Account> eventSourcedTestKit = EventSourcedBehaviorTestKit.create( testKit.system(), AccountEntity.create("1", PersistenceId.of("Account", "1")));
A full test for the AccountEntity, which is shown in the Persistence Style Guide, may look like this:
- Scala
- 
  source import org.apache.pekko import pekko.Done import pekko.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit import pekko.persistence.typed.PersistenceId import pekko.actor.testkit.typed.scaladsl.LogCapturing import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import pekko.pattern.StatusReply import org.scalatest.BeforeAndAfterEach import org.scalatest.wordspec.AnyWordSpecLike class AccountExampleDocSpec extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config) with AnyWordSpecLike with BeforeAndAfterEach with LogCapturing { private val eventSourcedTestKit = EventSourcedBehaviorTestKit[AccountEntity.Command, AccountEntity.Event, AccountEntity.Account]( system, AccountEntity("1", PersistenceId("Account", "1"))) override protected def beforeEach(): Unit = { super.beforeEach() eventSourcedTestKit.clear() } "Account" must { "be created with zero balance" in { val result = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_)) result.reply shouldBe StatusReply.Ack result.event shouldBe AccountEntity.AccountCreated result.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 0 } "handle Withdraw" in { eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_)) val result1 = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _)) result1.reply shouldBe StatusReply.Ack result1.event shouldBe AccountEntity.Deposited(100) result1.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 100 val result2 = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Withdraw(10, _)) result2.reply shouldBe StatusReply.Ack result2.event shouldBe AccountEntity.Withdrawn(10) result2.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 90 } "reject Withdraw overdraft" in { eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_)) eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _)) val result = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Withdraw(110, _)) result.reply.isError shouldBe true result.hasNoEvents shouldBe true } "handle GetBalance" in { eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_)) eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _)) val result = eventSourcedTestKit.runCommand[AccountEntity.CurrentBalance](AccountEntity.GetBalance(_)) result.reply.balance shouldBe 100 result.hasNoEvents shouldBe true } } }
- Java
- 
  source import java.math.BigDecimal; import org.apache.pekko.actor.testkit.typed.javadsl.LogCapturing; import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource; import org.apache.pekko.actor.typed.ActorRef; import org.apache.pekko.persistence.testkit.javadsl.EventSourcedBehaviorTestKit; import org.apache.pekko.persistence.testkit.javadsl.EventSourcedBehaviorTestKit.CommandResultWithReply; import org.apache.pekko.persistence.typed.PersistenceId; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; public class AccountExampleDocTest { @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(EventSourcedBehaviorTestKit.config()); private EventSourcedBehaviorTestKit< AccountEntity.Command, AccountEntity.Event, AccountEntity.Account> eventSourcedTestKit = EventSourcedBehaviorTestKit.create( testKit.system(), AccountEntity.create("1", PersistenceId.of("Account", "1"))); @Rule public final LogCapturing logCapturing = new LogCapturing(); @Before public void beforeEach() { eventSourcedTestKit.clear(); } @Test public void createWithEmptyBalance() { CommandResultWithReply< AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>> result = eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); assertEquals(StatusReply.ack(), result.reply()); assertEquals(AccountEntity.AccountCreated.INSTANCE, result.event()); assertEquals(BigDecimal.ZERO, result.stateOfType(AccountEntity.OpenedAccount.class).balance); } @Test public void createWithUnHandle() { CommandResultWithReply< AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>> result = eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); assertFalse(result.hasNoReply()); } @Test public void handleWithdraw() { eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); CommandResultWithReply< AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>> result1 = eventSourcedTestKit.runCommand( replyTo -> new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo)); assertEquals(StatusReply.ack(), result1.reply()); assertEquals( BigDecimal.valueOf(100), result1.eventOfType(AccountEntity.Deposited.class).amount); assertEquals( BigDecimal.valueOf(100), result1.stateOfType(AccountEntity.OpenedAccount.class).balance); CommandResultWithReply< AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>> result2 = eventSourcedTestKit.runCommand( replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(10), replyTo)); assertEquals(StatusReply.ack(), result2.reply()); assertEquals(BigDecimal.valueOf(10), result2.eventOfType(AccountEntity.Withdrawn.class).amount); assertEquals( BigDecimal.valueOf(90), result2.stateOfType(AccountEntity.OpenedAccount.class).balance); } @Test public void rejectWithdrawOverdraft() { eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); eventSourcedTestKit.runCommand( (ActorRef<StatusReply<Done>> replyTo) -> new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo)); CommandResultWithReply< AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>> result = eventSourcedTestKit.runCommand( replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(110), replyTo)); assertTrue(result.reply().isError()); assertTrue(result.hasNoEvents()); } @Test public void handleGetBalance() { eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); eventSourcedTestKit.runCommand( (ActorRef<StatusReply<Done>> replyTo) -> new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo)); CommandResultWithReply< AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, AccountEntity.CurrentBalance> result = eventSourcedTestKit.runCommand(AccountEntity.GetBalance::new); assertEquals(BigDecimal.valueOf(100), result.reply().balance); } }
Serialization of commands, events and state are verified automatically. The serialization checks can be customized with the SerializationSettings when creating the EventSourcedBehaviorTestKit. By default, the serialization roundtrip is checked but the equality of the result of the serialization is not checked. equals must be implemented (or using case class) in the commands, events and state if verifyEquality is enabled.
To test recovery the restart method of the EventSourcedBehaviorTestKit can be used. It will restart the behavior, which will then recover from stored snapshot and events from previous commands. It’s also possible to populate the storage with events or simulate failures by using the underlying PersistenceTestKitPersistenceTestKit.
Persistence TestKit
Note! The PersistenceTestKit is a new feature, api may have changes breaking source compatibility in future versions.
Persistence testkit allows to check events saved in a storage, emulate storage operations and exceptions. To use the testkit you need to add the following dependency in your project:
- sbt
- val PekkoVersion = "1.2.1" libraryDependencies += "org.apache.pekko" %% "pekko-persistence-testkit" % PekkoVersion
- Maven
- <properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-bom_${scala.binary.version}</artifactId> <version>1.2.1</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-persistence-testkit_${scala.binary.version}</artifactId> </dependency> </dependencies>
- Gradle
- def versions = [ ScalaBinary: "2.13" ] dependencies { implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.2.1") implementation "org.apache.pekko:pekko-persistence-testkit_${versions.ScalaBinary}" }
There are two testkit classes which have similar api:
- PersistenceTestKit- PersistenceTestKitclass is for events
- SnapshotTestKit- SnapshotTestKitclass is for snapshots
The testkit classes have two corresponding plugins which emulate the behavior of the storages:
- PersistenceTestKitPlugin- PersistenceTestKitPluginclass emulates a events storage
- PersistenceTestKitSnapshotPlugin- PersistenceTestKitSnapshotPluginclass emulates a snapshots storage
Note! The corresponding plugins must be configured in the actor system which is used to initialize the particular testkit class:
- Scala
- 
  source 
 val yourConfiguration = ConfigFactory.defaultApplication() val system = ActorSystem(??? /*some behavior*/, "test-system", PersistenceTestKitPlugin.config.withFallback(yourConfiguration)) val testKit = PersistenceTestKit(system)
- Java
- 
  source public class PersistenceTestKitConfig { Config conf = PersistenceTestKitPlugin.getInstance() .config() .withFallback(ConfigFactory.defaultApplication()); ActorSystem<Command> system = ActorSystem.create(new SomeBehavior(), "example", conf); PersistenceTestKit testKit = PersistenceTestKit.create(system); }
and
- Scala
- 
  source 
 val yourConfiguration = ConfigFactory.defaultApplication() val system = ActorSystem( ??? /*some behavior*/, "test-system", PersistenceTestKitSnapshotPlugin.config.withFallback(yourConfiguration)) val testKit = SnapshotTestKit(system)
- Java
- 
  source public class SnapshotTestKitConfig { Config conf = PersistenceTestKitSnapshotPlugin.getInstance() .config() .withFallback(ConfigFactory.defaultApplication()); ActorSystem<Command> system = ActorSystem.create(new SomeBehavior(), "example", conf); SnapshotTestKit testKit = SnapshotTestKit.create(system); }
A typical scenario is to create a persistent actor, send commands to it and check that it persists events as it is expected:
- Scala
- 
  source import org.apache.pekko import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import pekko.persistence.testkit.PersistenceTestKitPlugin import pekko.persistence.testkit.scaladsl.PersistenceTestKit class PersistenceTestKitSampleSpec extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config.withFallback(ConfigFactory.defaultApplication())) with AnyWordSpecLike with BeforeAndAfterEach { val persistenceTestKit = PersistenceTestKit(system) override def beforeEach(): Unit = { persistenceTestKit.clearAll() } "Persistent actor" should { "persist all events" in { val persistenceId = PersistenceId.ofUniqueId("your-persistence-id") val persistentActor = spawn( EventSourcedBehavior[Cmd, Evt, State]( persistenceId, emptyState = State.empty, commandHandler = (_, cmd) => Effect.persist(Evt(cmd.data)), eventHandler = (state, evt) => state.updated(evt))) val cmd = Cmd("data") persistentActor ! cmd val expectedPersistedEvent = Evt(cmd.data) persistenceTestKit.expectNextPersisted(persistenceId.id, expectedPersistedEvent) } } }
- Java
- 
  source public class PersistenceTestKitSampleTest extends AbstractJavaTest { @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource( PersistenceTestKitPlugin.getInstance() .config() .withFallback(ConfigFactory.defaultApplication())); PersistenceTestKit persistenceTestKit = PersistenceTestKit.create(testKit.system()); @Before public void beforeEach() { persistenceTestKit.clearAll(); } @Test public void test() { PersistenceId persistenceId = PersistenceId.ofUniqueId("some-id"); ActorRef<YourPersistentBehavior.Cmd> ref = testKit.spawn(YourPersistentBehavior.create(persistenceId)); YourPersistentBehavior.Cmd cmd = new YourPersistentBehavior.Cmd("data"); ref.tell(cmd); YourPersistentBehavior.Evt expectedEventPersisted = new YourPersistentBehavior.Evt(cmd.data); persistenceTestKit.expectNextPersisted(persistenceId.id(), expectedEventPersisted); } } class YourPersistentBehavior extends EventSourcedBehavior< YourPersistentBehavior.Cmd, YourPersistentBehavior.Evt, YourPersistentBehavior.State> { static final class Cmd implements CborSerializable { public final String data; @JsonCreator public Cmd(String data) { this.data = data; } } static final class Evt implements CborSerializable { public final String data; @JsonCreator public Evt(String data) { this.data = data; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Evt evt = (Evt) o; return data.equals(evt.data); } @Override public int hashCode() { return data.hashCode(); } } static final class State implements CborSerializable {} static Behavior<Cmd> create(PersistenceId persistenceId) { return Behaviors.setup(context -> new YourPersistentBehavior(persistenceId)); } private YourPersistentBehavior(PersistenceId persistenceId) { super(persistenceId); } @Override public State emptyState() { // some state return new State(); } @Override public CommandHandler<Cmd, Evt, State> commandHandler() { return newCommandHandlerBuilder() .forAnyState() .onCommand(Cmd.class, command -> Effect().persist(new Evt(command.data))) .build(); } @Override public EventHandler<State, Evt> eventHandler() { // TODO handle events return newEventHandlerBuilder().forAnyState().onEvent(Evt.class, (state, evt) -> state).build(); } }
You can safely use persistence testkit in combination with main pekko testkit.
The main methods of the api allow to (see PersistenceTestKitPersistenceTestKit and SnapshotTestKitSnapshotTestKit for more details):
- check if the given event/snapshot object is the next persisted in the storage.
- read a sequence of persisted events/snapshots.
- check that no events/snapshots have been persisted in the storage.
- throw the default exception from the storage on attempt to persist, read or delete the following event/snapshot.
- clear the events/snapshots persisted in the storage.
- reject the events, but not snapshots (rejections are not supported for snapshots in the original api).
- set your own policy which emulates the work of the storage. Policy determines what to do when persistence needs to execute some operation on the storage (i.e. read, delete, etc.).
- get all the events/snapshots persisted in the storage
- put the events/snapshots in the storage to test recovery
Setting your own policy for the storage
You can implement and set your own policy for the storage to control its actions on particular operations, for example you can fail or reject events on your own conditions. Implement the ProcessingPolicy[EventStorage.JournalOperation]ProcessingPolicy<EventStorage.JournalOperation> traitinterface for event storage or ProcessingPolicy[SnapshotStorage.SnapshotOperation]ProcessingPolicy<SnapshotStorage.SnapshotOperation> traitinterface for snapshot storage, and set it with withPolicy() method.
- Scala
- 
  source class PersistenceTestKitSampleSpecWithPolicy extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config.withFallback(ConfigFactory.defaultApplication())) with AnyWordSpecLike with BeforeAndAfterEach { val persistenceTestKit = PersistenceTestKit(system) override def beforeEach(): Unit = { persistenceTestKit.clearAll() persistenceTestKit.resetPolicy() } "Testkit policy" should { "fail all operations with custom exception" in { val policy = new EventStorage.JournalPolicies.PolicyType { class CustomFailure extends RuntimeException override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult = processingUnit match { case WriteEvents(_) => StorageFailure(new CustomFailure) case _ => ProcessingSuccess } } persistenceTestKit.withPolicy(policy) val persistenceId = PersistenceId.ofUniqueId("your-persistence-id") val persistentActor = spawn( EventSourcedBehavior[Cmd, Evt, State]( persistenceId, emptyState = State.empty, commandHandler = (_, cmd) => Effect.persist(Evt(cmd.data)), eventHandler = (state, evt) => state.updated(evt))) persistentActor ! Cmd("data") persistenceTestKit.expectNothingPersisted(persistenceId.id) } } }
- Java
- 
  source public class PersistenceTestKitPolicySampleTest extends AbstractJavaTest { @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource( PersistenceTestKitPlugin.getInstance() .config() .withFallback(ConfigFactory.defaultApplication())); PersistenceTestKit persistenceTestKit = PersistenceTestKit.create(testKit.system()); @Before public void beforeEach() { persistenceTestKit.clearAll(); persistenceTestKit.resetPolicy(); } @Test public void test() { SampleEventStoragePolicy policy = new SampleEventStoragePolicy(); persistenceTestKit.withPolicy(policy); PersistenceId persistenceId = PersistenceId.ofUniqueId("some-id"); ActorRef<YourPersistentBehavior.Cmd> ref = testKit.spawn(YourPersistentBehavior.create(persistenceId)); YourPersistentBehavior.Cmd cmd = new YourPersistentBehavior.Cmd("data"); ref.tell(cmd); persistenceTestKit.expectNothingPersisted(persistenceId.id()); } static class SampleEventStoragePolicy implements ProcessingPolicy<JournalOperation> { @Override public ProcessingResult tryProcess(String processId, JournalOperation processingUnit) { if (processingUnit instanceof WriteEvents) { return StorageFailure.create(); } else { return ProcessingSuccess.getInstance(); } } } }
tryProcess() method of the ProcessingPolicyProcessingPolicy has two arguments: persistence id and the storage operation. 
Event storage has the following operations:
- ReadEvents- ReadEventsRead the events from the storage.
- WriteEvents- WriteEventsWrite the events to the storage.
- DeleteEvents- DeleteEventsDelete the events from the storage.
- ReadSeqNum- ReadSeqNumRead the highest sequence number for particular persistence id.
Snapshot storage has the following operations:
- ReadSnapshot- ReadSnapshotRead the snapshot from the storage.
- WriteSnapshot- WriteSnapshotWrite the snapshot to the storage.
- DeleteSnapshotsByCriteria- DeleteSnapshotsByCriteriaDelete snapshots in the storage by criteria.
- DeleteSnapshotByMeta- DeleteSnapshotByMetaDelete particular snapshot from the storage by its metadata.
The tryProcess() method must return one of the processing results:
- ProcessingSuccess- ProcessingSuccessSuccessful completion of the operation. All the events will be saved/read/deleted.
- StorageFailure- StorageFailureEmulates exception from the storage.
- Reject- RejectEmulates rejection from the storage.
Note that snapshot storage does not have rejections. If you return Reject in the tryProcess() of the snapshot storage policy, it will have the same effect as the StorageFailure.
Here is an example of the policy for an event storage:
- Scala
- 
  source import org.apache.pekko.persistence.testkit._ class SampleEventStoragePolicy extends EventStorage.JournalPolicies.PolicyType { // you can use internal state, it does not need to be thread safe var count = 1 override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult = if (count < 10) { count += 1 // check the type of operation and react with success or with reject or with failure. // if you return ProcessingSuccess the operation will be performed, otherwise not. processingUnit match { case ReadEvents(batch) if batch.nonEmpty => ProcessingSuccess case WriteEvents(batch) if batch.size > 1 => ProcessingSuccess case ReadSeqNum => StorageFailure() case DeleteEvents(_) => Reject() case _ => StorageFailure() } } else { ProcessingSuccess } }
- Java
- 
  source class SampleEventStoragePolicy implements ProcessingPolicy<JournalOperation> { // you can use internal state, it does not need to be thread safe int count = 1; @Override public ProcessingResult tryProcess(String processId, JournalOperation processingUnit) { // check the type of operation and react with success or with reject or with failure. // if you return ProcessingSuccess the operation will be performed, otherwise not. if (count < 10) { count += 1; if (processingUnit instanceof ReadEvents) { ReadEvents read = (ReadEvents) processingUnit; if (read.batch().nonEmpty()) { ProcessingSuccess.getInstance(); } else { return StorageFailure.create(); } } else if (processingUnit instanceof WriteEvents) { return ProcessingSuccess.getInstance(); } else if (processingUnit instanceof DeleteEvents) { return ProcessingSuccess.getInstance(); } else if (processingUnit.equals(ReadSeqNum.getInstance())) { return Reject.create(); } // you can set your own exception return StorageFailure.create(new RuntimeException("your exception")); } else { return ProcessingSuccess.getInstance(); } } }
Here is an example of the policy for a snapshot storage:
- Scala
- 
  source class SampleSnapshotStoragePolicy extends SnapshotStorage.SnapshotPolicies.PolicyType { // you can use internal state, it does not need to be thread safe var count = 1 override def tryProcess(persistenceId: String, processingUnit: SnapshotOperation): ProcessingResult = if (count < 10) { count += 1 // check the type of operation and react with success or with reject or with failure. // if you return ProcessingSuccess the operation will be performed, otherwise not. processingUnit match { case ReadSnapshot(_, payload) if payload.nonEmpty => ProcessingSuccess case WriteSnapshot(meta, payload) if meta.sequenceNr > 10 => ProcessingSuccess case DeleteSnapshotsByCriteria(_) => StorageFailure() case DeleteSnapshotByMeta(meta) if meta.sequenceNr < 10 => ProcessingSuccess case _ => StorageFailure() } } else { ProcessingSuccess } }
- Java
- 
  source class SnapshotStoragePolicy implements ProcessingPolicy<SnapshotOperation> { // you can use internal state, it doesn't need to be thread safe int count = 1; @Override public ProcessingResult tryProcess(String processId, SnapshotOperation processingUnit) { // check the type of operation and react with success or with failure. // if you return ProcessingSuccess the operation will be performed, otherwise not. if (count < 10) { count += 1; if (processingUnit instanceof ReadSnapshot) { ReadSnapshot read = (ReadSnapshot) processingUnit; if (read.getSnapshot().isPresent()) { ProcessingSuccess.getInstance(); } else { return StorageFailure.create(); } } else if (processingUnit instanceof WriteSnapshot) { return ProcessingSuccess.getInstance(); } else if (processingUnit instanceof DeleteSnapshotsByCriteria) { return ProcessingSuccess.getInstance(); } else if (processingUnit instanceof DeleteSnapshotByMeta) { return ProcessingSuccess.getInstance(); } // you can set your own exception return StorageFailure.create(new RuntimeException("your exception")); } else { return ProcessingSuccess.getInstance(); } } }
Configuration of Persistence TestKit
There are several configuration properties for persistence testkit, please refer to the reference configuration
Integration testing
EventSourcedBehavior actors can be tested with the ActorTestKit together with other actors. The in-memory journal and snapshot storage from the Persistence TestKit can be used also for integration style testing of a single ActorSystem, for example when using Cluster Sharding with a single Cluster node.
For tests that involve more than one Cluster node you have to use another journal and snapshot store. While it’s possible to use the Persistence Plugin Proxy it’s often better and more realistic to use a real database.
Plugin initialization
Some Persistence plugins create tables automatically, but has the limitation that it can’t be done concurrently from several ActorSystems. That can be a problem if the test creates a Cluster and all nodes tries to initialize the plugins at the same time. To coordinate initialization you can use the PersistenceInit utility.
PersistenceInit is part of pekko-persistence-testkit and you need to add the dependency to your project:
- sbt
- val PekkoVersion = "1.2.1" libraryDependencies += "org.apache.pekko" %% "pekko-persistence-testkit" % PekkoVersion
- Maven
- <properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-bom_${scala.binary.version}</artifactId> <version>1.2.1</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-persistence-testkit_${scala.binary.version}</artifactId> </dependency> </dependencies>
- Gradle
- def versions = [ ScalaBinary: "2.13" ] dependencies { implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.2.1") implementation "org.apache.pekko:pekko-persistence-testkit_${versions.ScalaBinary}" }
- Scala
- 
  source import org.apache.pekko.persistence.testkit.scaladsl.PersistenceInit import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ val timeout = 5.seconds val done: Future[Done] = PersistenceInit.initializeDefaultPlugins(system, timeout) Await.result(done, timeout)
- Java
- 
  source import org.apache.pekko.persistence.testkit.javadsl.PersistenceInit; import org.apache.pekko.Done; import java.time.Duration; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; Duration timeout = Duration.ofSeconds(5); CompletionStage<Done> done = PersistenceInit.initializeDefaultPlugins(testKit.system(), timeout); done.toCompletableFuture().get(timeout.getSeconds(), TimeUnit.SECONDS);