Testing
Module info
To use Pekko Persistence TestKit, add the module to your project:
- sbt
val PekkoVersion = "1.0.0" libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-persistence-typed" % PekkoVersion, "org.apache.pekko" %% "pekko-persistence-testkit" % PekkoVersion % Test )
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-bom_${scala.binary.version}</artifactId> <version>1.0.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-persistence-typed_${scala.binary.version}</artifactId> </dependency> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-persistence-testkit_${scala.binary.version}</artifactId> <scope>test</scope> </dependency> </dependencies>
- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.0.0") implementation "org.apache.pekko:pekko-persistence-typed_${versions.ScalaBinary}" testImplementation "org.apache.pekko:pekko-persistence-testkit_${versions.ScalaBinary}" }
Project Info: Pekko Persistence Testkit | |
---|---|
Artifact | org.apache.pekko
pekko-persistence-testkit
1.0.0
|
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 |
Scala versions | 2.13.11, 2.12.18, 3.3.0 |
JPMS module name | pekko.persistence.testkit |
License | |
Home page | https://pekko.apache.org/ |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/apache/incubator-pekko |
Unit testing
Note! The EventSourcedBehaviorTestKit
is a new feature, api may have changes breaking source compatibility in future versions.
Unit testing of EventSourcedBehavior
can be done with the EventSourcedBehaviorTestKit
EventSourcedBehaviorTestKit
. It supports running one command at a time and you can assert that the synchronously returned result is as expected. The result contains the events emitted by the command and the new state after applying the events. It also has support for verifying the reply to a command.
You need to configure the ActorSystem
with the EventSourcedBehaviorTestKit.config
. The configuration enables the in-memory journal and snapshot storage.
- Scala
-
source
class AccountExampleDocSpec extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config)
- Java
-
source
@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(EventSourcedBehaviorTestKit.config()); private EventSourcedBehaviorTestKit< AccountEntity.Command, AccountEntity.Event, AccountEntity.Account> eventSourcedTestKit = EventSourcedBehaviorTestKit.create( testKit.system(), AccountEntity.create("1", PersistenceId.of("Account", "1")));
A full test for the AccountEntity
, which is shown in the Persistence Style Guide, may look like this:
- Scala
-
source
import org.apache.pekko import pekko.Done import pekko.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit import pekko.persistence.typed.PersistenceId import pekko.actor.testkit.typed.scaladsl.LogCapturing import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import pekko.pattern.StatusReply import org.scalatest.BeforeAndAfterEach import org.scalatest.wordspec.AnyWordSpecLike class AccountExampleDocSpec extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config) with AnyWordSpecLike with BeforeAndAfterEach with LogCapturing { private val eventSourcedTestKit = EventSourcedBehaviorTestKit[AccountEntity.Command, AccountEntity.Event, AccountEntity.Account]( system, AccountEntity("1", PersistenceId("Account", "1"))) override protected def beforeEach(): Unit = { super.beforeEach() eventSourcedTestKit.clear() } "Account" must { "be created with zero balance" in { val result = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_)) result.reply shouldBe StatusReply.Ack result.event shouldBe AccountEntity.AccountCreated result.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 0 } "handle Withdraw" in { eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_)) val result1 = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _)) result1.reply shouldBe StatusReply.Ack result1.event shouldBe AccountEntity.Deposited(100) result1.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 100 val result2 = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Withdraw(10, _)) result2.reply shouldBe StatusReply.Ack result2.event shouldBe AccountEntity.Withdrawn(10) result2.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 90 } "reject Withdraw overdraft" in { eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_)) eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _)) val result = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Withdraw(110, _)) result.reply.isError shouldBe true result.hasNoEvents shouldBe true } "handle GetBalance" in { eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_)) eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _)) val result = eventSourcedTestKit.runCommand[AccountEntity.CurrentBalance](AccountEntity.GetBalance(_)) result.reply.balance shouldBe 100 result.hasNoEvents shouldBe true } } }
- Java
-
source
import java.math.BigDecimal; import org.apache.pekko.actor.testkit.typed.javadsl.LogCapturing; import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource; import org.apache.pekko.actor.typed.ActorRef; import org.apache.pekko.persistence.testkit.javadsl.EventSourcedBehaviorTestKit; import org.apache.pekko.persistence.testkit.javadsl.EventSourcedBehaviorTestKit.CommandResultWithReply; import org.apache.pekko.persistence.typed.PersistenceId; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; public class AccountExampleDocTest { @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(EventSourcedBehaviorTestKit.config()); private EventSourcedBehaviorTestKit< AccountEntity.Command, AccountEntity.Event, AccountEntity.Account> eventSourcedTestKit = EventSourcedBehaviorTestKit.create( testKit.system(), AccountEntity.create("1", PersistenceId.of("Account", "1"))); @Rule public final LogCapturing logCapturing = new LogCapturing(); @Before public void beforeEach() { eventSourcedTestKit.clear(); } @Test public void createWithEmptyBalance() { CommandResultWithReply< AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>> result = eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); assertEquals(StatusReply.ack(), result.reply()); assertEquals(AccountEntity.AccountCreated.INSTANCE, result.event()); assertEquals(BigDecimal.ZERO, result.stateOfType(AccountEntity.OpenedAccount.class).balance); } @Test public void createWithUnHandle() { CommandResultWithReply< AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>> result = eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); assertFalse(result.hasNoReply()); } @Test public void handleWithdraw() { eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); CommandResultWithReply< AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>> result1 = eventSourcedTestKit.runCommand( replyTo -> new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo)); assertEquals(StatusReply.ack(), result1.reply()); assertEquals( BigDecimal.valueOf(100), result1.eventOfType(AccountEntity.Deposited.class).amount); assertEquals( BigDecimal.valueOf(100), result1.stateOfType(AccountEntity.OpenedAccount.class).balance); CommandResultWithReply< AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>> result2 = eventSourcedTestKit.runCommand( replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(10), replyTo)); assertEquals(StatusReply.ack(), result2.reply()); assertEquals(BigDecimal.valueOf(10), result2.eventOfType(AccountEntity.Withdrawn.class).amount); assertEquals( BigDecimal.valueOf(90), result2.stateOfType(AccountEntity.OpenedAccount.class).balance); } @Test public void rejectWithdrawOverdraft() { eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); eventSourcedTestKit.runCommand( (ActorRef<StatusReply<Done>> replyTo) -> new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo)); CommandResultWithReply< AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>> result = eventSourcedTestKit.runCommand( replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(110), replyTo)); assertTrue(result.reply().isError()); assertTrue(result.hasNoEvents()); } @Test public void handleGetBalance() { eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); eventSourcedTestKit.runCommand( (ActorRef<StatusReply<Done>> replyTo) -> new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo)); CommandResultWithReply< AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, AccountEntity.CurrentBalance> result = eventSourcedTestKit.runCommand(AccountEntity.GetBalance::new); assertEquals(BigDecimal.valueOf(100), result.reply().balance); } }
Serialization of commands, events and state are verified automatically. The serialization checks can be customized with the SerializationSettings
when creating the EventSourcedBehaviorTestKit
. By default, the serialization roundtrip is checked but the equality of the result of the serialization is not checked. equals
must be implemented (or using case class
) in the commands, events and state if verifyEquality
is enabled.
To test recovery the restart
method of the EventSourcedBehaviorTestKit
can be used. It will restart the behavior, which will then recover from stored snapshot and events from previous commands. It’s also possible to populate the storage with events or simulate failures by using the underlying PersistenceTestKit
PersistenceTestKit
.
Persistence TestKit
Note! The PersistenceTestKit
is a new feature, api may have changes breaking source compatibility in future versions.
Persistence testkit allows to check events saved in a storage, emulate storage operations and exceptions. To use the testkit you need to add the following dependency in your project:
- sbt
val PekkoVersion = "1.0.0" libraryDependencies += "org.apache.pekko" %% "pekko-persistence-testkit" % PekkoVersion
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-bom_${scala.binary.version}</artifactId> <version>1.0.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-persistence-testkit_${scala.binary.version}</artifactId> </dependency> </dependencies>
- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.0.0") implementation "org.apache.pekko:pekko-persistence-testkit_${versions.ScalaBinary}" }
There are two testkit classes which have similar api:
PersistenceTestKit
PersistenceTestKit
class is for eventsSnapshotTestKit
SnapshotTestKit
class is for snapshots
The testkit classes have two corresponding plugins which emulate the behavior of the storages:
PersistenceTestKitPlugin
PersistenceTestKitPlugin
class emulates a events storagePersistenceTestKitSnapshotPlugin
PersistenceTestKitSnapshotPlugin
class emulates a snapshots storage
Note! The corresponding plugins must be configured in the actor system which is used to initialize the particular testkit class:
- Scala
-
source
val yourConfiguration = ConfigFactory.defaultApplication() val system = ActorSystem(??? /*some behavior*/, "test-system", PersistenceTestKitPlugin.config.withFallback(yourConfiguration)) val testKit = PersistenceTestKit(system) - Java
-
source
public class PersistenceTestKitConfig { Config conf = PersistenceTestKitPlugin.getInstance() .config() .withFallback(ConfigFactory.defaultApplication()); ActorSystem<Command> system = ActorSystem.create(new SomeBehavior(), "example", conf); PersistenceTestKit testKit = PersistenceTestKit.create(system); }
and
- Scala
-
source
val yourConfiguration = ConfigFactory.defaultApplication() val system = ActorSystem( ??? /*some behavior*/, "test-system", PersistenceTestKitSnapshotPlugin.config.withFallback(yourConfiguration)) val testKit = SnapshotTestKit(system) - Java
-
source
public class SnapshotTestKitConfig { Config conf = PersistenceTestKitSnapshotPlugin.getInstance() .config() .withFallback(ConfigFactory.defaultApplication()); ActorSystem<Command> system = ActorSystem.create(new SomeBehavior(), "example", conf); SnapshotTestKit testKit = SnapshotTestKit.create(system); }
A typical scenario is to create a persistent actor, send commands to it and check that it persists events as it is expected:
- Scala
-
source
import org.apache.pekko import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import pekko.persistence.testkit.PersistenceTestKitPlugin import pekko.persistence.testkit.scaladsl.PersistenceTestKit class PersistenceTestKitSampleSpec extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config.withFallback(ConfigFactory.defaultApplication())) with AnyWordSpecLike with BeforeAndAfterEach { val persistenceTestKit = PersistenceTestKit(system) override def beforeEach(): Unit = { persistenceTestKit.clearAll() } "Persistent actor" should { "persist all events" in { val persistenceId = PersistenceId.ofUniqueId("your-persistence-id") val persistentActor = spawn( EventSourcedBehavior[Cmd, Evt, State]( persistenceId, emptyState = State.empty, commandHandler = (_, cmd) => Effect.persist(Evt(cmd.data)), eventHandler = (state, evt) => state.updated(evt))) val cmd = Cmd("data") persistentActor ! cmd val expectedPersistedEvent = Evt(cmd.data) persistenceTestKit.expectNextPersisted(persistenceId.id, expectedPersistedEvent) } } }
- Java
-
source
public class PersistenceTestKitSampleTest extends AbstractJavaTest { @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource( PersistenceTestKitPlugin.getInstance() .config() .withFallback(ConfigFactory.defaultApplication())); PersistenceTestKit persistenceTestKit = PersistenceTestKit.create(testKit.system()); @Before public void beforeEach() { persistenceTestKit.clearAll(); } @Test public void test() { PersistenceId persistenceId = PersistenceId.ofUniqueId("some-id"); ActorRef<YourPersistentBehavior.Cmd> ref = testKit.spawn(YourPersistentBehavior.create(persistenceId)); YourPersistentBehavior.Cmd cmd = new YourPersistentBehavior.Cmd("data"); ref.tell(cmd); YourPersistentBehavior.Evt expectedEventPersisted = new YourPersistentBehavior.Evt(cmd.data); persistenceTestKit.expectNextPersisted(persistenceId.id(), expectedEventPersisted); } } class YourPersistentBehavior extends EventSourcedBehavior< YourPersistentBehavior.Cmd, YourPersistentBehavior.Evt, YourPersistentBehavior.State> { static final class Cmd implements CborSerializable { public final String data; @JsonCreator public Cmd(String data) { this.data = data; } } static final class Evt implements CborSerializable { public final String data; @JsonCreator public Evt(String data) { this.data = data; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Evt evt = (Evt) o; return data.equals(evt.data); } @Override public int hashCode() { return data.hashCode(); } } static final class State implements CborSerializable {} static Behavior<Cmd> create(PersistenceId persistenceId) { return Behaviors.setup(context -> new YourPersistentBehavior(persistenceId)); } private YourPersistentBehavior(PersistenceId persistenceId) { super(persistenceId); } @Override public State emptyState() { // some state return new State(); } @Override public CommandHandler<Cmd, Evt, State> commandHandler() { return newCommandHandlerBuilder() .forAnyState() .onCommand(Cmd.class, command -> Effect().persist(new Evt(command.data))) .build(); } @Override public EventHandler<State, Evt> eventHandler() { // TODO handle events return newEventHandlerBuilder().forAnyState().onEvent(Evt.class, (state, evt) -> state).build(); } }
You can safely use persistence testkit in combination with main pekko testkit.
The main methods of the api allow to (see PersistenceTestKit
PersistenceTestKit
and SnapshotTestKit
SnapshotTestKit
for more details):
- check if the given event/snapshot object is the next persisted in the storage.
- read a sequence of persisted events/snapshots.
- check that no events/snapshots have been persisted in the storage.
- throw the default exception from the storage on attempt to persist, read or delete the following event/snapshot.
- clear the events/snapshots persisted in the storage.
- reject the events, but not snapshots (rejections are not supported for snapshots in the original api).
- set your own policy which emulates the work of the storage. Policy determines what to do when persistence needs to execute some operation on the storage (i.e. read, delete, etc.).
- get all the events/snapshots persisted in the storage
- put the events/snapshots in the storage to test recovery
Setting your own policy for the storage
You can implement and set your own policy for the storage to control its actions on particular operations, for example you can fail or reject events on your own conditions. Implement the ProcessingPolicy[EventStorage.JournalOperation]
ProcessingPolicy<EventStorage.JournalOperation>
traitinterface for event storage or ProcessingPolicy[SnapshotStorage.SnapshotOperation]
ProcessingPolicy<SnapshotStorage.SnapshotOperation>
traitinterface for snapshot storage, and set it with withPolicy()
method.
- Scala
-
source
class PersistenceTestKitSampleSpecWithPolicy extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config.withFallback(ConfigFactory.defaultApplication())) with AnyWordSpecLike with BeforeAndAfterEach { val persistenceTestKit = PersistenceTestKit(system) override def beforeEach(): Unit = { persistenceTestKit.clearAll() persistenceTestKit.resetPolicy() } "Testkit policy" should { "fail all operations with custom exception" in { val policy = new EventStorage.JournalPolicies.PolicyType { class CustomFailure extends RuntimeException override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult = processingUnit match { case WriteEvents(_) => StorageFailure(new CustomFailure) case _ => ProcessingSuccess } } persistenceTestKit.withPolicy(policy) val persistenceId = PersistenceId.ofUniqueId("your-persistence-id") val persistentActor = spawn( EventSourcedBehavior[Cmd, Evt, State]( persistenceId, emptyState = State.empty, commandHandler = (_, cmd) => Effect.persist(Evt(cmd.data)), eventHandler = (state, evt) => state.updated(evt))) persistentActor ! Cmd("data") persistenceTestKit.expectNothingPersisted(persistenceId.id) } } }
- Java
-
source
public class PersistenceTestKitPolicySampleTest extends AbstractJavaTest { @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource( PersistenceTestKitPlugin.getInstance() .config() .withFallback(ConfigFactory.defaultApplication())); PersistenceTestKit persistenceTestKit = PersistenceTestKit.create(testKit.system()); @Before public void beforeEach() { persistenceTestKit.clearAll(); persistenceTestKit.resetPolicy(); } @Test public void test() { SampleEventStoragePolicy policy = new SampleEventStoragePolicy(); persistenceTestKit.withPolicy(policy); PersistenceId persistenceId = PersistenceId.ofUniqueId("some-id"); ActorRef<YourPersistentBehavior.Cmd> ref = testKit.spawn(YourPersistentBehavior.create(persistenceId)); YourPersistentBehavior.Cmd cmd = new YourPersistentBehavior.Cmd("data"); ref.tell(cmd); persistenceTestKit.expectNothingPersisted(persistenceId.id()); } static class SampleEventStoragePolicy implements ProcessingPolicy<JournalOperation> { @Override public ProcessingResult tryProcess(String processId, JournalOperation processingUnit) { if (processingUnit instanceof WriteEvents) { return StorageFailure.create(); } else { return ProcessingSuccess.getInstance(); } } } }
tryProcess()
method of the ProcessingPolicy
ProcessingPolicy
has two arguments: persistence id and the storage operation.
Event storage has the following operations:
ReadEvents
ReadEvents
Read the events from the storage.WriteEvents
WriteEvents
Write the events to the storage.DeleteEvents
DeleteEvents
Delete the events from the storage.ReadSeqNum
ReadSeqNum
Read the highest sequence number for particular persistence id.
Snapshot storage has the following operations:
ReadSnapshot
ReadSnapshot
Read the snapshot from the storage.WriteSnapshot
WriteSnapshot
Write the snapshot to the storage.DeleteSnapshotsByCriteria
DeleteSnapshotsByCriteria
Delete snapshots in the storage by criteria.DeleteSnapshotByMeta
DeleteSnapshotByMeta
Delete particular snapshot from the storage by its metadata.
The tryProcess()
method must return one of the processing results:
ProcessingSuccess
ProcessingSuccess
Successful completion of the operation. All the events will be saved/read/deleted.StorageFailure
StorageFailure
Emulates exception from the storage.Reject
Reject
Emulates rejection from the storage.
Note that snapshot storage does not have rejections. If you return Reject
in the tryProcess()
of the snapshot storage policy, it will have the same effect as the StorageFailure
.
Here is an example of the policy for an event storage:
- Scala
-
source
import org.apache.pekko.persistence.testkit._ class SampleEventStoragePolicy extends EventStorage.JournalPolicies.PolicyType { // you can use internal state, it does not need to be thread safe var count = 1 override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult = if (count < 10) { count += 1 // check the type of operation and react with success or with reject or with failure. // if you return ProcessingSuccess the operation will be performed, otherwise not. processingUnit match { case ReadEvents(batch) if batch.nonEmpty => ProcessingSuccess case WriteEvents(batch) if batch.size > 1 => ProcessingSuccess case ReadSeqNum => StorageFailure() case DeleteEvents(_) => Reject() case _ => StorageFailure() } } else { ProcessingSuccess } }
- Java
-
source
class SampleEventStoragePolicy implements ProcessingPolicy<JournalOperation> { // you can use internal state, it does not need to be thread safe int count = 1; @Override public ProcessingResult tryProcess(String processId, JournalOperation processingUnit) { // check the type of operation and react with success or with reject or with failure. // if you return ProcessingSuccess the operation will be performed, otherwise not. if (count < 10) { count += 1; if (processingUnit instanceof ReadEvents) { ReadEvents read = (ReadEvents) processingUnit; if (read.batch().nonEmpty()) { ProcessingSuccess.getInstance(); } else { return StorageFailure.create(); } } else if (processingUnit instanceof WriteEvents) { return ProcessingSuccess.getInstance(); } else if (processingUnit instanceof DeleteEvents) { return ProcessingSuccess.getInstance(); } else if (processingUnit.equals(ReadSeqNum.getInstance())) { return Reject.create(); } // you can set your own exception return StorageFailure.create(new RuntimeException("your exception")); } else { return ProcessingSuccess.getInstance(); } } }
Here is an example of the policy for a snapshot storage:
- Scala
-
source
class SampleSnapshotStoragePolicy extends SnapshotStorage.SnapshotPolicies.PolicyType { // you can use internal state, it does not need to be thread safe var count = 1 override def tryProcess(persistenceId: String, processingUnit: SnapshotOperation): ProcessingResult = if (count < 10) { count += 1 // check the type of operation and react with success or with reject or with failure. // if you return ProcessingSuccess the operation will be performed, otherwise not. processingUnit match { case ReadSnapshot(_, payload) if payload.nonEmpty => ProcessingSuccess case WriteSnapshot(meta, payload) if meta.sequenceNr > 10 => ProcessingSuccess case DeleteSnapshotsByCriteria(_) => StorageFailure() case DeleteSnapshotByMeta(meta) if meta.sequenceNr < 10 => ProcessingSuccess case _ => StorageFailure() } } else { ProcessingSuccess } }
- Java
-
source
class SnapshotStoragePolicy implements ProcessingPolicy<SnapshotOperation> { // you can use internal state, it doesn't need to be thread safe int count = 1; @Override public ProcessingResult tryProcess(String processId, SnapshotOperation processingUnit) { // check the type of operation and react with success or with failure. // if you return ProcessingSuccess the operation will be performed, otherwise not. if (count < 10) { count += 1; if (processingUnit instanceof ReadSnapshot) { ReadSnapshot read = (ReadSnapshot) processingUnit; if (read.getSnapshot().isPresent()) { ProcessingSuccess.getInstance(); } else { return StorageFailure.create(); } } else if (processingUnit instanceof WriteSnapshot) { return ProcessingSuccess.getInstance(); } else if (processingUnit instanceof DeleteSnapshotsByCriteria) { return ProcessingSuccess.getInstance(); } else if (processingUnit instanceof DeleteSnapshotByMeta) { return ProcessingSuccess.getInstance(); } // you can set your own exception return StorageFailure.create(new RuntimeException("your exception")); } else { return ProcessingSuccess.getInstance(); } } }
Configuration of Persistence TestKit
There are several configuration properties for persistence testkit, please refer to the reference configuration
Integration testing
EventSourcedBehavior
actors can be tested with the ActorTestKit together with other actors. The in-memory journal and snapshot storage from the Persistence TestKit can be used also for integration style testing of a single ActorSystem
, for example when using Cluster Sharding with a single Cluster node.
For tests that involve more than one Cluster node you have to use another journal and snapshot store. While it’s possible to use the Persistence Plugin Proxy it’s often better and more realistic to use a real database.
Plugin initialization
Some Persistence plugins create tables automatically, but has the limitation that it can’t be done concurrently from several ActorSystems. That can be a problem if the test creates a Cluster and all nodes tries to initialize the plugins at the same time. To coordinate initialization you can use the PersistenceInit
utility.
PersistenceInit
is part of pekko-persistence-testkit
and you need to add the dependency to your project:
- sbt
val PekkoVersion = "1.0.0" libraryDependencies += "org.apache.pekko" %% "pekko-persistence-testkit" % PekkoVersion
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-bom_${scala.binary.version}</artifactId> <version>1.0.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-persistence-testkit_${scala.binary.version}</artifactId> </dependency> </dependencies>
- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.0.0") implementation "org.apache.pekko:pekko-persistence-testkit_${versions.ScalaBinary}" }
- Scala
-
source
import org.apache.pekko.persistence.testkit.scaladsl.PersistenceInit import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ val timeout = 5.seconds val done: Future[Done] = PersistenceInit.initializeDefaultPlugins(system, timeout) Await.result(done, timeout)
- Java
-
source
import org.apache.pekko.persistence.testkit.javadsl.PersistenceInit; import org.apache.pekko.Done; import java.time.Duration; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; Duration timeout = Duration.ofSeconds(5); CompletionStage<Done> done = PersistenceInit.initializeDefaultPlugins(testKit.system(), timeout); done.toCompletableFuture().get(timeout.getSeconds(), TimeUnit.SECONDS);