Writing tests for a Projection
Like other Pekko libraries, Projections ships with a TestKit that a user can include to assert the correctness of their Projection handler implementation. Add the Projections TestKit dependency to your project:
libraryDependencies += "org.apache.pekko" %% "pekko-projection-testkit" % "1.1.0"
<properties>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-projection-testkit_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
def versions = [
ScalaBinary: "2.13"
]
dependencies {
implementation "org.apache.pekko:pekko-projection-testkit_${versions.ScalaBinary}:1.1.0"
}
Import the ProjectionTestKit and other utilities into a new ScalaTest spec (Scala) or JUnit test (Java).
import org.apache.pekko
import pekko.projection.testkit.scaladsl.ProjectionTestKit
import pekko.projection.testkit.scaladsl.TestProjection
import pekko.projection.testkit.scaladsl.TestSourceProvider
import org.apache.pekko.projection.testkit.javadsl.ProjectionTestKit;
import org.apache.pekko.projection.testkit.javadsl.TestProjection;
import org.apache.pekko.projection.testkit.javadsl.TestSourceProvider;
The TestKit includes several utilities to run the Projection handler in isolation so that a full projection implementation and source provider are not required.
- ProjectionTestKit runs a projection with the test ActorSystem.
- TestSourceProvider allows the user to mock out test data Envelopes that will be processed by the Projection Handler.
- TestProjection is a test Projection implementation that uses an in-memory internal offset store.
Using these tools we can assert that our Projection handler meets the following requirements of the ItemPopularityProjectionHandler.
- Process each shopping cart item event, correctly calculate the item count delta, and update the database.
- Log the popularity of every 10th shopping cart item event that is processed.
package docs.guide
import java.time.Instant
import scala.concurrent.Future
import org.apache.pekko
import pekko.Done
import pekko.actor.testkit.typed.scaladsl.LoggingTestKit
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.persistence.query.Offset
import pekko.projection.ProjectionId
import pekko.projection.eventsourced.EventEnvelope
import pekko.stream.scaladsl.Source
import org.apache.pekko
import pekko.projection.testkit.scaladsl.ProjectionTestKit
import pekko.projection.testkit.scaladsl.TestProjection
import pekko.projection.testkit.scaladsl.TestSourceProvider
import org.scalatest.wordspec.AnyWordSpecLike
object ShoppingCartAppSpec {

  /**
   * In-memory replacement for the Cassandra-backed repository.
   *
   * Item counts are kept in the public `counts` map so that a test can inspect
   * the totals after the projection handler has applied its updates.
   */
  class MockItemPopularityRepository extends ItemPopularityProjectionRepository {
    var counts: Map[String, Long] = Map.empty

    /** Adds `delta` to the stored count of `itemId`, starting from 0 when the item is unknown. */
    override def update(itemId: String, delta: Int): Future[Done] = {
      val previous = counts.getOrElse(itemId, 0L)
      counts = counts.updated(itemId, previous + delta)
      Future.successful(Done)
    }

    /** Looks up the stored count of `itemId`; `None` when the item was never updated. */
    override def getItem(itemId: String): Future[Option[Long]] = {
      val current = counts.get(itemId)
      Future.successful(current)
    }
  }
}
class ShoppingCartAppSpec extends ScalaTestWithActorTestKit() with AnyWordSpecLike {
  import ShoppingCartAppSpec._

  // Shorthand for the envelope type that every test in this spec feeds to the handler.
  private type CartEnvelope = EventEnvelope[ShoppingCartEvents.Event]

  private val projectionTestKit = ProjectionTestKit(system)

  /** Wraps `event` in an envelope that uses `seqNo` both as the sequence offset and as the sequence number. */
  def createEnvelope(event: ShoppingCartEvents.Event, seqNo: Long, timestamp: Long = 0L) =
    EventEnvelope(Offset.sequence(seqNo), "persistenceId", seqNo, event, timestamp)

  // Wires `envelopes` and `handler` into a TestProjection via a TestSourceProvider.
  private def projectionOver(
      envelopes: Source[CartEnvelope, pekko.NotUsed],
      handler: ItemPopularityProjectionHandler) = {
    val provider =
      TestSourceProvider[Offset, CartEnvelope](envelopes, extractOffset = envelope => envelope.offset)
    TestProjection[Offset, CartEnvelope](ProjectionId("name", "key"), provider, () => handler)
  }

  "The ItemPopularityProjectionHandler" should {

    "process item events correctly" in {
      val repository = new MockItemPopularityRepository
      val handler = new ItemPopularityProjectionHandler("tag", system, repository)

      // Events of two carts ("a7098" and "0d12d"); sequence numbers are assigned in list order from 0.
      val cartEvents = List[ShoppingCartEvents.Event](
        ShoppingCartEvents.ItemAdded("a7098", "bowling shoes", 1),
        ShoppingCartEvents.ItemQuantityAdjusted("a7098", "bowling shoes", 2, 1),
        ShoppingCartEvents.CheckedOut("a7098", Instant.parse("2020-01-01T12:00:00.00Z")),
        ShoppingCartEvents.ItemAdded("0d12d", "pekko t-shirt", 1),
        ShoppingCartEvents.ItemAdded("0d12d", "skis", 1),
        ShoppingCartEvents.ItemRemoved("0d12d", "skis", 1),
        ShoppingCartEvents.CheckedOut("0d12d", Instant.parse("2020-01-01T12:05:00.00Z")))
      val envelopes = cartEvents.zipWithIndex.map {
        case (event, index) => createEnvelope(event, index.toLong)
      }

      val projection = projectionOver(Source(envelopes), handler)

      projectionTestKit.run(projection) {
        repository.counts shouldBe Map("bowling shoes" -> 2, "pekko t-shirt" -> 1, "skis" -> 0)
      }
    }

    "log item popularity for day every 10 item events" in {
      val repository = new MockItemPopularityRepository
      val handler = new ItemPopularityProjectionHandler("tag", system, repository)

      // Ten identical "item added" events with sequence numbers 0 to 9.
      val envelopes = Vector.tabulate(10) { index =>
        createEnvelope(ShoppingCartEvents.ItemAdded("a7098", "bowling shoes", 1), index.toLong)
      }

      val projection = projectionOver(Source(envelopes), handler)

      val expectedLogEntry =
        LoggingTestKit.info("ItemPopularityProjectionHandler(tag) item popularity for 'bowling shoes': [10]")

      // The log entry must be emitted while the projection runs inside the `expect` block.
      expectedLogEntry.expect {
        projectionTestKit.runWithTestSink(projection) { sink =>
          sink.request(envelopes.size)
          sink.expectNextN(envelopes.size)
        }
      }
    }
  }
}
package jdocs.guide;
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.testkit.typed.javadsl.LoggingTestKit;
import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.eventsourced.EventEnvelope;
import org.apache.pekko.projection.javadsl.Handler;
import org.apache.pekko.projection.javadsl.SourceProvider;
import org.apache.pekko.projection.testkit.javadsl.ProjectionTestKit;
import org.apache.pekko.projection.testkit.javadsl.TestProjection;
import org.apache.pekko.projection.testkit.javadsl.TestSourceProvider;
import org.apache.pekko.stream.javadsl.Source;
import org.junit.ClassRule;
import org.junit.Test;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
/**
 * Tests the {@code ItemPopularityProjectionHandler} in isolation by running it inside a {@code
 * TestProjection} that is fed from a {@code TestSourceProvider} and backed by an in-memory
 * repository.
 */
public class ShoppingCartAppTest {
  @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource();

  public static final ProjectionTestKit projectionTestKit =
      ProjectionTestKit.create(testKit.system());

  /**
   * Wraps {@code event} in an envelope that uses {@code seqNo} both as the sequence offset and as
   * the sequence number.
   */
  EventEnvelope<ShoppingCartEvents.Event> createEnvelope(
      ShoppingCartEvents.Event event, Long seqNo, Long timestamp) {
    return EventEnvelope.create(Offset.sequence(seqNo), "persistenceId", seqNo, event, timestamp);
  }

  @Test
  public void projectionHandlerShouldProcessItemEventsCorrectly() {
    MockItemPopularityRepository repo = new MockItemPopularityRepository();
    Handler<EventEnvelope<ShoppingCartEvents.Event>> handler =
        new ItemPopularityProjectionHandler("tag", testKit.system(), repo);

    // Events of two carts ("a7098" and "0d12d") with consecutive sequence numbers 0 to 6.
    Source<EventEnvelope<ShoppingCartEvents.Event>, NotUsed> events =
        Source.from(
            Arrays.asList(
                createEnvelope(
                    new ShoppingCartEvents.ItemAdded("a7098", "bowling shoes", 1), 0L, 0L),
                createEnvelope(
                    new ShoppingCartEvents.ItemQuantityAdjusted("a7098", "bowling shoes", 2, 1),
                    1L,
                    0L),
                createEnvelope(
                    new ShoppingCartEvents.CheckedOut(
                        "a7098", Instant.parse("2020-01-01T12:00:00.00Z")),
                    2L,
                    0L),
                createEnvelope(
                    new ShoppingCartEvents.ItemAdded("0d12d", "pekko t-shirt", 1), 3L, 0L),
                createEnvelope(new ShoppingCartEvents.ItemAdded("0d12d", "skis", 1), 4L, 0L),
                createEnvelope(new ShoppingCartEvents.ItemRemoved("0d12d", "skis", 1), 5L, 0L),
                createEnvelope(
                    new ShoppingCartEvents.CheckedOut(
                        "0d12d", Instant.parse("2020-01-01T12:05:00.00Z")),
                    6L,
                    0L)));

    ProjectionId projectionId = ProjectionId.of("name", "key");
    SourceProvider<Offset, EventEnvelope<ShoppingCartEvents.Event>> sourceProvider =
        TestSourceProvider.create(events, env -> env.offset());
    TestProjection<Offset, EventEnvelope<ShoppingCartEvents.Event>> projection =
        TestProjection.create(projectionId, sourceProvider, () -> handler);

    projectionTestKit.run(
        projection,
        () -> {
          // JUnit's assertEquals takes (expected, actual); keeping that order makes a failure
          // report the expected and the actual value the right way round.
          assertEquals(3, repo.counts.size());
          assertEquals(Long.valueOf(2L), repo.counts.get("bowling shoes"));
          assertEquals(Long.valueOf(1L), repo.counts.get("pekko t-shirt"));
          assertEquals(Long.valueOf(0L), repo.counts.get("skis"));
        });
  }

  @Test
  public void projectionHandlerShouldLogItemPopularityEvery10Events() {
    long eventsNum = 10L;
    MockItemPopularityRepository repo = new MockItemPopularityRepository();
    Handler<EventEnvelope<ShoppingCartEvents.Event>> handler =
        new ItemPopularityProjectionHandler("tag", testKit.system(), repo);

    // eventsNum identical "item added" events with sequence numbers 0 to eventsNum - 1.
    Source<EventEnvelope<ShoppingCartEvents.Event>, NotUsed> events =
        Source.fromJavaStream(
            () ->
                IntStream.range(0, (int) eventsNum)
                    .boxed()
                    .map(
                        i ->
                            createEnvelope(
                                new ShoppingCartEvents.ItemAdded("a7098", "bowling shoes", 1),
                                Long.valueOf(i),
                                0L)));

    ProjectionId projectionId = ProjectionId.of("name", "key");
    SourceProvider<Offset, EventEnvelope<ShoppingCartEvents.Event>> sourceProvider =
        TestSourceProvider.create(events, env -> env.offset());
    TestProjection<Offset, EventEnvelope<ShoppingCartEvents.Event>> projection =
        TestProjection.create(projectionId, sourceProvider, () -> handler);

    LoggingTestKit.info(
            "ItemPopularityProjectionHandler(tag) item popularity for 'bowling shoes': [10]")
        .expect(
            testKit.system(),
            () -> {
              projectionTestKit.runWithTestSink(
                  projection,
                  testSink -> {
                    testSink.request(eventsNum);
                    testSink.expectNextN(eventsNum);
                  });
              // LoggingTestKit.expect takes a Supplier<T>, so the lambda has to return a
              // value even though this test has nothing meaningful to return.
              return null;
            });
  }

  /**
   * In-memory replacement for the real repository. Item counts are kept in the public {@code
   * counts} map so that a test can inspect the totals after the handler has applied its updates.
   */
  static class MockItemPopularityRepository implements ItemPopularityProjectionRepository {
    public Map<String, Long> counts = new HashMap<>();

    /** Adds {@code delta} to the stored count of {@code itemId}, starting from 0 if unknown. */
    @Override
    public CompletionStage<Done> update(String itemId, int delta) {
      counts.put(itemId, counts.getOrDefault(itemId, 0L) + delta);
      return CompletableFuture.completedFuture(Done.getInstance());
    }

    /** Looks up the stored count of {@code itemId}; empty when the item was never updated. */
    @Override
    public CompletionStage<Optional<Long>> getItem(String itemId) {
      if (counts.containsKey(itemId))
        return CompletableFuture.completedFuture(Optional.of(counts.get(itemId)));
      return CompletableFuture.completedFuture(Optional.empty());
    }
  }
}
1.1.0