Testing streams
Dependency
To use Pekko Stream TestKit, add the module to your project:
- sbt
    val PekkoVersion = "1.1.2"
    libraryDependencies += "org.apache.pekko" %% "pekko-stream-testkit" % PekkoVersion % Test
- Maven
    <properties>
      <scala.binary.version>2.13</scala.binary.version>
    </properties>
    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>org.apache.pekko</groupId>
          <artifactId>pekko-bom_${scala.binary.version}</artifactId>
          <version>1.1.2</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-stream-testkit_${scala.binary.version}</artifactId>
        <scope>test</scope>
      </dependency>
    </dependencies>
- Gradle
    def versions = [
      ScalaBinary: "2.13"
    ]
    dependencies {
      implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2")

      testImplementation "org.apache.pekko:pekko-stream-testkit_${versions.ScalaBinary}"
    }
Introduction
Verifying behavior of Pekko Stream sources, flows and sinks can be done using various code patterns and libraries. Here we will discuss testing these elements using:
- simple sources, sinks and flows;
- sources and sinks in combination with TestProbe from the pekko-testkit module;
- sources and sinks specifically crafted for writing tests from the pekko-stream-testkit module.
It is important to keep your data processing pipeline as separate sources, flows and sinks. This makes them testable by wiring them up to other sources or sinks, or to the test harnesses that pekko-testkit or pekko-stream-testkit provide.
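As a minimal sketch of that separation, the following script splits a hypothetical word-length pipeline into a source, a flow and a sink, then tests the flow and the sink in isolation. The names `WordLengths`, `words`, `toLength` and `sum` are illustrative assumptions, not part of Pekko, and the script assumes pekko-stream is on the classpath and that top-level statements are allowed (for example a Scala 3 script).

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Flow, Sink, Source }

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

// Hypothetical pipeline, kept as three separately testable parts.
object WordLengths {
  val words: Source[String, NotUsed] = Source(List("pekko", "stream", "testkit"))
  val toLength: Flow[String, Int, NotUsed] = Flow[String].map(_.length)
  val sum: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
}

implicit val system: ActorSystem = ActorSystem("separate-parts")

// Test only the flow: feed it a known source and collect the output.
val lengths = Await.result(
  Source(List("a", "bb", "ccc")).via(WordLengths.toLength).runWith(Sink.seq),
  3.seconds)
assert(lengths == Seq(1, 2, 3))

// Test only the sink: attach a predefined source.
val total = Await.result(Source(1 to 3).runWith(WordLengths.sum), 3.seconds)
assert(total == 6)

system.terminate()
```

Because each part is a plain value, the production wiring `words.via(toLength).runWith(sum)` stays a one-liner while every piece can be exercised on its own.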
Built-in sources, sinks and operators
Testing a custom sink can be as simple as attaching a source that emits elements from a predefined collection, running the constructed test flow and asserting on the results that the sink produced. Here is an example of a test for a sink:
- Scala
    val sinkUnderTest = Flow[Int].map(_ * 2).toMat(Sink.fold(0)(_ + _))(Keep.right)

    val future = Source(1 to 4).runWith(sinkUnderTest)
    val result = Await.result(future, 3.seconds)
    assert(result == 20)
- Java
    final Sink<Integer, CompletionStage<Integer>> sinkUnderTest =
        Flow.of(Integer.class)
            .map(i -> i * 2)
            .toMat(Sink.fold(0, (agg, next) -> agg + next), Keep.right());

    final CompletionStage<Integer> future =
        Source.from(Arrays.asList(1, 2, 3, 4)).runWith(sinkUnderTest, system);
    final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
    assertEquals(20, result.intValue());
The same strategy can be applied to sources as well. In the next example we have a source that produces an infinite stream of elements. Such a source can be tested by asserting that an arbitrary number of its first elements satisfy some condition. Here the take operator and Sink.seq are very useful.
- Scala
    val sourceUnderTest = Source.repeat(1).map(_ * 2)

    val future = sourceUnderTest.take(10).runWith(Sink.seq)
    val result = Await.result(future, 3.seconds)
    assert(result == Seq.fill(10)(2))
- Java
    final Source<Integer, NotUsed> sourceUnderTest = Source.repeat(1).map(i -> i * 2);

    final CompletionStage<List<Integer>> future =
        sourceUnderTest.take(10).runWith(Sink.seq(), system);
    final List<Integer> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
    assertEquals(Collections.nCopies(10, 2), result);
When testing a flow we need to attach a source and a sink. As both stream ends are under our control, we can choose sources that test various edge cases of the flow and sinks that ease assertions.
- Scala
    val flowUnderTest = Flow[Int].takeWhile(_ < 5)

    val future = Source(1 to 10).via(flowUnderTest).runWith(Sink.fold(Seq.empty[Int])(_ :+ _))
    val result = Await.result(future, 3.seconds)
    assert(result == (1 to 4))
- Java
    final Flow<Integer, Integer, NotUsed> flowUnderTest =
        Flow.of(Integer.class).takeWhile(i -> i < 5);

    final CompletionStage<Integer> future =
        Source.from(Arrays.asList(1, 2, 3, 4, 5, 6))
            .via(flowUnderTest)
            .runWith(Sink.fold(0, (agg, next) -> agg + next), system);
    final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
    assertEquals(10, result.intValue());
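To illustrate choosing sources that probe edge cases, here is a small sketch that runs the same takeWhile flow against an empty source, a source whose first element already fails the predicate, and a source that never reaches the limit. The helper `run` is a hypothetical convenience introduced for this example, and the script assumes pekko-stream is on the classpath.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Flow, Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("flow-edge-cases")

val flowUnderTest = Flow[Int].takeWhile(_ < 5)

// Hypothetical helper: run the flow against the given source and collect its output.
def run(input: Source[Int, NotUsed]): Seq[Int] =
  Await.result(input.via(flowUnderTest).runWith(Sink.seq), 3.seconds)

// Empty input: nothing is emitted and the stream completes normally.
val fromEmpty = run(Source.empty[Int])
assert(fromEmpty.isEmpty)

// First element fails the predicate: the flow completes without emitting anything.
val fromRejectedHead = run(Source(List(5, 1, 2)))
assert(fromRejectedHead.isEmpty)

// No element fails the predicate: everything passes through.
val fromAllAccepted = run(Source(1 to 3))
assert(fromAllAccepted == Seq(1, 2, 3))

system.terminate()
```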
TestKit
Pekko Stream offers integration with Actors out of the box. This support can be used for writing stream tests that use the familiar TestProbe from the pekko-testkit API.
One of the more straightforward tests would be to materialize the stream to a Future (Scala) or CompletionStage (Java) and then use the pipe pattern (pipe in Scala, Patterns.pipe in Java) to send the result of that future to the probe.
- Scala
    import system.dispatcher
    import org.apache.pekko.pattern.pipe

    val sourceUnderTest = Source(1 to 4).grouped(2)

    val probe = TestProbe()
    sourceUnderTest.runWith(Sink.seq).pipeTo(probe.ref)
    probe.expectMsg(3.seconds, Seq(Seq(1, 2), Seq(3, 4)))
- Java
    final Source<List<Integer>, NotUsed> sourceUnderTest =
        Source.from(Arrays.asList(1, 2, 3, 4)).grouped(2);

    final TestKit probe = new TestKit(system);
    final CompletionStage<List<List<Integer>>> future =
        sourceUnderTest.grouped(2).runWith(Sink.head(), system);
    org.apache.pekko.pattern.Patterns.pipe(future, system.dispatcher()).to(probe.getRef());
    probe.expectMsg(Duration.ofSeconds(3), Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4)));
Instead of materializing to a future, we can use a Sink.actorRef that sends all incoming elements to the given ActorRef. Now we can use assertion methods on TestProbe and expect elements one by one as they arrive. We can also assert stream completion by expecting the onCompleteMessage that was given to Sink.actorRef.
- Scala
    case object Tick
    val sourceUnderTest = Source.tick(0.seconds, 200.millis, Tick)

    val probe = TestProbe()
    val cancellable = sourceUnderTest
      .to(Sink.actorRef(probe.ref, onCompleteMessage = "completed", onFailureMessage = _ => "failed"))
      .run()

    probe.expectMsg(1.second, Tick)
    probe.expectNoMessage(100.millis)
    probe.expectMsg(3.seconds, Tick)
    cancellable.cancel()
    probe.expectMsg(3.seconds, "completed")
- Java
    final Source<Tick, Cancellable> sourceUnderTest =
        Source.tick(Duration.ZERO, Duration.ofMillis(200), Tick.TOCK);

    final TestKit probe = new TestKit(system);
    final Cancellable cancellable =
        sourceUnderTest.to(Sink.actorRef(probe.getRef(), Tick.COMPLETED)).run(system);
    probe.expectMsg(Duration.ofSeconds(3), Tick.TOCK);
    probe.expectNoMessage(Duration.ofMillis(100));
    probe.expectMsg(Duration.ofSeconds(3), Tick.TOCK);
    cancellable.cancel();
    probe.expectMsg(Duration.ofSeconds(3), Tick.COMPLETED);
Similarly to Sink.actorRef, which provides control over received elements, we can use Source.actorRef and have full control over the elements to be sent.
- Scala
    val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)

    val (ref, future) = Source
      .actorRef(
        completionMatcher = {
          case Done =>
            CompletionStrategy.draining
        },
        // Never fail the stream because of a message:
        failureMatcher = PartialFunction.empty,
        bufferSize = 8,
        overflowStrategy = OverflowStrategy.fail)
      .toMat(sinkUnderTest)(Keep.both)
      .run()

    ref ! 1
    ref ! 2
    ref ! 3
    ref ! Done

    val result = Await.result(future, 3.seconds)
    assert(result == "123")
- Java
    final Sink<Integer, CompletionStage<String>> sinkUnderTest =
        Flow.of(Integer.class)
            .map(i -> i.toString())
            .toMat(Sink.fold("", (agg, next) -> agg + next), Keep.right());

    final Pair<ActorRef, CompletionStage<String>> refAndCompletionStage =
        Source.<Integer>actorRef(
                elem -> {
                  // complete stream immediately if we send it Done
                  if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
                  else return Optional.empty();
                },
                // never fail the stream because of a message
                elem -> Optional.empty(),
                8,
                OverflowStrategy.fail())
            .toMat(sinkUnderTest, Keep.both())
            .run(system);
    final ActorRef ref = refAndCompletionStage.first();
    final CompletionStage<String> future = refAndCompletionStage.second();

    ref.tell(1, ActorRef.noSender());
    ref.tell(2, ActorRef.noSender());
    ref.tell(3, ActorRef.noSender());
    ref.tell(Done.getInstance(), ActorRef.noSender());

    final String result = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
    assertEquals("123", result);
Streams TestKit
You may have noticed various code patterns that emerge when testing stream pipelines. Pekko Stream has a separate pekko-stream-testkit module that provides tools specifically for writing stream tests. This module comes with two main components, TestSource and TestSink, which provide sources and sinks that materialize to probes with a fluent API.
Using the TestKit
A sink returned by TestSink.probe allows manual control over demand and assertions over elements coming downstream.
- Scala
    val sourceUnderTest = Source(1 to 4).filter(_ % 2 == 0).map(_ * 2)

    sourceUnderTest.runWith(TestSink[Int]()).request(2).expectNext(4, 8).expectComplete()
- Java
    final Source<Integer, NotUsed> sourceUnderTest =
        Source.from(Arrays.asList(1, 2, 3, 4)).filter(elem -> elem % 2 == 0).map(elem -> elem * 2);

    sourceUnderTest
        .runWith(TestSink.probe(system), system)
        .request(2)
        .expectNext(4, 8)
        .expectComplete();
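The manual control over demand can be made more visible by requesting elements in steps. The sketch below is an illustration, not taken from the Pekko documentation: it requests one element, checks that nothing else arrives while there is no outstanding demand, and only then requests the rest. It assumes pekko-stream and pekko-stream-testkit are on the classpath in a version that offers the `TestSink[Int]()` factory used in the example above.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.stream.testkit.scaladsl.TestSink

import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("manual-demand")

val probe = Source(1 to 3).runWith(TestSink[Int]())

// Request exactly one element and return it.
val first = probe.requestNext()
assert(first == 1)

// With no outstanding demand, the source must not push further elements.
probe.expectNoMessage(100.millis)

// Request the remaining two elements, then expect normal completion.
probe.request(2).expectNext(2, 3).expectComplete()

system.terminate()
```

Stepping through demand like this is useful when the stage under test is expected to respect backpressure rather than emit eagerly.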
A source returned by TestSource.probe can be used for asserting demand or controlling when the stream is completed or ended with an error.
- Scala
    val sinkUnderTest = Sink.cancelled

    TestSource.probe[Int].toMat(sinkUnderTest)(Keep.left).run().expectCancellation()
- Java
    final Sink<Integer, NotUsed> sinkUnderTest = Sink.cancelled();

    TestSource.<Integer>probe(system)
        .toMat(sinkUnderTest, Keep.left())
        .run(system)
        .expectCancellation();
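Asserting demand and controlling completion can be sketched in the same way. The example below is an illustrative assumption rather than an excerpt from the Pekko documentation: it attaches a TestSource probe to Sink.seq, waits for the sink to signal demand, sends two elements and completes the stream by hand. It only checks that some demand was signalled, since the exact amount depends on buffer settings.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Keep, Sink }
import org.apache.pekko.stream.testkit.scaladsl.TestSource

import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("assert-demand")

val (pub, future) = TestSource.probe[Int].toMat(Sink.seq)(Keep.both).run()

// Sink.seq pulls as soon as it starts; the exact amount is an implementation detail.
val demand = pub.expectRequest()
assert(demand > 0)

// Emit two elements, then complete the stream explicitly.
pub.sendNext(1).sendNext(2).sendComplete()

val elements = Await.result(future, 3.seconds)
assert(elements == Seq(1, 2))

system.terminate()
```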
You can also inject exceptions and test sink behavior on error conditions.
- Scala
    val sinkUnderTest = Sink.head[Int]

    val (probe, future) = TestSource.probe[Int].toMat(sinkUnderTest)(Keep.both).run()
    probe.sendError(new Exception("boom"))

    assert(future.failed.futureValue.getMessage == "boom")
- Java
    final Sink<Integer, CompletionStage<Integer>> sinkUnderTest = Sink.head();

    final Pair<TestPublisher.Probe<Integer>, CompletionStage<Integer>> probeAndCompletionStage =
        TestSource.<Integer>probe(system).toMat(sinkUnderTest, Keep.both()).run(system);
    final TestPublisher.Probe<Integer> probe = probeAndCompletionStage.first();
    final CompletionStage<Integer> future = probeAndCompletionStage.second();
    probe.sendError(new Exception("boom"));

    ExecutionException exception =
        Assert.assertThrows(
            ExecutionException.class, () -> future.toCompletableFuture().get(3, TimeUnit.SECONDS));
    assertEquals("boom", exception.getCause().getMessage());
Test source and sink can be combined when testing flows.
- Scala
    val flowUnderTest = Flow[Int].mapAsyncUnordered(2) { sleep =>
      pattern.after(10.millis * sleep, using = system.scheduler)(Future.successful(sleep))
    }

    val (pub, sub) = TestSource.probe[Int].via(flowUnderTest).toMat(TestSink[Int]())(Keep.both).run()

    sub.request(n = 3)
    pub.sendNext(3)
    pub.sendNext(2)
    pub.sendNext(1)
    sub.expectNextUnordered(1, 2, 3)

    pub.sendError(new Exception("Power surge in the linear subroutine C-47!"))
    val ex = sub.expectError()
    assert(ex.getMessage.contains("C-47"))
- Java
    final Flow<Integer, Integer, NotUsed> flowUnderTest =
        Flow.of(Integer.class)
            .mapAsyncUnordered(
                2,
                sleep ->
                    org.apache.pekko.pattern.Patterns.after(
                        Duration.ofMillis(10),
                        system.scheduler(),
                        system.dispatcher(),
                        () -> CompletableFuture.completedFuture(sleep)));

    final Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Integer>> pubAndSub =
        TestSource.<Integer>probe(system)
            .via(flowUnderTest)
            .toMat(TestSink.<Integer>probe(system), Keep.both())
            .run(system);
    final TestPublisher.Probe<Integer> pub = pubAndSub.first();
    final TestSubscriber.Probe<Integer> sub = pubAndSub.second();

    sub.request(3);
    pub.sendNext(3);
    pub.sendNext(2);
    pub.sendNext(1);
    sub.expectNextUnordered(1, 2, 3);

    pub.sendError(new Exception("Power surge in the linear subroutine C-47!"));
    final Throwable ex = sub.expectError();
    assertTrue(ex.getMessage().contains("C-47"));
Fuzzing Mode
For testing, it is possible to enable a special stream execution mode that exercises concurrent execution paths more aggressively (at the cost of reduced performance) and therefore helps expose race conditions in tests. To enable this setting, add the following line to your configuration:
pekko.stream.materializer.debug.fuzzing-mode = on
Never use this setting in production or benchmarks. This is a testing tool to provide more coverage of your code during tests, but it reduces the throughput of streams. A warning message will be logged if you have this setting enabled.
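If you prefer to scope the setting to a single test suite instead of a shared configuration file, one option is to pass it when creating the test ActorSystem. The following is a minimal sketch under that assumption; the system name `fuzzing-test` and the value name `fuzzingEnabled` are illustrative.

```scala
import com.typesafe.config.ConfigFactory
import org.apache.pekko.actor.ActorSystem

// Overlay the fuzzing setting on top of the regular configuration, for tests only.
val testConfig = ConfigFactory
  .parseString("pekko.stream.materializer.debug.fuzzing-mode = on")
  .withFallback(ConfigFactory.load())

val system = ActorSystem("fuzzing-test", testConfig)

// Check that the setting is visible to streams materialized on this system.
val fuzzingEnabled =
  system.settings.config.getBoolean("pekko.stream.materializer.debug.fuzzing-mode")
assert(fuzzingEnabled)

system.terminate()
```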