Testing
Module info¶
To use Pekko Persistence TestKit, add the module to your project:
val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-persistence-typed" % PekkoVersion,
"org.apache.pekko" %% "pekko-persistence-testkit" % PekkoVersion % Test
)
<properties>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-bom_${scala.binary.version}</artifactId>
<version>1.1.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-persistence-typed_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-persistence-testkit_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
def versions = [
ScalaBinary: "2.13"
]
dependencies {
implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.3")
implementation "org.apache.pekko:pekko-persistence-typed_${versions.ScalaBinary}"
testImplementation "org.apache.pekko:pekko-persistence-testkit_${versions.ScalaBinary}"
}
Project Info: Pekko Persistence Testkit | |
---|---|
Artifact | org.apache.pekko
pekko-persistence-testkit
1.1.3
|
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 OpenJDK 21 |
Scala versions | 2.13.15, 2.12.20, 3.3.4 |
JPMS module name | pekko.persistence.testkit |
License | |
Home page | https://pekko.apache.org/ |
API documentation | |
Forums | |
Release notes | Release Notes |
Issues | Github issues |
Sources | https://github.com/apache/pekko |
Unit testing¶
Note! The EventSourcedBehaviorTestKit
is a new feature, api may have changes breaking source compatibility in future versions.
Unit testing of EventSourcedBehavior
can be done with the EventSourcedBehaviorTestKit
. It supports running one command at a time and you can assert that the synchronously returned result is as expected. The result contains the events emitted by the command and the new state after applying the events. It also has support for verifying the reply to a command.
You need to configure the ActorSystem
with the EventSourcedBehaviorTestKit.config
. The configuration enables the in-memory journal and snapshot storage.
sourceclass AccountExampleDocSpec
extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config)
source@ClassRule
public static final TestKitJunitResource testKit =
new TestKitJunitResource(EventSourcedBehaviorTestKit.config());
private EventSourcedBehaviorTestKit<
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account>
eventSourcedTestKit =
EventSourcedBehaviorTestKit.create(
testKit.system(), AccountEntity.create("1", PersistenceId.of("Account", "1")));
A full test for the AccountEntity
, which is shown in the Persistence Style Guide, may look like this:
sourceimport org.apache.pekko
import pekko.Done
import pekko.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit
import pekko.persistence.typed.PersistenceId
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.pattern.StatusReply
import org.scalatest.BeforeAndAfterEach
import org.scalatest.wordspec.AnyWordSpecLike
class AccountExampleDocSpec
extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config)
with AnyWordSpecLike
with BeforeAndAfterEach
with LogCapturing {
private val eventSourcedTestKit =
EventSourcedBehaviorTestKit[AccountEntity.Command, AccountEntity.Event, AccountEntity.Account](
system,
AccountEntity("1", PersistenceId("Account", "1")))
override protected def beforeEach(): Unit = {
super.beforeEach()
eventSourcedTestKit.clear()
}
"Account" must {
"be created with zero balance" in {
val result = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_))
result.reply shouldBe StatusReply.Ack
result.event shouldBe AccountEntity.AccountCreated
result.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 0
}
"handle Withdraw" in {
eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_))
val result1 = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _))
result1.reply shouldBe StatusReply.Ack
result1.event shouldBe AccountEntity.Deposited(100)
result1.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 100
val result2 = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Withdraw(10, _))
result2.reply shouldBe StatusReply.Ack
result2.event shouldBe AccountEntity.Withdrawn(10)
result2.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 90
}
"reject Withdraw overdraft" in {
eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_))
eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _))
val result = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Withdraw(110, _))
result.reply.isError shouldBe true
result.hasNoEvents shouldBe true
}
"handle GetBalance" in {
eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_))
eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _))
val result = eventSourcedTestKit.runCommand[AccountEntity.CurrentBalance](AccountEntity.GetBalance(_))
result.reply.balance shouldBe 100
result.hasNoEvents shouldBe true
}
}
}
sourceimport java.math.BigDecimal;
import org.apache.pekko.actor.testkit.typed.javadsl.LogCapturing;
import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.persistence.testkit.javadsl.EventSourcedBehaviorTestKit;
import org.apache.pekko.persistence.testkit.javadsl.EventSourcedBehaviorTestKit.CommandResultWithReply;
import org.apache.pekko.persistence.typed.PersistenceId;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
public class AccountExampleDocTest
{
@ClassRule
public static final TestKitJunitResource testKit =
new TestKitJunitResource(EventSourcedBehaviorTestKit.config());
private EventSourcedBehaviorTestKit<
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account>
eventSourcedTestKit =
EventSourcedBehaviorTestKit.create(
testKit.system(), AccountEntity.create("1", PersistenceId.of("Account", "1")));
@Rule public final LogCapturing logCapturing = new LogCapturing();
@Before
public void beforeEach() {
eventSourcedTestKit.clear();
}
@Test
public void createWithEmptyBalance() {
CommandResultWithReply<
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>>
result = eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
assertEquals(StatusReply.ack(), result.reply());
assertEquals(AccountEntity.AccountCreated.INSTANCE, result.event());
assertEquals(BigDecimal.ZERO, result.stateOfType(AccountEntity.OpenedAccount.class).balance);
}
@Test
public void createWithUnHandle() {
CommandResultWithReply<
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>>
result = eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
assertFalse(result.hasNoReply());
}
@Test
public void handleWithdraw() {
eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
CommandResultWithReply<
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>>
result1 =
eventSourcedTestKit.runCommand(
replyTo -> new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo));
assertEquals(StatusReply.ack(), result1.reply());
assertEquals(
BigDecimal.valueOf(100), result1.eventOfType(AccountEntity.Deposited.class).amount);
assertEquals(
BigDecimal.valueOf(100), result1.stateOfType(AccountEntity.OpenedAccount.class).balance);
CommandResultWithReply<
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>>
result2 =
eventSourcedTestKit.runCommand(
replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(10), replyTo));
assertEquals(StatusReply.ack(), result2.reply());
assertEquals(BigDecimal.valueOf(10), result2.eventOfType(AccountEntity.Withdrawn.class).amount);
assertEquals(
BigDecimal.valueOf(90), result2.stateOfType(AccountEntity.OpenedAccount.class).balance);
}
@Test
public void rejectWithdrawOverdraft() {
eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
eventSourcedTestKit.runCommand(
(ActorRef<StatusReply<Done>> replyTo) ->
new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo));
CommandResultWithReply<
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>>
result =
eventSourcedTestKit.runCommand(
replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(110), replyTo));
assertTrue(result.reply().isError());
assertTrue(result.hasNoEvents());
}
@Test
public void handleGetBalance() {
eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
eventSourcedTestKit.runCommand(
(ActorRef<StatusReply<Done>> replyTo) ->
new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo));
CommandResultWithReply<
AccountEntity.Command,
AccountEntity.Event,
AccountEntity.Account,
AccountEntity.CurrentBalance>
result = eventSourcedTestKit.runCommand(AccountEntity.GetBalance::new);
assertEquals(BigDecimal.valueOf(100), result.reply().balance);
}
}
Serialization of commands, events and state are verified automatically. The serialization checks can be customized with the SerializationSettings
when creating the EventSourcedBehaviorTestKit
. By default, the serialization roundtrip is checked but the equality of the result of the serialization is not checked. equals
must be implemented (or using case class
) in the commands, events and state if verifyEquality
is enabled.
To test recovery the restart
method of the EventSourcedBehaviorTestKit
can be used. It will restart the behavior, which will then recover from stored snapshot and events from previous commands. It’s also possible to populate the storage with events or simulate failures by using the underlying PersistenceTestKit
.
Persistence TestKit¶
Note! The PersistenceTestKit
is a new feature, api may have changes breaking source compatibility in future versions.
Persistence testkit allows to check events saved in a storage, emulate storage operations and exceptions. To use the testkit you need to add the following dependency in your project:
val PekkoVersion = "1.1.3"
libraryDependencies += "org.apache.pekko" %% "pekko-persistence-testkit" % PekkoVersion
<properties>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-bom_${scala.binary.version}</artifactId>
<version>1.1.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-persistence-testkit_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
def versions = [
ScalaBinary: "2.13"
]
dependencies {
implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.3")
implementation "org.apache.pekko:pekko-persistence-testkit_${versions.ScalaBinary}"
}
There are two testkit classes which have similar api:
PersistenceTestKit
class is for eventsSnapshotTestKit
class is for snapshots
The testkit classes have two corresponding plugins which emulate the behavior of the storages:
PersistenceTestKitPlugin
class emulates a events storagePersistenceTestKitSnapshotPlugin
class emulates a snapshots storage
Note! The corresponding plugins must be configured in the actor system which is used to initialize the particular testkit class:
source
val yourConfiguration = ConfigFactory.defaultApplication()
val system =
ActorSystem(??? /*some behavior*/, "test-system", PersistenceTestKitPlugin.config.withFallback(yourConfiguration))
val testKit = PersistenceTestKit(system)
sourcepublic class PersistenceTestKitConfig {
Config conf =
PersistenceTestKitPlugin.getInstance()
.config()
.withFallback(ConfigFactory.defaultApplication());
ActorSystem<Command> system = ActorSystem.create(new SomeBehavior(), "example", conf);
PersistenceTestKit testKit = PersistenceTestKit.create(system);
}
and
source
val yourConfiguration = ConfigFactory.defaultApplication()
val system = ActorSystem(
??? /*some behavior*/,
"test-system",
PersistenceTestKitSnapshotPlugin.config.withFallback(yourConfiguration))
val testKit = SnapshotTestKit(system)
sourcepublic class SnapshotTestKitConfig {
Config conf =
PersistenceTestKitSnapshotPlugin.getInstance()
.config()
.withFallback(ConfigFactory.defaultApplication());
ActorSystem<Command> system = ActorSystem.create(new SomeBehavior(), "example", conf);
SnapshotTestKit testKit = SnapshotTestKit.create(system);
}
A typical scenario is to create a persistent actor, send commands to it and check that it persists events as it is expected:
sourceimport org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.persistence.testkit.PersistenceTestKitPlugin
import pekko.persistence.testkit.scaladsl.PersistenceTestKit
class PersistenceTestKitSampleSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config.withFallback(ConfigFactory.defaultApplication()))
with AnyWordSpecLike
with BeforeAndAfterEach {
val persistenceTestKit = PersistenceTestKit(system)
override def beforeEach(): Unit = {
persistenceTestKit.clearAll()
}
"Persistent actor" should {
"persist all events" in {
val persistenceId = PersistenceId.ofUniqueId("your-persistence-id")
val persistentActor = spawn(
EventSourcedBehavior[Cmd, Evt, State](
persistenceId,
emptyState = State.empty,
commandHandler = (_, cmd) => Effect.persist(Evt(cmd.data)),
eventHandler = (state, evt) => state.updated(evt)))
val cmd = Cmd("data")
persistentActor ! cmd
val expectedPersistedEvent = Evt(cmd.data)
persistenceTestKit.expectNextPersisted(persistenceId.id, expectedPersistedEvent)
}
}
}
sourcepublic class PersistenceTestKitSampleTest extends AbstractJavaTest {
@ClassRule
public static final TestKitJunitResource testKit =
new TestKitJunitResource(
PersistenceTestKitPlugin.getInstance()
.config()
.withFallback(ConfigFactory.defaultApplication()));
PersistenceTestKit persistenceTestKit = PersistenceTestKit.create(testKit.system());
@Before
public void beforeEach() {
persistenceTestKit.clearAll();
}
@Test
public void test() {
PersistenceId persistenceId = PersistenceId.ofUniqueId("some-id");
ActorRef<YourPersistentBehavior.Cmd> ref =
testKit.spawn(YourPersistentBehavior.create(persistenceId));
YourPersistentBehavior.Cmd cmd = new YourPersistentBehavior.Cmd("data");
ref.tell(cmd);
YourPersistentBehavior.Evt expectedEventPersisted = new YourPersistentBehavior.Evt(cmd.data);
persistenceTestKit.expectNextPersisted(persistenceId.id(), expectedEventPersisted);
}
}
class YourPersistentBehavior
extends EventSourcedBehavior<
YourPersistentBehavior.Cmd, YourPersistentBehavior.Evt, YourPersistentBehavior.State> {
static final class Cmd implements CborSerializable {
public final String data;
@JsonCreator
public Cmd(String data) {
this.data = data;
}
}
static final class Evt implements CborSerializable {
public final String data;
@JsonCreator
public Evt(String data) {
this.data = data;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Evt evt = (Evt) o;
return data.equals(evt.data);
}
@Override
public int hashCode() {
return data.hashCode();
}
}
static final class State implements CborSerializable {}
static Behavior<Cmd> create(PersistenceId persistenceId) {
return Behaviors.setup(context -> new YourPersistentBehavior(persistenceId));
}
private YourPersistentBehavior(PersistenceId persistenceId) {
super(persistenceId);
}
@Override
public State emptyState() {
// some state
return new State();
}
@Override
public CommandHandler<Cmd, Evt, State> commandHandler() {
return newCommandHandlerBuilder()
.forAnyState()
.onCommand(Cmd.class, command -> Effect().persist(new Evt(command.data)))
.build();
}
@Override
public EventHandler<State, Evt> eventHandler() {
// TODO handle events
return newEventHandlerBuilder().forAnyState().onEvent(Evt.class, (state, evt) -> state).build();
}
}
You can safely use persistence testkit in combination with main pekko testkit.
The main methods of the api allow to (see PersistenceTestKit
and SnapshotTestKit
for more details):
- check if the given event/snapshot object is the next persisted in the storage.
- read a sequence of persisted events/snapshots.
- check that no events/snapshots have been persisted in the storage.
- throw the default exception from the storage on attempt to persist, read or delete the following event/snapshot.
- clear the events/snapshots persisted in the storage.
- reject the events, but not snapshots (rejections are not supported for snapshots in the original api).
- set your own policy which emulates the work of the storage. Policy determines what to do when persistence needs to execute some operation on the storage (i.e. read, delete, etc.).
- get all the events/snapshots persisted in the storage
- put the events/snapshots in the storage to test recovery
Setting your own policy for the storage¶
You can implement and set your own policy for the storage to control its actions on particular operations, for example you can fail or reject events on your own conditions. Implement the ProcessingPolicy[EventStorage.JournalOperation]
trait for event storage or ProcessingPolicy[SnapshotStorage.SnapshotOperation]
trait for snapshot storage, and set it with withPolicy()
method.
sourceclass PersistenceTestKitSampleSpecWithPolicy
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config.withFallback(ConfigFactory.defaultApplication()))
with AnyWordSpecLike
with BeforeAndAfterEach {
val persistenceTestKit = PersistenceTestKit(system)
override def beforeEach(): Unit = {
persistenceTestKit.clearAll()
persistenceTestKit.resetPolicy()
}
"Testkit policy" should {
"fail all operations with custom exception" in {
val policy = new EventStorage.JournalPolicies.PolicyType {
class CustomFailure extends RuntimeException
override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult =
processingUnit match {
case WriteEvents(_) => StorageFailure(new CustomFailure)
case _ => ProcessingSuccess
}
}
persistenceTestKit.withPolicy(policy)
val persistenceId = PersistenceId.ofUniqueId("your-persistence-id")
val persistentActor = spawn(
EventSourcedBehavior[Cmd, Evt, State](
persistenceId,
emptyState = State.empty,
commandHandler = (_, cmd) => Effect.persist(Evt(cmd.data)),
eventHandler = (state, evt) => state.updated(evt)))
persistentActor ! Cmd("data")
persistenceTestKit.expectNothingPersisted(persistenceId.id)
}
}
}
sourcepublic class PersistenceTestKitPolicySampleTest extends AbstractJavaTest {
@ClassRule
public static final TestKitJunitResource testKit =
new TestKitJunitResource(
PersistenceTestKitPlugin.getInstance()
.config()
.withFallback(ConfigFactory.defaultApplication()));
PersistenceTestKit persistenceTestKit = PersistenceTestKit.create(testKit.system());
@Before
public void beforeEach() {
persistenceTestKit.clearAll();
persistenceTestKit.resetPolicy();
}
@Test
public void test() {
SampleEventStoragePolicy policy = new SampleEventStoragePolicy();
persistenceTestKit.withPolicy(policy);
PersistenceId persistenceId = PersistenceId.ofUniqueId("some-id");
ActorRef<YourPersistentBehavior.Cmd> ref =
testKit.spawn(YourPersistentBehavior.create(persistenceId));
YourPersistentBehavior.Cmd cmd = new YourPersistentBehavior.Cmd("data");
ref.tell(cmd);
persistenceTestKit.expectNothingPersisted(persistenceId.id());
}
static class SampleEventStoragePolicy implements ProcessingPolicy<JournalOperation> {
@Override
public ProcessingResult tryProcess(String processId, JournalOperation processingUnit) {
if (processingUnit instanceof WriteEvents) {
return StorageFailure.create();
} else {
return ProcessingSuccess.getInstance();
}
}
}
}
tryProcess()
method of the ProcessingPolicy
has two arguments: persistence id and the storage operation.
Event storage has the following operations:
ReadEvents
Read the events from the storage.WriteEvents
Write the events to the storage.DeleteEvents
Delete the events from the storage.ReadSeqNum
Read the highest sequence number for particular persistence id.
Snapshot storage has the following operations:
ReadSnapshot
Read the snapshot from the storage.WriteSnapshot
Write the snapshot to the storage.DeleteSnapshotsByCriteria
Delete snapshots in the storage by criteria.DeleteSnapshotByMeta
Delete particular snapshot from the storage by its metadata.
The tryProcess()
method must return one of the processing results:
ProcessingSuccess
Successful completion of the operation. All the events will be saved/read/deleted.StorageFailure
Emulates exception from the storage.Reject
Emulates rejection from the storage.
Note that snapshot storage does not have rejections. If you return Reject
in the tryProcess()
of the snapshot storage policy, it will have the same effect as the StorageFailure
.
Here is an example of the policy for an event storage:
sourceimport org.apache.pekko.persistence.testkit._
class SampleEventStoragePolicy extends EventStorage.JournalPolicies.PolicyType {
// you can use internal state, it does not need to be thread safe
var count = 1
override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult =
if (count < 10) {
count += 1
// check the type of operation and react with success or with reject or with failure.
// if you return ProcessingSuccess the operation will be performed, otherwise not.
processingUnit match {
case ReadEvents(batch) if batch.nonEmpty => ProcessingSuccess
case WriteEvents(batch) if batch.size > 1 =>
ProcessingSuccess
case ReadSeqNum => StorageFailure()
case DeleteEvents(_) => Reject()
case _ => StorageFailure()
}
} else {
ProcessingSuccess
}
}
sourceclass SampleEventStoragePolicy implements ProcessingPolicy<JournalOperation> {
// you can use internal state, it does not need to be thread safe
int count = 1;
@Override
public ProcessingResult tryProcess(String processId, JournalOperation processingUnit) {
// check the type of operation and react with success or with reject or with failure.
// if you return ProcessingSuccess the operation will be performed, otherwise not.
if (count < 10) {
count += 1;
if (processingUnit instanceof ReadEvents) {
ReadEvents read = (ReadEvents) processingUnit;
if (read.batch().nonEmpty()) {
ProcessingSuccess.getInstance();
} else {
return StorageFailure.create();
}
} else if (processingUnit instanceof WriteEvents) {
return ProcessingSuccess.getInstance();
} else if (processingUnit instanceof DeleteEvents) {
return ProcessingSuccess.getInstance();
} else if (processingUnit.equals(ReadSeqNum.getInstance())) {
return Reject.create();
}
// you can set your own exception
return StorageFailure.create(new RuntimeException("your exception"));
} else {
return ProcessingSuccess.getInstance();
}
}
}
Here is an example of the policy for a snapshot storage:
sourceclass SampleSnapshotStoragePolicy extends SnapshotStorage.SnapshotPolicies.PolicyType {
// you can use internal state, it does not need to be thread safe
var count = 1
override def tryProcess(persistenceId: String, processingUnit: SnapshotOperation): ProcessingResult =
if (count < 10) {
count += 1
// check the type of operation and react with success or with reject or with failure.
// if you return ProcessingSuccess the operation will be performed, otherwise not.
processingUnit match {
case ReadSnapshot(_, payload) if payload.nonEmpty =>
ProcessingSuccess
case WriteSnapshot(meta, payload) if meta.sequenceNr > 10 =>
ProcessingSuccess
case DeleteSnapshotsByCriteria(_) => StorageFailure()
case DeleteSnapshotByMeta(meta) if meta.sequenceNr < 10 =>
ProcessingSuccess
case _ => StorageFailure()
}
} else {
ProcessingSuccess
}
}
sourceclass SnapshotStoragePolicy implements ProcessingPolicy<SnapshotOperation> {
// you can use internal state, it doesn't need to be thread safe
int count = 1;
@Override
public ProcessingResult tryProcess(String processId, SnapshotOperation processingUnit) {
// check the type of operation and react with success or with failure.
// if you return ProcessingSuccess the operation will be performed, otherwise not.
if (count < 10) {
count += 1;
if (processingUnit instanceof ReadSnapshot) {
ReadSnapshot read = (ReadSnapshot) processingUnit;
if (read.getSnapshot().isPresent()) {
ProcessingSuccess.getInstance();
} else {
return StorageFailure.create();
}
} else if (processingUnit instanceof WriteSnapshot) {
return ProcessingSuccess.getInstance();
} else if (processingUnit instanceof DeleteSnapshotsByCriteria) {
return ProcessingSuccess.getInstance();
} else if (processingUnit instanceof DeleteSnapshotByMeta) {
return ProcessingSuccess.getInstance();
}
// you can set your own exception
return StorageFailure.create(new RuntimeException("your exception"));
} else {
return ProcessingSuccess.getInstance();
}
}
}
Configuration of Persistence TestKit¶
There are several configuration properties for persistence testkit, please refer to the reference configuration
Integration testing¶
EventSourcedBehavior
actors can be tested with the ActorTestKit together with other actors. The in-memory journal and snapshot storage from the Persistence TestKit can be used also for integration style testing of a single ActorSystem
, for example when using Cluster Sharding with a single Cluster node.
For tests that involve more than one Cluster node you have to use another journal and snapshot store. While it’s possible to use the Persistence Plugin Proxy it’s often better and more realistic to use a real database.
Plugin initialization¶
Some Persistence plugins create tables automatically, but has the limitation that it can’t be done concurrently from several ActorSystems. That can be a problem if the test creates a Cluster and all nodes tries to initialize the plugins at the same time. To coordinate initialization you can use the PersistenceInit
utility.
PersistenceInit
is part of pekko-persistence-testkit
and you need to add the dependency to your project:
val PekkoVersion = "1.1.3"
libraryDependencies += "org.apache.pekko" %% "pekko-persistence-testkit" % PekkoVersion
<properties>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-bom_${scala.binary.version}</artifactId>
<version>1.1.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-persistence-testkit_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
def versions = [
ScalaBinary: "2.13"
]
dependencies {
implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.3")
implementation "org.apache.pekko:pekko-persistence-testkit_${versions.ScalaBinary}"
}
sourceimport org.apache.pekko.persistence.testkit.scaladsl.PersistenceInit
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
val timeout = 5.seconds
val done: Future[Done] = PersistenceInit.initializeDefaultPlugins(system, timeout)
Await.result(done, timeout)
sourceimport org.apache.pekko.persistence.testkit.javadsl.PersistenceInit;
import org.apache.pekko.Done;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
Duration timeout = Duration.ofSeconds(5);
CompletionStage<Done> done =
PersistenceInit.initializeDefaultPlugins(testKit.system(), timeout);
done.toCompletableFuture().get(timeout.getSeconds(), TimeUnit.SECONDS);