Testing

Apache Pekko Projections provides a TestKit to ease testing. Two styles of test are supported: running the Projection with an assert function, and driving it with a TestSubscriber.Probe from the Apache Pekko Streams TestKit.

Dependencies

To use the Apache Pekko Projections TestKit, add the following dependency to your project:

sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-testkit" % "1.0.0" % Test
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-projection-testkit_${scala.binary.version}</artifactId>
    <version>1.0.0</version>
    <scope>test</scope>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  testImplementation "org.apache.pekko:pekko-projection-testkit_${versions.ScalaBinary}:1.0.0"
}

Apache Pekko Projections requires Pekko 1.0.2 or later; see Pekko version.

Project Info: Apache Pekko Projections TestKit
Artifact: org.apache.pekko : pekko-projection-testkit : 1.0.0
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17
Scala versions: 3.3.3, 2.13.13, 2.12.19
JPMS module name: pekko.projection.testkit
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-projection

Transitive dependencies

The table below shows pekko-projection-testkit’s direct dependencies and the second tab shows all libraries it depends on transitively.

Initializing the Projection TestKit

The Projection TestKit requires an instance of ActorTestKit. We recommend using Pekko’s ScalaTestWithActorTestKit (Scala) or TestKitJunitResource (Java).

Scala
import org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.projection.testkit.scaladsl.ProjectionTestKit

class TestKitDocExample extends ScalaTestWithActorTestKit {
  private val projectionTestKit = ProjectionTestKit(system)

}
Java
import org.apache.pekko.projection.testkit.javadsl.TestSourceProvider;
import org.junit.ClassRule;
import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource;
import org.apache.pekko.projection.testkit.javadsl.ProjectionTestKit;

// JUnit 4 requires @ClassRule fields to be public and static
@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource();
ProjectionTestKit projectionTestKit = ProjectionTestKit.create(testKit.system());

Testing with an assert function

When testing with an assert function the Projection is started and stopped by the TestKit. While the projection is running, the assert function will be called until it completes without errors (no exceptions or assertion errors are thrown).

In the example below the Projection will update a CartView. The test will run until it observes that the CartView for id abc-def is available in the repository.

Scala
import org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.projection.testkit.scaladsl.ProjectionTestKit

projectionTestKit.run(projection) {
  // confirm that cart checkout was inserted in db
  cartViewRepository.findById("abc-def").futureValue
}
Java
projectionTestKit.run(
    projection,
    () ->
        cartCheckoutRepository
            .findById("abc-def")
            .toCompletableFuture()
            .get(1, TimeUnit.SECONDS));
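
The Java example relies on the blocking `get` throwing while the row is still missing, which is what makes the TestKit call the assert function again. The repository itself is not shown on this page, so the following is only a minimal in-memory sketch of how such a repository could behave. The names `CartCheckout`, `InMemoryCartCheckoutRepository`, and `save` are hypothetical and are not part of Pekko Projections.

```java
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical view row; the real view type is not shown in this document.
final class CartCheckout {
  final String id;

  CartCheckout(String id) {
    this.id = id;
  }
}

// Hypothetical in-memory stand-in for the repository that the handler writes to.
final class InMemoryCartCheckoutRepository {
  private final Map<String, CartCheckout> rows = new ConcurrentHashMap<>();

  // Would be called by the projection handler when it processes a checkout event.
  CompletionStage<Void> save(CartCheckout checkout) {
    rows.put(checkout.id, checkout);
    return CompletableFuture.completedFuture(null);
  }

  // Completes exceptionally while the row is missing, so a blocking get() or join()
  // inside the assert function throws and the assertion is retried later.
  CompletionStage<CartCheckout> findById(String id) {
    CompletableFuture<CartCheckout> result = new CompletableFuture<>();
    CartCheckout found = rows.get(id);
    if (found == null) {
      result.completeExceptionally(new NoSuchElementException("no cart checkout " + id));
    } else {
      result.complete(found);
    }
    return result;
  }
}
```

A repository that instead returned an empty `Optional` for a missing row would need an explicit assertion in the assert function, since the lookup alone would then complete without errors.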

By default, the test runs for up to 3 seconds and the assert function is called every 100 milliseconds. Both values can be set programmatically.

Note: when testing a Projection with this method, the Restart Backoff is disabled. Any backoff settings from a .conf file or set programmatically are overridden.

Scala
import scala.concurrent.duration._

projectionTestKit.run(projection, max = 5.seconds, interval = 300.millis) {
  // confirm that cart checkout was inserted in db
  cartViewRepository.findById("abc-def").futureValue
}
Java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

projectionTestKit.run(
    projection,
    Duration.ofSeconds(5),
    Duration.ofMillis(300),
    () ->
        cartCheckoutRepository
            .findById("abc-def")
            .toCompletableFuture()
            .get(1, TimeUnit.SECONDS));
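
To make the roles of the maximum duration and the interval concrete, here is a plain-Java sketch of the retry behavior described above: the assertion is re-run at a fixed interval until it completes without throwing, or until the time budget is spent. It is an illustration only and not the TestKit's implementation; `PollingAssertSketch` and `eventually` are hypothetical names.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

final class PollingAssertSketch {

  // Re-runs the assertion until it completes without throwing or the time budget is spent.
  static void eventually(Duration max, Duration interval, Runnable assertion) {
    long deadline = System.nanoTime() + max.toNanos();
    while (true) {
      try {
        assertion.run();
        return; // completed without errors: the check passes
      } catch (RuntimeException | AssertionError e) {
        if (deadline - System.nanoTime() <= 0) {
          throw e; // out of time: surface the last failure
        }
        sleep(interval);
      }
    }
  }

  private static void sleep(Duration interval) {
    try {
      Thread.sleep(interval.toMillis());
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting to retry", ie);
    }
  }

  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();
    // Fails on the first two attempts, then passes on the third.
    eventually(Duration.ofSeconds(3), Duration.ofMillis(100), () -> {
      if (calls.incrementAndGet() < 3) {
        throw new AssertionError("not yet");
      }
    });
    System.out.println("passed after " + calls.get() + " attempts");
  }
}
```

A longer maximum helps when the projection writes to a slow store; a longer interval reduces load when each assertion is itself an expensive query.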

Testing with a TestSubscriber.Probe

The Apache Pekko Streams TestKit can be used to drive the pace of envelopes flowing through the Projection.

The Projection starts as soon as the first element is requested by the TestSubscriber.Probe; new elements are emitted as they are requested. The Projection is stopped once the assert function completes.

Scala
import org.apache.pekko.Done

projectionTestKit.runWithTestSink(projection) { sinkProbe =>
  sinkProbe.request(1)
  sinkProbe.expectNext(Done)
}

// confirm that cart checkout was inserted in db
cartViewRepository.findById("abc-def").futureValue
Java
import static org.junit.Assert.assertEquals;
import org.apache.pekko.Done;

projectionTestKit.runWithTestSink(
    projection,
    sinkProbe -> {
      sinkProbe.request(1);
      sinkProbe.expectNext(Done.getInstance());
      cartCheckoutRepository.findById("abc-def").toCompletableFuture().get(1, TimeUnit.SECONDS);
    });
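
The idea that nothing flows until the probe requests it can be illustrated without Pekko, using the Reactive Streams interfaces in `java.util.concurrent.Flow` (Java 9 or later). This is a simplified, synchronous model of demand-driven emission, not the Pekko Streams TestSubscriber.Probe; `DemandDrivenSketch`, `ListPublisher`, and `Probe` are hypothetical names, and completion signalling is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

final class DemandDrivenSketch {

  // Publishes list elements synchronously, and only as many as were requested.
  static final class ListPublisher<T> implements Flow.Publisher<T> {
    private final List<T> elements;

    ListPublisher(List<T> elements) {
      this.elements = elements;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
      Iterator<T> remaining = elements.iterator();
      subscriber.onSubscribe(new Flow.Subscription() {
        private boolean cancelled = false;

        @Override
        public void request(long n) {
          // Emit at most n elements; stop early if cancelled or exhausted.
          for (long i = 0; i < n && !cancelled && remaining.hasNext(); i++) {
            subscriber.onNext(remaining.next());
          }
        }

        @Override
        public void cancel() {
          cancelled = true;
        }
      });
    }
  }

  // Minimal probe: records what it receives and lets the test control demand.
  static final class Probe<T> implements Flow.Subscriber<T> {
    final List<T> received = new ArrayList<>();
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
      subscription = s;
    }

    @Override
    public void onNext(T item) {
      received.add(item);
    }

    @Override
    public void onError(Throwable t) {
      throw new IllegalStateException(t);
    }

    @Override
    public void onComplete() {}

    void request(long n) {
      subscription.request(n);
    }
  }

  public static void main(String[] args) {
    Probe<String> probe = new Probe<>();
    new ListPublisher<>(Arrays.asList("abc", "def")).subscribe(probe);
    System.out.println(probe.received); // nothing has been requested yet
    probe.request(1);
    System.out.println(probe.received); // only the first element has been emitted
  }
}
```

Controlling demand this way lets a test inspect the state of the view after each envelope instead of only at the end.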

Testing with mocked Projection and SourceProvider

To test a handler in isolation you may want to mock out the implementation of a Projection or SourceProvider so that you don’t have to set up and tear down the associated technology as part of your integration test. For example, you may want to project against a Cassandra database, or read envelopes from an Apache Pekko Persistence journal source, but you don’t want to have to run Docker containers or embedded/in-memory services just to run your tests. The TestProjection allows you to isolate the runtime of your handler so that you don’t need to run these services. Using a TestProjection has the added benefit of being fast, since you can run everything within the JVM that runs your tests.

Alongside the TestProjection is the TestSourceProvider, which can be used to provide test data to the TestProjection running the handler. Test data can be represented in a Pekko Streams Source that is passed to the TestSourceProvider constructor.

Scala
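import org.apache.pekko.projection.ProjectionId
import org.apache.pekko.projection.testkit.scaladsl.TestProjection
import org.apache.pekko.projection.testkit.scaladsl.TestSourceProvider
import org.apache.pekko.stream.scaladsl.Source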

val testData = Source((0, "abc") :: (1, "def") :: Nil)

val extractOffset = (envelope: (Int, String)) => envelope._1

val sourceProvider = TestSourceProvider(testData, extractOffset)

val projection = TestProjection(ProjectionId("test", "00"), sourceProvider, () => handler)

projectionTestKit.run(projection) {
  // assert logic ..
}
Java
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.projection.testkit.javadsl.TestProjection;

List<Pair<Integer, String>> testData =
    Stream.of(Pair.create(0, "abc"), Pair.create(1, "def")).collect(Collectors.toList());

Source<Pair<Integer, String>, NotUsed> source = Source.from(testData);

Function<Pair<Integer, String>, Integer> extractOffsetFn =
    (Pair<Integer, String> env) -> env.first();

TestSourceProvider<Integer, Pair<Integer, String>> sourceProvider =
    TestSourceProvider.create(source, extractOffsetFn);

Projection<Pair<Integer, String>> projection =
    TestProjection.create(ProjectionId.of("test", "00"), sourceProvider, () -> handler);

projectionTestKit.run(
    projection,
    () -> {
      // assert logic ...
    });