Writing tests for a Projection
Like other Pekko libraries, Projections ships with a TestKit that a user can include to assert the correctness of their Projection handler implementation. Add the Projections TestKit dependency to your project:
- sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-testkit" % "1.0.0"
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-projection-testkit_${scala.binary.version}</artifactId> <version>1.0.0</version> </dependency> </dependencies>
- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation "org.apache.pekko:pekko-projection-testkit_${versions.ScalaBinary}:1.0.0" }
Import the ProjectionTestKit
ProjectionTestKit
and other utilities into a new ScalaTest test spec JUnit test.
- Scala
-
source
import org.apache.pekko import pekko.projection.testkit.scaladsl.ProjectionTestKit import pekko.projection.testkit.scaladsl.TestProjection import pekko.projection.testkit.scaladsl.TestSourceProvider
- Java
-
source
import org.apache.pekko.projection.testkit.javadsl.ProjectionTestKit; import org.apache.pekko.projection.testkit.javadsl.TestProjection; import org.apache.pekko.projection.testkit.javadsl.TestSourceProvider;
The TestKit includes several utilities to run the Projection handler in isolation so that a full projection implementation and source provider are not required.
ProjectionTestKit
ProjectionTestKit
runs a projection with the testActorSystem
ActorSystem
.TestSourceProvider
TestSourceProvider
allows the user to mock out test dataEnvelopes
that will be processed by the Projection Handler.TestProjection
TestProjection
is a test Projection implementation that uses an in-memory internal offset store.
Using these tools we can assert that our Projection handler meets the following requirements of the ItemPopularityProjectionHandler
.
- Process each shopping cart item event, correctly calculate the item count delta, and update the database.
- Log the popularity of every 10th shopping cart item event that is processed.
- Scala
-
source
package docs.guide import java.time.Instant import scala.concurrent.Future import org.apache.pekko import pekko.Done import pekko.actor.testkit.typed.scaladsl.LoggingTestKit import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import pekko.persistence.query.Offset import pekko.projection.ProjectionId import pekko.projection.eventsourced.EventEnvelope import pekko.stream.scaladsl.Source import org.apache.pekko import pekko.projection.testkit.scaladsl.ProjectionTestKit import pekko.projection.testkit.scaladsl.TestProjection import pekko.projection.testkit.scaladsl.TestSourceProvider import org.scalatest.wordspec.AnyWordSpecLike object ShoppingCartAppSpec { // mock out the Cassandra data layer and simulate recording item count updates class MockItemPopularityRepository extends ItemPopularityProjectionRepository { var counts: Map[String, Long] = Map.empty override def update(itemId: String, delta: Int): Future[Done] = Future.successful { counts = counts + (itemId -> (counts.getOrElse(itemId, 0L) + delta)) Done } override def getItem(itemId: String): Future[Option[Long]] = Future.successful(counts.get(itemId)) } } class ShoppingCartAppSpec extends ScalaTestWithActorTestKit() with AnyWordSpecLike { import ShoppingCartAppSpec._ private val projectionTestKit = ProjectionTestKit(system) def createEnvelope(event: ShoppingCartEvents.Event, seqNo: Long, timestamp: Long = 0L) = EventEnvelope(Offset.sequence(seqNo), "persistenceId", seqNo, event, timestamp) "The ItemPopularityProjectionHandler" should { "process item events correctly" in { val repo = new MockItemPopularityRepository val handler = new ItemPopularityProjectionHandler("tag", system, repo) val events = Source( List[EventEnvelope[ShoppingCartEvents.Event]]( createEnvelope(ShoppingCartEvents.ItemAdded("a7098", "bowling shoes", 1), 0L), createEnvelope(ShoppingCartEvents.ItemQuantityAdjusted("a7098", "bowling shoes", 2, 1), 1L), createEnvelope(ShoppingCartEvents.CheckedOut("a7098", Instant.parse("2020-01-01T12:00:00.00Z")), 2L), createEnvelope(ShoppingCartEvents.ItemAdded("0d12d", "pekko t-shirt", 1), 3L), createEnvelope(ShoppingCartEvents.ItemAdded("0d12d", "skis", 1), 4L), createEnvelope(ShoppingCartEvents.ItemRemoved("0d12d", "skis", 1), 5L), createEnvelope(ShoppingCartEvents.CheckedOut("0d12d", Instant.parse("2020-01-01T12:05:00.00Z")), 6L))) val projectionId = ProjectionId("name", "key") val sourceProvider = TestSourceProvider[Offset, EventEnvelope[ShoppingCartEvents.Event]](events, extractOffset = env => env.offset) val projection = TestProjection[Offset, EventEnvelope[ShoppingCartEvents.Event]](projectionId, sourceProvider, () => handler) projectionTestKit.run(projection) { repo.counts shouldBe Map("bowling shoes" -> 2, "pekko t-shirt" -> 1, "skis" -> 0) } } "log item popularity for day every 10 item events" in { val repo = new MockItemPopularityRepository val handler = new ItemPopularityProjectionHandler("tag", system, repo) val events = (0L until 10L).map { i => createEnvelope(ShoppingCartEvents.ItemAdded("a7098", "bowling shoes", 1), i) } val projectionId = ProjectionId("name", "key") val sourceProvider = TestSourceProvider[Offset, EventEnvelope[ShoppingCartEvents.Event]]( Source(events), extractOffset = env => env.offset) val projection = TestProjection[Offset, EventEnvelope[ShoppingCartEvents.Event]](projectionId, sourceProvider, () => handler) LoggingTestKit .info("ItemPopularityProjectionHandler(tag) item popularity for 'bowling shoes': [10]") .expect { projectionTestKit.runWithTestSink(projection) { testSink => testSink.request(events.length) testSink.expectNextN(events.length) } } } } }
- Java
-
source
package jdocs.guide; import org.apache.pekko.Done; import org.apache.pekko.NotUsed; import org.apache.pekko.actor.testkit.typed.javadsl.LoggingTestKit; import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource; import org.apache.pekko.persistence.query.Offset; import org.apache.pekko.projection.ProjectionId; import org.apache.pekko.projection.eventsourced.EventEnvelope; import org.apache.pekko.projection.javadsl.Handler; import org.apache.pekko.projection.javadsl.SourceProvider; import org.apache.pekko.projection.testkit.javadsl.ProjectionTestKit; import org.apache.pekko.projection.testkit.javadsl.TestProjection; import org.apache.pekko.projection.testkit.javadsl.TestSourceProvider; import org.apache.pekko.stream.javadsl.Source; import org.junit.ClassRule; import org.junit.Test; import java.time.Instant; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.stream.IntStream; import static org.junit.Assert.assertEquals; public class ShoppingCartAppTest { @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(); public static final ProjectionTestKit projectionTestKit = ProjectionTestKit.create(testKit.system()); EventEnvelope<ShoppingCartEvents.Event> createEnvelope( ShoppingCartEvents.Event event, Long seqNo, Long timestamp) { return EventEnvelope.create(Offset.sequence(seqNo), "persistenceId", seqNo, event, timestamp); } @Test public void projectionHandlerShouldProcessItemEventsCorrectly() { MockItemPopularityRepository repo = new MockItemPopularityRepository(); Handler<EventEnvelope<ShoppingCartEvents.Event>> handler = new ItemPopularityProjectionHandler("tag", testKit.system(), repo); Source<EventEnvelope<ShoppingCartEvents.Event>, NotUsed> events = Source.from( Arrays.asList( createEnvelope( new ShoppingCartEvents.ItemAdded("a7098", "bowling shoes", 1), 0L, 0L), createEnvelope( new ShoppingCartEvents.ItemQuantityAdjusted("a7098", "bowling shoes", 2, 1), 1L, 0L), createEnvelope( new ShoppingCartEvents.CheckedOut( "a7098", Instant.parse("2020-01-01T12:00:00.00Z")), 2L, 0L), createEnvelope( new ShoppingCartEvents.ItemAdded("0d12d", "pekko t-shirt", 1), 3L, 0L), createEnvelope(new ShoppingCartEvents.ItemAdded("0d12d", "skis", 1), 4L, 0L), createEnvelope(new ShoppingCartEvents.ItemRemoved("0d12d", "skis", 1), 5L, 0L), createEnvelope( new ShoppingCartEvents.CheckedOut( "0d12d", Instant.parse("2020-01-01T12:05:00.00Z")), 6L, 0L))); ProjectionId projectionId = ProjectionId.of("name", "key"); SourceProvider<Offset, EventEnvelope<ShoppingCartEvents.Event>> sourceProvider = TestSourceProvider.create(events, env -> env.offset()); TestProjection<Offset, EventEnvelope<ShoppingCartEvents.Event>> projection = TestProjection.create(projectionId, sourceProvider, () -> handler); projectionTestKit.run( projection, () -> { assertEquals(repo.counts.size(), 3); assertEquals(repo.counts.get("bowling shoes"), Long.valueOf(2L)); assertEquals(repo.counts.get("pekko t-shirt"), Long.valueOf(1L)); assertEquals(repo.counts.get("skis"), Long.valueOf(0L)); }); } @Test public void projectionHandlerShouldLogItemPopularityEvery10Events() { long eventsNum = 10L; MockItemPopularityRepository repo = new MockItemPopularityRepository(); Handler<EventEnvelope<ShoppingCartEvents.Event>> handler = new ItemPopularityProjectionHandler("tag", testKit.system(), repo); Source<EventEnvelope<ShoppingCartEvents.Event>, NotUsed> events = Source.fromJavaStream( () -> IntStream.range(0, (int) eventsNum) .boxed() .map( i -> createEnvelope( new ShoppingCartEvents.ItemAdded("a7098", "bowling shoes", 1), Long.valueOf(i), 0L))); ProjectionId projectionId = ProjectionId.of("name", "key"); SourceProvider<Offset, EventEnvelope<ShoppingCartEvents.Event>> sourceProvider = TestSourceProvider.create(events, env -> env.offset()); TestProjection<Offset, EventEnvelope<ShoppingCartEvents.Event>> projection = TestProjection.create(projectionId, sourceProvider, () -> handler); LoggingTestKit.info( "ItemPopularityProjectionHandler(tag) item popularity for 'bowling shoes': [10]") .expect( testKit.system(), () -> { projectionTestKit.runWithTestSink( projection, testSink -> { testSink.request(eventsNum); testSink.expectNextN(eventsNum); }); return null; // FIXME: why is a return statement required? }); } static class MockItemPopularityRepository implements ItemPopularityProjectionRepository { public Map<String, Long> counts = new HashMap<String, Long>(); @Override public CompletionStage<Done> update(String itemId, int delta) { counts.put(itemId, counts.getOrDefault(itemId, 0L) + delta); return CompletableFuture.completedFuture(Done.getInstance()); } @Override public CompletionStage<Optional<Long>> getItem(String itemId) { if (counts.containsKey(itemId)) return CompletableFuture.completedFuture(Optional.of(counts.get(itemId))); return CompletableFuture.completedFuture(Optional.empty()); } } }
1.0.0