Writing tests for a Projection

Like other Pekko libraries, Projections ships with a TestKit that a user can include to assert the correctness of their Projection handler implementation. Add the Projections TestKit dependency to your project:

sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-testkit" % "1.0.0"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-projection-testkit_${scala.binary.version}</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-projection-testkit_${versions.ScalaBinary}:1.0.0"
}

Import the ProjectionTestKit and other utilities into a new ScalaTest test spec (Scala) or JUnit test (Java).

Scala
import org.apache.pekko
import pekko.projection.testkit.scaladsl.ProjectionTestKit
import pekko.projection.testkit.scaladsl.TestProjection
import pekko.projection.testkit.scaladsl.TestSourceProvider
Java
import org.apache.pekko.projection.testkit.javadsl.ProjectionTestKit;
import org.apache.pekko.projection.testkit.javadsl.TestProjection;
import org.apache.pekko.projection.testkit.javadsl.TestSourceProvider;

The TestKit includes several utilities to run the Projection handler in isolation so that a full projection implementation and source provider are not required.

Using these tools we can assert that our Projection handler meets the following requirements of the ItemPopularityProjectionHandler.

  1. Process each shopping cart item event, correctly calculate the item count delta, and update the database.
  2. Log the popularity of every 10th shopping cart item event that is processed.
Scala
package docs.guide

import java.time.Instant

import scala.concurrent.Future

import org.apache.pekko
import pekko.Done
import pekko.actor.testkit.typed.scaladsl.LoggingTestKit
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.persistence.query.Offset
import pekko.projection.ProjectionId
import pekko.projection.eventsourced.EventEnvelope
import pekko.stream.scaladsl.Source
import org.apache.pekko
import pekko.projection.testkit.scaladsl.ProjectionTestKit
import pekko.projection.testkit.scaladsl.TestProjection
import pekko.projection.testkit.scaladsl.TestSourceProvider
import org.scalatest.wordspec.AnyWordSpecLike

object ShoppingCartAppSpec {

  /**
   * In-memory stand-in for the Cassandra data layer: every item count update is
   * accumulated in `counts` so that a test can inspect the recorded totals.
   */
  class MockItemPopularityRepository extends ItemPopularityProjectionRepository {
    var counts: Map[String, Long] = Map.empty

    /** Adds `delta` to the running count of `itemId`, starting from zero for unseen items. */
    override def update(itemId: String, delta: Int): Future[Done] = {
      val previous = counts.getOrElse(itemId, 0L)
      counts = counts.updated(itemId, previous + delta)
      Future.successful(Done)
    }

    /** Returns the count currently recorded for `itemId`, or `None` if it was never updated. */
    override def getItem(itemId: String): Future[Option[Long]] = {
      val current = counts.get(itemId)
      Future.successful(current)
    }
  }
}

/**
 * Exercises the `ItemPopularityProjectionHandler` in isolation: events are fed from an
 * in-memory `TestSourceProvider` through a `TestProjection`, and the handler writes to a
 * `MockItemPopularityRepository` instead of a real database.
 */
class ShoppingCartAppSpec extends ScalaTestWithActorTestKit() with AnyWordSpecLike {
  import ShoppingCartAppSpec._

  private val projectionTestKit = ProjectionTestKit(system)

  /**
   * Wraps `event` in an envelope for the fixed persistence id `"persistenceId"`.
   *
   * @param event     the shopping cart event to wrap
   * @param seqNo     used both as the sequence offset and as the sequence number
   * @param timestamp envelope timestamp, `0L` unless specified
   */
  def createEnvelope(
      event: ShoppingCartEvents.Event,
      seqNo: Long,
      timestamp: Long = 0L): EventEnvelope[ShoppingCartEvents.Event] =
    EventEnvelope(Offset.sequence(seqNo), "persistenceId", seqNo, event, timestamp)

  "The ItemPopularityProjectionHandler" should {
    "process item events correctly" in {
      val repo = new MockItemPopularityRepository
      val handler = new ItemPopularityProjectionHandler("tag", system, repo)

      // Two carts ("a7098" and "0d12d"); envelope offsets are consecutive across both.
      val events = Source(
        List[EventEnvelope[ShoppingCartEvents.Event]](
          createEnvelope(ShoppingCartEvents.ItemAdded("a7098", "bowling shoes", 1), 0L),
          createEnvelope(ShoppingCartEvents.ItemQuantityAdjusted("a7098", "bowling shoes", 2, 1), 1L),
          createEnvelope(ShoppingCartEvents.CheckedOut("a7098", Instant.parse("2020-01-01T12:00:00.00Z")), 2L),
          createEnvelope(ShoppingCartEvents.ItemAdded("0d12d", "pekko t-shirt", 1), 3L),
          createEnvelope(ShoppingCartEvents.ItemAdded("0d12d", "skis", 1), 4L),
          createEnvelope(ShoppingCartEvents.ItemRemoved("0d12d", "skis", 1), 5L),
          createEnvelope(ShoppingCartEvents.CheckedOut("0d12d", Instant.parse("2020-01-01T12:05:00.00Z")), 6L)))

      val projectionId = ProjectionId("name", "key")
      val sourceProvider =
        TestSourceProvider[Offset, EventEnvelope[ShoppingCartEvents.Event]](events, extractOffset = env => env.offset)
      val projection =
        TestProjection[Offset, EventEnvelope[ShoppingCartEvents.Event]](projectionId, sourceProvider, () => handler)

      projectionTestKit.run(projection) {
        // Long literals match the element type of `counts` (Map[String, Long]) so the
        // comparison does not depend on cross-type numeric equality.
        repo.counts shouldBe Map("bowling shoes" -> 2L, "pekko t-shirt" -> 1L, "skis" -> 0L)
      }
    }

    "log item popularity for day every 10 item events" in {
      val repo = new MockItemPopularityRepository
      val handler = new ItemPopularityProjectionHandler("tag", system, repo)

      // Ten identical ItemAdded events with offsets 0 to 9.
      val events = (0L until 10L).map { i =>
        createEnvelope(ShoppingCartEvents.ItemAdded("a7098", "bowling shoes", 1), i)
      }

      val projectionId = ProjectionId("name", "key")
      val sourceProvider =
        TestSourceProvider[Offset, EventEnvelope[ShoppingCartEvents.Event]](
          Source(events),
          extractOffset = env => env.offset)
      val projection =
        TestProjection[Offset, EventEnvelope[ShoppingCartEvents.Event]](projectionId, sourceProvider, () => handler)

      // The expected info message must be logged while the block below is running.
      LoggingTestKit
        .info("ItemPopularityProjectionHandler(tag) item popularity for 'bowling shoes': [10]")
        .expect {
          projectionTestKit.runWithTestSink(projection) { testSink =>
            // Demand every event and wait until all of them have been emitted.
            testSink.request(events.length)
            testSink.expectNextN(events.length)
          }
        }
    }
  }
}
Java
package jdocs.guide;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.testkit.typed.javadsl.LoggingTestKit;
import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.eventsourced.EventEnvelope;
import org.apache.pekko.projection.javadsl.Handler;
import org.apache.pekko.projection.javadsl.SourceProvider;
import org.apache.pekko.projection.testkit.javadsl.ProjectionTestKit;
import org.apache.pekko.projection.testkit.javadsl.TestProjection;
import org.apache.pekko.projection.testkit.javadsl.TestSourceProvider;
import org.apache.pekko.stream.javadsl.Source;
import org.junit.ClassRule;
import org.junit.Test;

import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.IntStream;

import static org.junit.Assert.assertEquals;

/**
 * Exercises the {@code ItemPopularityProjectionHandler} in isolation: events are fed from an
 * in-memory {@code TestSourceProvider} through a {@code TestProjection}, and the handler writes to
 * a {@link MockItemPopularityRepository} instead of a real database.
 */
public class ShoppingCartAppTest {
  @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource();
  public static final ProjectionTestKit projectionTestKit =
      ProjectionTestKit.create(testKit.system());

  /**
   * Wraps {@code event} in an envelope for the fixed persistence id {@code "persistenceId"}.
   *
   * @param event the shopping cart event to wrap
   * @param seqNo used both as the sequence offset and as the sequence number
   * @param timestamp the envelope timestamp
   * @return the envelope carrying {@code event}
   */
  EventEnvelope<ShoppingCartEvents.Event> createEnvelope(
      ShoppingCartEvents.Event event, Long seqNo, Long timestamp) {
    return EventEnvelope.create(Offset.sequence(seqNo), "persistenceId", seqNo, event, timestamp);
  }

  @Test
  public void projectionHandlerShouldProcessItemEventsCorrectly() {
    MockItemPopularityRepository repo = new MockItemPopularityRepository();
    Handler<EventEnvelope<ShoppingCartEvents.Event>> handler =
        new ItemPopularityProjectionHandler("tag", testKit.system(), repo);

    // Two carts ("a7098" and "0d12d"); envelope offsets are consecutive across both.
    Source<EventEnvelope<ShoppingCartEvents.Event>, NotUsed> events =
        Source.from(
            Arrays.asList(
                createEnvelope(
                    new ShoppingCartEvents.ItemAdded("a7098", "bowling shoes", 1), 0L, 0L),
                createEnvelope(
                    new ShoppingCartEvents.ItemQuantityAdjusted("a7098", "bowling shoes", 2, 1),
                    1L,
                    0L),
                createEnvelope(
                    new ShoppingCartEvents.CheckedOut(
                        "a7098", Instant.parse("2020-01-01T12:00:00.00Z")),
                    2L,
                    0L),
                createEnvelope(
                    new ShoppingCartEvents.ItemAdded("0d12d", "pekko t-shirt", 1), 3L, 0L),
                createEnvelope(new ShoppingCartEvents.ItemAdded("0d12d", "skis", 1), 4L, 0L),
                createEnvelope(new ShoppingCartEvents.ItemRemoved("0d12d", "skis", 1), 5L, 0L),
                createEnvelope(
                    new ShoppingCartEvents.CheckedOut(
                        "0d12d", Instant.parse("2020-01-01T12:05:00.00Z")),
                    6L,
                    0L)));

    ProjectionId projectionId = ProjectionId.of("name", "key");
    SourceProvider<Offset, EventEnvelope<ShoppingCartEvents.Event>> sourceProvider =
        TestSourceProvider.create(events, env -> env.offset());
    TestProjection<Offset, EventEnvelope<ShoppingCartEvents.Event>> projection =
        TestProjection.create(projectionId, sourceProvider, () -> handler);

    projectionTestKit.run(
        projection,
        () -> {
          // JUnit's assertEquals takes (expected, actual); keeping that order makes failure
          // messages report the right value as "expected".
          assertEquals(3, repo.counts.size());
          assertEquals(Long.valueOf(2L), repo.counts.get("bowling shoes"));
          assertEquals(Long.valueOf(1L), repo.counts.get("pekko t-shirt"));
          assertEquals(Long.valueOf(0L), repo.counts.get("skis"));
        });
  }

  @Test
  public void projectionHandlerShouldLogItemPopularityEvery10Events() {
    long eventsNum = 10L;
    MockItemPopularityRepository repo = new MockItemPopularityRepository();
    Handler<EventEnvelope<ShoppingCartEvents.Event>> handler =
        new ItemPopularityProjectionHandler("tag", testKit.system(), repo);

    // eventsNum identical ItemAdded events with offsets 0 to eventsNum - 1.
    Source<EventEnvelope<ShoppingCartEvents.Event>, NotUsed> events =
        Source.fromJavaStream(
            () ->
                IntStream.range(0, (int) eventsNum)
                    .boxed()
                    .map(
                        i ->
                            createEnvelope(
                                new ShoppingCartEvents.ItemAdded("a7098", "bowling shoes", 1),
                                Long.valueOf(i),
                                0L)));

    ProjectionId projectionId = ProjectionId.of("name", "key");
    SourceProvider<Offset, EventEnvelope<ShoppingCartEvents.Event>> sourceProvider =
        TestSourceProvider.create(events, env -> env.offset());
    TestProjection<Offset, EventEnvelope<ShoppingCartEvents.Event>> projection =
        TestProjection.create(projectionId, sourceProvider, () -> handler);

    // The expected info message must be logged while the supplied code is running.
    LoggingTestKit.info(
            "ItemPopularityProjectionHandler(tag) item popularity for 'bowling shoes': [10]")
        .expect(
            testKit.system(),
            () -> {
              projectionTestKit.runWithTestSink(
                  projection,
                  testSink -> {
                    // Demand every event and wait until all of them have been emitted.
                    testSink.request(eventsNum);
                    testSink.expectNextN(eventsNum);
                  });
              // expect(...) takes a value-returning Supplier, so the lambda has to return
              // something; there is no meaningful result here.
              return null;
            });
  }

  /**
   * In-memory stand-in for the real repository: every item count update is accumulated in
   * {@link #counts} so that a test can inspect the recorded totals.
   */
  static class MockItemPopularityRepository implements ItemPopularityProjectionRepository {
    public Map<String, Long> counts = new HashMap<>();

    /** Adds {@code delta} to the running count of {@code itemId}, starting from zero. */
    @Override
    public CompletionStage<Done> update(String itemId, int delta) {
      counts.put(itemId, counts.getOrDefault(itemId, 0L) + delta);
      return CompletableFuture.completedFuture(Done.getInstance());
    }

    /** Returns the count recorded for {@code itemId}, or empty if it was never updated. */
    @Override
    public CompletionStage<Optional<Long>> getItem(String itemId) {
      if (counts.containsKey(itemId))
        return CompletableFuture.completedFuture(Optional.of(counts.get(itemId)));

      return CompletableFuture.completedFuture(Optional.empty());
    }
  }
}