Testing
Apache Pekko Projections provides a TestKit to ease testing. Two styles of test are supported: running the projection with an assert function, and driving it with an Apache Pekko Streams TestKit TestSubscriber.Probe.
Dependencies
To use the Apache Pekko Projections TestKit, add the following dependency to your project:
- sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-testkit" % "1.0.0" % Test
- Maven
    <properties>
      <scala.binary.version>2.13</scala.binary.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-projection-testkit_${scala.binary.version}</artifactId>
        <version>1.0.0</version>
        <scope>test</scope>
      </dependency>
    </dependencies>
- Gradle
    def versions = [
      ScalaBinary: "2.13"
    ]
    dependencies {
      testImplementation "org.apache.pekko:pekko-projection-testkit_${versions.ScalaBinary}:1.0.0"
    }
Apache Pekko Projections requires Pekko 1.0.2 or later, see Pekko version.
Project Info: Apache Pekko Projections TestKit | |
---|---|
Artifact | org.apache.pekko : pekko-projection-testkit : 1.0.0 |
JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17 |
Scala versions | 3.3.3, 2.13.13, 2.12.19 |
JPMS module name | pekko.projection.testkit |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-projection |
Transitive dependencies
The table below shows pekko-projection-testkit’s direct dependencies and the second tab shows all libraries it depends on transitively.
Initializing the Projection TestKit
The Projection TestKit requires an instance of ActorTestKit. We recommend using Pekko’s ScalaTestWithActorTestKit (Scala) or TestKitJunitResource (Java).
- Scala
    import org.apache.pekko
    import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
    import pekko.projection.testkit.scaladsl.ProjectionTestKit

    class TestKitDocExample extends ScalaTestWithActorTestKit {
      private val projectionTestKit = ProjectionTestKit(system)
    }
- Java
    import org.apache.pekko.projection.testkit.javadsl.TestSourceProvider;
    import org.junit.ClassRule;
    import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource;
    import org.apache.pekko.projection.testkit.javadsl.ProjectionTestKit;

    @ClassRule
    static final TestKitJunitResource testKit = new TestKitJunitResource();

    ProjectionTestKit projectionTestKit = ProjectionTestKit.create(testKit.system());
Testing with an assert function
When testing with an assert function the Projection is started and stopped by the TestKit. While the projection is running, the assert function will be called until it completes without errors (no exceptions or assertion errors are thrown).
In the example below the Projection will update a CartView. The test will run until it observes that the CartView for id abc-def is available in the repository.
- Scala
    import org.apache.pekko
    import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
    import pekko.projection.testkit.scaladsl.ProjectionTestKit

    projectionTestKit.run(projection) {
      // confirm that cart checkout was inserted in db
      cartViewRepository.findById("abc-def").futureValue
    }
- Java
    projectionTestKit.run(
        projection,
        () ->
            cartCheckoutRepository
                .findById("abc-def")
                .toCompletableFuture()
                .get(1, TimeUnit.SECONDS));
By default, the test will run for 3 seconds and the assert function will be called every 100 milliseconds. Both values can be changed programmatically.
Note: when testing a Projection with this method, the Restart Backoff is disabled. Any backoff settings, whether defined in a .conf file or added programmatically, will be overwritten.
- Scala
    import scala.concurrent.duration._

    projectionTestKit.run(projection, max = 5.seconds, interval = 300.millis) {
      // confirm that cart checkout was inserted in db
      cartViewRepository.findById("abc-def").futureValue
    }
- Java
    import java.time.Duration;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Function;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    projectionTestKit.run(
        projection,
        Duration.ofSeconds(5),
        Duration.ofMillis(300),
        () ->
            cartCheckoutRepository
                .findById("abc-def")
                .toCompletableFuture()
                .get(1, TimeUnit.SECONDS));
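The retry behaviour described above (call the assert function at a fixed interval until it stops throwing or the maximum duration elapses) can be illustrated with a small JDK-only sketch. This is not the ProjectionTestKit implementation; `PollingAssert`, `Assertion`, and `eventually` are hypothetical names introduced here purely for illustration.

```java
import java.time.Duration;

/** Illustrative only: a toy retry loop, not the ProjectionTestKit implementation. */
final class PollingAssert {

  /** An assertion that may throw until the expected state becomes visible. */
  @FunctionalInterface
  interface Assertion {
    void check() throws Exception;
  }

  /**
   * Re-runs the assertion every `interval` until it passes or `max` has elapsed.
   * Returns the number of attempts made; wraps the last failure on timeout.
   */
  static int eventually(Duration max, Duration interval, Assertion assertion) {
    long deadline = System.nanoTime() + max.toNanos();
    int attempts = 0;
    while (true) {
      attempts++;
      try {
        assertion.check();
        return attempts; // passed: stop polling
      } catch (Exception | AssertionError failure) {
        if (System.nanoTime() - deadline >= 0) {
          throw new AssertionError("Assertion did not pass within " + max, failure);
        }
        try {
          Thread.sleep(interval.toMillis()); // wait before the next attempt
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new AssertionError("Interrupted while polling", e);
        }
      }
    }
  }
}
```

With this shape, a slow projection only needs a larger `max`, while a smaller `interval` trades extra repository lookups for faster test completion.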
Testing with a TestSubscriber.Probe
The Apache Pekko Streams TestKit can be used to drive the pace of envelopes flowing through the Projection.
The Projection starts as soon as the first element is requested by the TestSubscriber.Probe, and new elements are emitted as they are requested. The Projection is stopped once the assert function completes.
- Scala
    projectionTestKit.runWithTestSink(projection) { sinkProbe =>
      sinkProbe.request(1)
      sinkProbe.expectNext(Done)
    }

    // confirm that cart checkout was inserted in db
    cartViewRepository.findById("abc-def").futureValue
- Java
    import static org.junit.Assert.assertEquals;

    projectionTestKit.runWithTestSink(
        projection,
        sinkProbe -> {
          sinkProbe.request(1);
          sinkProbe.expectNext(Done.getInstance());
          cartCheckoutRepository.findById("abc-def").toCompletableFuture().get(1, TimeUnit.SECONDS);
        });
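To make the "emitted as requested" idea concrete, here is a toy, synchronous stand-in for a demand-driven probe written against the JDK only. It is a sketch of the concept, not the Pekko Streams TestSubscriber.Probe API; `DemandProbe` and `pulledCount` are hypothetical names, and the real probe is asynchronous.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;

/** Illustrative only: a toy model of demand-driven consumption, not the Pekko Streams API. */
final class DemandProbe<T> {
  private final Iterator<T> upstream;
  private final Queue<T> received = new ArrayDeque<>();
  private int pulled = 0;

  DemandProbe(Iterable<T> elements) {
    this.upstream = elements.iterator();
  }

  /** Signals demand: pulls at most n elements from upstream, never more. */
  DemandProbe<T> request(int n) {
    for (int i = 0; i < n && upstream.hasNext(); i++) {
      received.add(upstream.next());
      pulled++;
    }
    return this;
  }

  /** Fails unless the next received element equals the expected one. */
  DemandProbe<T> expectNext(T expected) {
    T actual = received.poll(); // null when nothing has been requested yet
    if (!Objects.equals(expected, actual)) {
      throw new AssertionError("expected " + expected + " but got " + actual);
    }
    return this;
  }

  /** Number of elements that have left the upstream so far. */
  int pulledCount() {
    return pulled;
  }
}
```

In this model nothing leaves the upstream until `request` is called, which is the property that lets a test step through envelopes one at a time and assert on side effects between steps.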
Testing with mocked Projection and SourceProvider
To test a handler in isolation you may want to mock out the implementation of a Projection or SourceProvider so that you don’t have to set up and tear down the associated technology as part of your integration test. For example, you may want to project against a Cassandra database, or read envelopes from an Apache Pekko Persistence journal source, but you don’t want to run Docker containers or embedded/in-memory services just to run your tests. The TestProjection allows you to isolate the runtime of your handler so that you don’t need to run these services. Using a TestProjection has the added benefit of being fast, since you can run everything within the JVM that runs your tests.
Alongside the TestProjection is the TestSourceProvider, which can be used to provide test data to the TestProjection running the handler. Test data can be represented in a Pekko Streams Source that is passed to the TestSourceProvider constructor.
- Scala
    import org.apache.pekko.projection.ProjectionId
    import org.apache.pekko.projection.testkit.scaladsl.{ TestProjection, TestSourceProvider }
    import org.apache.pekko.stream.scaladsl.Source

    // in-memory test data: (offset, payload) pairs
    val testData = Source((0, "abc") :: (1, "def") :: Nil)

    val extractOffset = (envelope: (Int, String)) => envelope._1

    val sourceProvider = TestSourceProvider(testData, extractOffset)

    val projection = TestProjection(ProjectionId("test", "00"), sourceProvider, () => handler)

    projectionTestKit.run(projection) {
      // assert logic ..
    }
- Java
    import java.util.List;
    import org.apache.pekko.NotUsed;
    import org.apache.pekko.japi.Pair;
    import org.apache.pekko.projection.Projection;
    import org.apache.pekko.projection.ProjectionId;
    import org.apache.pekko.stream.javadsl.Source;
    import org.apache.pekko.projection.testkit.javadsl.TestProjection;

    // in-memory test data: (offset, payload) pairs
    List<Pair<Integer, String>> testData =
        Stream.of(Pair.create(0, "abc"), Pair.create(1, "def")).collect(Collectors.toList());

    Source<Pair<Integer, String>, NotUsed> source = Source.from(testData);

    Function<Pair<Integer, String>, Integer> extractOffsetFn =
        (Pair<Integer, String> env) -> env.first();

    TestSourceProvider<Integer, Pair<Integer, String>> sourceProvider =
        TestSourceProvider.create(source, extractOffsetFn);

    Projection<Pair<Integer, String>> projection =
        TestProjection.create(ProjectionId.of("test", "00"), sourceProvider, () -> handler);

    projectionTestKit.run(
        projection,
        () -> {
          // assert logic ...
        });
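The roles of the test data, the offset-extraction function, and the handler can also be pictured without any Pekko types. The following JDK-only sketch is a simplified mental model, not how TestProjection or TestSourceProvider are implemented; `InMemoryProjectionSketch` and `runAll` are hypothetical names, and recording the offset only after the handler returns is an assumption of this toy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Illustrative only: a simplified mental model, not the TestProjection implementation. */
final class InMemoryProjectionSketch {

  /**
   * Feeds each envelope to the handler in order and returns the offsets
   * of the envelopes that were handled.
   */
  static <O, E> List<O> runAll(
      List<E> testData, Function<E, O> extractOffset, Consumer<E> handler) {
    List<O> processedOffsets = new ArrayList<>();
    for (E envelope : testData) {
      handler.accept(envelope);
      // assumption of this sketch: the offset is noted only once the handler has returned
      processedOffsets.add(extractOffset.apply(envelope));
    }
    return processedOffsets;
  }
}
```

Because everything lives in memory, a handler can be exercised with a handful of envelopes and then checked with ordinary assertions, which is the same convenience the TestProjection aims to provide.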