Asynchronous testing
You are viewing the documentation for the new actor APIs. To view the Pekko Classic documentation, see Classic Testing.
Asynchronous testing uses a real ActorSystem that allows you to test your Actors in a more realistic environment.
The minimal setup consists of the test procedure, which provides the desired stimuli, the actor under test, and an actor receiving replies. Bigger systems replace the actor under test with a network of actors, apply stimuli at varying injection points and arrange results to be sent from different emission points, but the basic principle stays the same in that a single procedure drives the test.
Basic example
Actor under test:
- Scala
-
source
object Echo {
  case class Ping(message: String, response: ActorRef[Pong])
  case class Pong(message: String)

  def apply(): Behavior[Ping] = Behaviors.receiveMessage {
    case Ping(m, replyTo) =>
      replyTo ! Pong(m)
      Behaviors.same
  }
}
- Java
-
source
public static class Echo {
  public static class Ping {
    public final String message;
    public final ActorRef<Pong> replyTo;

    public Ping(String message, ActorRef<Pong> replyTo) {
      this.message = message;
      this.replyTo = replyTo;
    }
  }

  public static class Pong {
    public final String message;

    public Pong(String message) {
      this.message = message;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof Pong)) return false;
      Pong pong = (Pong) o;
      return message.equals(pong.message);
    }

    @Override
    public int hashCode() {
      return Objects.hash(message);
    }
  }

  public static Behavior<Ping> create() {
    return Behaviors.receive(Ping.class)
        .onMessage(
            Ping.class,
            ping -> {
              ping.replyTo.tell(new Pong(ping.message));
              return Behaviors.same();
            })
        .build();
  }
}
Tests create an instance of ActorTestKit. This provides access to:
- An ActorSystem
- Methods for spawning Actors. These are created under the special testkit user guardian
- A method to shut down the ActorSystem from the test suite
This first example uses the “raw” ActorTestKit, but if you are using ScalaTest or JUnit you can simplify the tests by using the Test framework integration. It’s still good to read this section to understand how it works.
- Scala
-
source
import org.apache.pekko.actor.testkit.typed.scaladsl.ActorTestKit
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class AsyncTestingExampleSpec
    extends AnyWordSpec
    with BeforeAndAfterAll
    with Matchers {
  val testKit = ActorTestKit()
}
- Java
-
source
import org.apache.pekko.actor.testkit.typed.javadsl.ActorTestKit;

public class AsyncTestingExampleTest {
  static final ActorTestKit testKit = ActorTestKit.create();
}
Your test is responsible for shutting down the ActorSystem, e.g. using BeforeAndAfterAll when using ScalaTest, or @AfterClass when using JUnit.
- Scala
-
source
override def afterAll(): Unit = testKit.shutdownTestKit()
- Java
-
source
@AfterClass
public static void cleanup() {
  testKit.shutdownTestKit();
}
The following demonstrates:
- Creating an actor from the TestKit’s system using spawn
- Creating a TestProbe
- Verifying that the actor under test responds via the TestProbe
Note that it is possible to use a TestProbe directly as a RecipientRef (a common supertype of ActorRef and Cluster Sharding EntityRef), in cases where a message protocol uses RecipientRef instead of specifying ActorRef or EntityRef.
- Scala
-
source
val pinger = testKit.spawn(Echo(), "ping")
val probe = testKit.createTestProbe[Echo.Pong]()
pinger ! Echo.Ping("hello", probe.ref)
probe.expectMessage(Echo.Pong("hello"))
- Java
-
source
ActorRef<Echo.Ping> pinger = testKit.spawn(Echo.create(), "ping");
TestProbe<Echo.Pong> probe = testKit.createTestProbe();
pinger.tell(new Echo.Ping("hello", probe.ref()));
probe.expectMessage(new Echo.Pong("hello"));
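To make the role of the probe concrete, here is a minimal plain-Java sketch of the idea: a probe is an inbox that the test blocks on until a reply arrives or a timeout elapses. `QueueProbe` is a hypothetical stand-in written for illustration and is not the Pekko TestProbe API. It compares messages with `equals`, which is presumably why the Java `Pong` class above overrides `equals` and `hashCode`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a test probe; NOT the Pekko TestProbe API.
final class QueueProbe<T> {
    private final BlockingQueue<T> inbox = new LinkedBlockingQueue<>();

    // What the component under test calls to deliver a reply.
    void tell(T message) {
        inbox.add(message);
    }

    // Blocks until a message arrives or the timeout elapses, then compares by equals().
    T expectMessage(T expected, long timeoutMillis) {
        final T actual;
        try {
            actual = inbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("interrupted while waiting for " + expected);
        }
        if (actual == null) {
            throw new AssertionError("timeout waiting for " + expected);
        }
        if (!expected.equals(actual)) {
            throw new AssertionError("expected " + expected + " but got " + actual);
        }
        return actual;
    }

    public static void main(String[] args) {
        QueueProbe<String> probe = new QueueProbe<>();
        // The reply arrives asynchronously, as it would from an actor.
        new Thread(() -> probe.tell("hello")).start();
        System.out.println(probe.expectMessage("hello", 1000));
    }
}
```

The test thread never sleeps for a fixed time; it waits only as long as it takes for the reply to arrive, and fails with an assertion error otherwise.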
Actors can also be spawned anonymously:
- Scala
-
source
val pinger = testKit.spawn(Echo())
- Java
-
source
ActorRef<Echo.Ping> pinger = testKit.spawn(Echo.create());
Note that you can add import testKit._ to get access to the spawn and createTestProbe methods at the top level without prefixing them with testKit.
Stopping actors
The stop method of the ActorTestKit waits until the actor has stopped, or throws an assertion error if the actor does not stop within the timeout.
- Scala
-
source
val pinger1 = testKit.spawn(Echo(), "pinger")
pinger1 ! Echo.Ping("hello", probe.ref)
probe.expectMessage(Echo.Pong("hello"))
testKit.stop(pinger1) // Uses default timeout

// Immediately creating an actor with the same name
val pinger2 = testKit.spawn(Echo(), "pinger")
pinger2 ! Echo.Ping("hello", probe.ref)
probe.expectMessage(Echo.Pong("hello"))
testKit.stop(pinger2, 10.seconds) // Custom timeout
- Java
-
source
ActorRef<Echo.Ping> pinger1 = testKit.spawn(Echo.create(), "pinger");
pinger1.tell(new Echo.Ping("hello", probe.ref()));
probe.expectMessage(new Echo.Pong("hello"));
testKit.stop(pinger1);

// Immediately creating an actor with the same name
ActorRef<Echo.Ping> pinger2 = testKit.spawn(Echo.create(), "pinger");
pinger2.tell(new Echo.Ping("hello", probe.ref()));
probe.expectMessage(new Echo.Pong("hello"));
testKit.stop(pinger2, Duration.ofSeconds(10));
The stop method can only be used for actors that were spawned by the same ActorTestKit. Other actors will not be stopped by that method.
Observing mocked behavior
When testing a component (which may or may not be an actor) that interacts with other actors, it can be useful not to have to run the other actors it depends on. Instead, you might want to create mock behaviors that accept and possibly respond to messages in the same way the other actor would, but without executing any actual logic. In addition, it can be useful to observe those interactions to assert that the component under test did send the expected messages. This allows the same kinds of tests as classic TestActor/Autopilot.
As an example, let’s assume we’d like to test the following component:
- Scala
-
source
case class Message(i: Int, replyTo: ActorRef[Try[Int]])

class Producer(publisher: ActorRef[Message])(implicit scheduler: Scheduler) {

  def produce(messages: Int)(implicit timeout: Timeout): Unit = {
    (0 until messages).foreach(publish)
  }

  private def publish(i: Int)(implicit timeout: Timeout): Future[Try[Int]] = {
    publisher.ask(ref => Message(i, ref))
  }

}
- Java
-
source
static class Message {
  int i;
  ActorRef<Integer> replyTo;

  Message(int i, ActorRef<Integer> replyTo) {
    this.i = i;
    this.replyTo = replyTo;
  }
}

public static class Producer {
  private Scheduler scheduler;
  private ActorRef<Message> publisher;

  Producer(Scheduler scheduler, ActorRef<Message> publisher) {
    this.scheduler = scheduler;
    this.publisher = publisher;
  }

  public void produce(int messages) {
    IntStream.range(0, messages).forEach(this::publish);
  }

  private CompletionStage<Integer> publish(int i) {
    return AskPattern.ask(
        publisher,
        (ActorRef<Integer> ref) -> new Message(i, ref),
        Duration.ofSeconds(3),
        scheduler);
  }
}
In our test, we create a mocked publisher actor. Additionally, we use Behaviors.monitor with a TestProbe in order to verify the interaction of the producer with the publisher:
- Scala
-
source
import testKit._

// simulate the happy path
val mockedBehavior = Behaviors.receiveMessage[Message] { msg =>
  msg.replyTo ! Success(msg.i)
  Behaviors.same
}
val probe = testKit.createTestProbe[Message]()
val mockedPublisher = testKit.spawn(Behaviors.monitor(probe.ref, mockedBehavior))

// test our component
val producer = new Producer(mockedPublisher)
val messages = 3
producer.produce(messages)

// verify expected behavior
for (i <- 0 until messages) {
  val msg = probe.expectMessageType[Message]
  msg.i shouldBe i
}
- Java
-
source
// simulate the happy path
Behavior<Message> mockedBehavior =
    Behaviors.receiveMessage(
        message -> {
          message.replyTo.tell(message.i);
          return Behaviors.same();
        });
TestProbe<Message> probe = testKit.createTestProbe();
ActorRef<Message> mockedPublisher =
    testKit.spawn(Behaviors.monitor(Message.class, probe.ref(), mockedBehavior));

// test our component
Producer producer = new Producer(testKit.scheduler(), mockedPublisher);
int messages = 3;
producer.produce(messages);

// verify expected behavior
IntStream.range(0, messages)
    .forEach(
        i -> {
          Message msg = probe.expectMessageClass(Message.class);
          assertEquals(i, msg.i);
        });
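The monitoring pattern used in this test can be sketched without Pekko as a decorator: every message is first copied to an observer and then passed on to the mocked logic. The sketch below is plain Java written for illustration; `MonitorSketch` and its `monitor` method are hypothetical names and not part of the Pekko API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative decorator only; NOT the implementation of Behaviors.monitor.
final class MonitorSketch {
    // Wraps a handler so that every message is recorded before it is handled.
    static <M> Consumer<M> monitor(List<M> observed, Consumer<M> delegate) {
        return message -> {
            observed.add(message);    // copy the message to the observer first
            delegate.accept(message); // then let the mocked logic run
        };
    }

    public static void main(String[] args) {
        List<Integer> observed = new ArrayList<>();
        List<Integer> replies = new ArrayList<>();
        // "Happy path" mock: echo the payload back as the reply.
        Consumer<Integer> mocked = i -> replies.add(i);
        Consumer<Integer> publisher = monitor(observed, mocked);
        for (int i = 0; i < 3; i++) {
            publisher.accept(i);
        }
        System.out.println(observed + " " + replies); // prints "[0, 1, 2] [0, 1, 2]"
    }
}
```

Keeping the observation separate from the mocked logic means the same mock can be reused in tests that do not care about the exact messages sent.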
Test framework integration
If you are using JUnit you can use TestKitJunitResource to have the async test kit automatically shut down when the test is complete.
Note that the dependency on JUnit is marked as optional from the test kit module, so your project must explicitly include a dependency on JUnit to use this.
If you are using ScalaTest you can extend ScalaTestWithActorTestKit to have the async test kit automatically shut down when the test is complete. This is done in afterAll from the BeforeAndAfterAll trait. If you override that method you should call super.afterAll to shut down the test kit.
Note that the dependency on ScalaTest is marked as optional from the test kit module, so your project must explicitly include a dependency on ScalaTest to use this.
- Scala
-
source
import org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.wordspec.AnyWordSpecLike

class ScalaTestIntegrationExampleSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {

  "Something" must {
    "behave correctly" in {
      val pinger = testKit.spawn(Echo(), "ping")
      val probe = testKit.createTestProbe[Echo.Pong]()
      pinger ! Echo.Ping("hello", probe.ref)
      probe.expectMessage(Echo.Pong("hello"))
    }
  }
}
- Java
-
source
import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource;
import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe;
import org.apache.pekko.actor.typed.ActorRef;
import org.junit.ClassRule;
import org.junit.Test;

public class JunitIntegrationExampleTest {

  @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource();

  @Test
  public void testSomething() {
    ActorRef<Echo.Ping> pinger = testKit.spawn(Echo.create(), "ping");
    TestProbe<Echo.Pong> probe = testKit.createTestProbe();
    pinger.tell(new Echo.Ping("hello", probe.ref()));
    probe.expectMessage(new Echo.Pong("hello"));
  }
}
As you may have noticed, ScalaTestWithActorTestKit is an abstract class, which is problematic if you want to treat a given test suite as a value and extend it in multiple ways (for example, you are using testcontainers-scala and want to run the same test against each type of database you support).
If you find yourself in this situation you can instead define your tests within a trait that extends ScalaTestWithActorTestKitBase. Since this is a trait, you can then have different classes which extend it along with ScalaTestWithActorTestKit.
- Scala
-
source
import org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKitBase
import org.scalatest.wordspec.AnyWordSpecLike

trait ExtendTestMultipleTimes extends ScalaTestWithActorTestKitBase with AnyWordSpecLike with LogCapturing {
  "ScalaTestWithActorTestKitBase" must {
    "behave when extended in different ways" in {
      val pinger = testKit.spawn(Echo(), "ping")
      val probe = testKit.createTestProbe[Echo.Pong]()
      val message = this.getClass.getSimpleName
      pinger ! Echo.Ping(message, probe.ref)
      val returnedMessage = probe.expectMessage(Echo.Pong(message))
      returnedMessage.message.contains("ExtendTestMultipleTimes") shouldBe false
    }
  }
}

class TestWithOneImplementation extends ScalaTestWithActorTestKit with ExtendTestMultipleTimes
class TestWithAnotherImplementation extends ScalaTestWithActorTestKit with ExtendTestMultipleTimes
Configuration
By default the ActorTestKit loads configuration from application-test.conf if that exists; otherwise it uses the default configuration from the reference.conf resources that ship with the Pekko libraries. The application.conf of your project is not used in this case. A specific configuration can be given as a parameter when creating the TestKit.
If you prefer to use application.conf you can pass that as the configuration parameter to the TestKit. It’s loaded with:
- Scala
-
source
import com.typesafe.config.ConfigFactory

ConfigFactory.load()
- Java
-
source
import com.typesafe.config.ConfigFactory;

ConfigFactory.load()
It’s often convenient to define configuration for a specific test as a String in the test itself and use that as the configuration parameter to the TestKit. ConfigFactory.parseString can be used for that:
- Scala
-
source
ConfigFactory.parseString("""
  pekko.loglevel = DEBUG
  pekko.log-config-on-start = on
  """)
- Java
-
source
ConfigFactory.parseString("pekko.loglevel = DEBUG \n" + "pekko.log-config-on-start = on \n")
Combining those approaches using withFallback:
- Scala
-
source
ConfigFactory.parseString("""
  pekko.loglevel = DEBUG
  pekko.log-config-on-start = on
  """).withFallback(ConfigFactory.load())
- Java
-
source
ConfigFactory.parseString("pekko.loglevel = DEBUG \n" + "pekko.log-config-on-start = on \n") .withFallback(ConfigFactory.load())
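The precedence that withFallback applies can be pictured with plain maps: values from the configuration you call it on win, and the fallback only supplies keys that are missing. The sketch below is a simplified illustration on flat string maps and is not the configuration library itself, which also merges nested objects. `FallbackSketch` and the sample keys in `main` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified illustration of fallback precedence on flat maps; NOT the config library.
final class FallbackSketch {
    // Keys from `primary` win; `fallback` only supplies keys that `primary` lacks.
    static Map<String, String> withFallback(
            Map<String, String> primary, Map<String, String> fallback) {
        Map<String, String> merged = new LinkedHashMap<>(fallback);
        merged.putAll(primary); // primary entries overwrite fallback entries
        return merged;
    }

    public static void main(String[] args) {
        // Test-specific override, as with parseString in the examples above.
        Map<String, String> testConfig = Map.of("pekko.loglevel", "DEBUG");
        // Stand-in for what ConfigFactory.load() would provide (sample values only).
        Map<String, String> loaded = new LinkedHashMap<>();
        loaded.put("pekko.loglevel", "INFO");
        loaded.put("my-app.feature", "on");
        System.out.println(withFallback(testConfig, loaded));
    }
}
```

In the merged result the test-specific log level is kept, while settings the test did not mention still come from the loaded configuration.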
More information can be found in the documentation of the configuration library.
Note that reference.conf files are intended for libraries to define default values and shouldn’t be used in an application. It’s not supported to override a config property owned by one library in a reference.conf of another library.
Controlling the scheduler
It can be hard to reliably unit test specific scenarios when your actor relies on timing: especially when running many tests in parallel it can be hard to get the timing just right. Making such tests more reliable by using generous timeouts makes the tests take a long time to run.
For such situations, we provide a scheduler where you can manually, explicitly advance the clock.
- Scala
-
source
import scala.concurrent.duration._

import org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.testkit.typed.scaladsl.ManualTime
import pekko.actor.testkit.typed.scaladsl.TestProbe
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.typed.scaladsl.Behaviors
import org.scalatest.wordspec.AnyWordSpecLike

class ManualTimerExampleSpec
    extends ScalaTestWithActorTestKit(ManualTime.config)
    with AnyWordSpecLike
    with LogCapturing {

  val manualTime: ManualTime = ManualTime()

  "A timer" must {
    "schedule non-repeated ticks" in {
      case object Tick
      case object Tock

      val probe = TestProbe[Tock.type]()
      val behavior = Behaviors.withTimers[Tick.type] { timer =>
        timer.startSingleTimer(Tick, 10.millis)
        Behaviors.receiveMessage { _ =>
          probe.ref ! Tock
          Behaviors.same
        }
      }

      spawn(behavior)

      manualTime.expectNoMessageFor(9.millis, probe)

      manualTime.timePasses(2.millis)
      probe.expectMessage(Tock)

      manualTime.expectNoMessageFor(10.seconds, probe)
    }
  }
}
- Java
-
source
import org.apache.pekko.actor.testkit.typed.javadsl.LogCapturing;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.testkit.typed.javadsl.ManualTime;
import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource;
import org.junit.ClassRule;
import org.junit.Rule;
import org.scalatestplus.junit.JUnitSuite;
import java.time.Duration;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.junit.Test;
import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe;

public class ManualTimerExampleTest extends JUnitSuite {

  @ClassRule
  public static final TestKitJunitResource testKit =
      new TestKitJunitResource(ManualTime.config());

  @Rule public final LogCapturing logCapturing = new LogCapturing();

  private final ManualTime manualTime = ManualTime.get(testKit.system());

  static final class Tick {
    private Tick() {}

    static final Tick INSTANCE = new Tick();
  }

  static final class Tock {}

  @Test
  public void testScheduleNonRepeatedTicks() {
    TestProbe<Tock> probe = testKit.createTestProbe();
    Behavior<Tick> behavior =
        Behaviors.withTimers(
            timer -> {
              timer.startSingleTimer(Tick.INSTANCE, Duration.ofMillis(10));
              return Behaviors.receiveMessage(
                  tick -> {
                    probe.ref().tell(new Tock());
                    return Behaviors.same();
                  });
            });

    testKit.spawn(behavior);

    manualTime.expectNoMessageFor(Duration.ofMillis(9), probe);

    manualTime.timePasses(Duration.ofMillis(2));
    probe.expectMessageClass(Tock.class);

    manualTime.expectNoMessageFor(Duration.ofSeconds(10), probe);
  }
}
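The idea of an explicitly advanced clock can be illustrated with a small plain-Java sketch: scheduled tasks are kept in a list and only run when the test moves virtual time forward. `ManualClock` is a hypothetical class written for this illustration and is not how Pekko's ManualTime is implemented. The `main` method mirrors the 10 ms timer and the 9 ms and 2 ms steps used in the test above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical virtual-time scheduler; NOT Pekko's ManualTime implementation.
final class ManualClock {
    private static final class Task {
        final long dueAtMillis;
        final Runnable action;

        Task(long dueAtMillis, Runnable action) {
            this.dueAtMillis = dueAtMillis;
            this.action = action;
        }
    }

    private final List<Task> pending = new ArrayList<>();
    private long nowMillis = 0;

    // Registers a one-off task relative to the current virtual time.
    void scheduleOnce(long delayMillis, Runnable action) {
        pending.add(new Task(nowMillis + delayMillis, action));
    }

    // Advances virtual time and runs every task that has become due, earliest first.
    // Tasks scheduled while running are only considered on the next call.
    void timePasses(long millis) {
        nowMillis += millis;
        List<Task> due = new ArrayList<>();
        for (Task task : pending) {
            if (task.dueAtMillis <= nowMillis) {
                due.add(task);
            }
        }
        pending.removeAll(due);
        due.sort(Comparator.comparingLong(task -> task.dueAtMillis));
        for (Task task : due) {
            task.action.run();
        }
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        List<String> received = new ArrayList<>();
        clock.scheduleOnce(10, () -> received.add("Tock"));
        clock.timePasses(9);
        System.out.println(received); // prints "[]": the timer is not due yet
        clock.timePasses(2);
        System.out.println(received); // prints "[Tock]": 11 ms of virtual time have passed
    }
}
```

Because no real time passes, such a test finishes immediately and its outcome does not depend on how busy the machine is.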
Test of logging
To verify that certain logging events are emitted there is a utility called LoggingTestKit. You define a criterion for the expected logging events and it will assert that the given number of occurrences of matching logging events are emitted within a block of code.
The LoggingTestKit implementation requires the Logback dependency.
For example, a criterion that verifies that an INFO level event with a message containing “Received message” is logged:
- Scala
-
source
import org.apache.pekko.actor.testkit.typed.scaladsl.LoggingTestKit

// implicit ActorSystem is needed, but that is given by ScalaTestWithActorTestKit
// implicit val system: ActorSystem[_]

LoggingTestKit.info("Received message").expect {
  ref ! Message("hello")
}
- Java
-
source
import org.apache.pekko.actor.testkit.typed.javadsl.LoggingTestKit;

LoggingTestKit.info("Received message")
    .expect(
        system,
        () -> {
          ref.tell(new Message("hello"));
          return null;
        });
More advanced criteria can be built by chaining conditions that all must be satisfied for a matching event.
- Scala
-
source
LoggingTestKit
  .error[IllegalArgumentException]
  .withMessageRegex(".*was rejected.*expecting ascii input.*")
  .withCustom { event =>
    event.marker match {
      case Some(m) => m.getName == "validation"
      case None    => false
    }
  }
  .withOccurrences(2)
  .expect {
    ref ! Message("hellö")
    ref ! Message("hejdå")
  }
- Java
-
source
LoggingTestKit.error(IllegalArgumentException.class)
    .withMessageRegex(".*was rejected.*expecting ascii input.*")
    .withCustom(
        event ->
            event.getMarker().isPresent()
                && event.getMarker().get().getName().equals("validation"))
    .withOccurrences(2)
    .expect(
        system,
        () -> {
          ref.tell(new Message("hellö"));
          ref.tell(new Message("hejdå"));
          return null;
        });
See LoggingTestKit for more details.
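The "criterion plus expected number of occurrences within a block" idea can be sketched in plain Java as follows. This is a simplified, synchronous illustration under the assumption that log lines are plain strings collected in a list; `LogExpectation` is a hypothetical class and not the LoggingTestKit API, which works on real logging events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, synchronous illustration; NOT the LoggingTestKit API.
final class LogExpectation {
    private final Predicate<String> criterion;
    private final int occurrences;

    LogExpectation(Predicate<String> criterion, int occurrences) {
        this.criterion = criterion;
        this.occurrences = occurrences;
    }

    // Runs `block`, then checks how many lines added to `sink` during the block match.
    void expect(List<String> sink, Runnable block) {
        int before = sink.size();
        block.run();
        long matched = sink.subList(before, sink.size()).stream().filter(criterion).count();
        if (matched != occurrences) {
            throw new AssertionError(
                "expected " + occurrences + " matching events but found " + matched);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        new LogExpectation(line -> line.contains("Received message"), 1)
            .expect(log, () -> {
                log.add("INFO Received message hello");
                log.add("DEBUG unrelated event");
            });
        System.out.println("criterion satisfied");
    }
}
```

Only events logged inside the block are counted, so earlier log output from the same test does not affect the assertion.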
Silence logging output from tests
When running tests, it’s typically preferred to have the output go to standard out, together with the output from the testing framework (ScalaTest or JUnit). On one hand you want the output to be clean without logging noise, but on the other hand you want as much information as possible if there is a test failure (for example in CI builds).
The Pekko TestKit provides a LogCapturing utility to support this with ScalaTest or JUnit. It will buffer log events instead of emitting them to the ConsoleAppender immediately (or whatever Logback appender is configured). When there is a test failure the buffered events are flushed to the target appenders, typically a ConsoleAppender.
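The buffer-then-flush-on-failure behavior described here can be illustrated with a few lines of plain Java. `BufferingLog` is a hypothetical class for this sketch and is not how LogCapturing or the CapturingAppender are implemented; it only shows the control flow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of "capture, flush only on failure"; NOT Pekko code.
final class BufferingLog {
    private final List<String> buffer = new ArrayList<>();
    private final Consumer<String> target;

    BufferingLog(Consumer<String> target) {
        this.target = target;
    }

    // Events are held back instead of being written to the target immediately.
    void log(String event) {
        buffer.add(event);
    }

    // Runs one test; captured events reach the target only if the test throws.
    void runTest(Runnable test) {
        buffer.clear();
        try {
            test.run();
        } catch (RuntimeException | Error failure) {
            buffer.forEach(target); // flush everything captured during the failed test
            throw failure;
        } finally {
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        BufferingLog log = new BufferingLog(System.out::println);
        log.runTest(() -> log.log("silenced because the test passes"));
        try {
            log.runTest(() -> {
                log.log("shown because the test fails");
                throw new IllegalStateException("boom");
            });
        } catch (IllegalStateException expected) {
            // The failure is still reported to the caller after the flush.
        }
    }
}
```

A passing test therefore produces no log output at all, while a failing test shows the complete log of that test.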
The LogCapturing utility requires the Logback dependency.
Mix the LogCapturing trait into the ScalaTest suite, or add a LogCapturing @Rule in the JUnit test, like this:
- Scala
-
source
import org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.wordspec.AnyWordSpecLike

class LogCapturingExampleSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing {

  "Something" must {
    "behave correctly" in {
      val pinger = testKit.spawn(Echo(), "ping")
      val probe = testKit.createTestProbe[Echo.Pong]()
      pinger ! Echo.Ping("hello", probe.ref)
      probe.expectMessage(Echo.Pong("hello"))
    }
  }
}
- Java
-
source
import org.apache.pekko.actor.testkit.typed.javadsl.LogCapturing;
import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource;
import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe;
import org.apache.pekko.actor.typed.ActorRef;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

public class LogCapturingExampleTest {

  @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource();

  @Rule public final LogCapturing logCapturing = new LogCapturing();

  @Test
  public void testSomething() {
    ActorRef<Echo.Ping> pinger = testKit.spawn(Echo.create(), "ping");
    TestProbe<Echo.Pong> probe = testKit.createTestProbe();
    pinger.tell(new Echo.Ping("hello", probe.ref()));
    probe.expectMessage(new Echo.Pong("hello"));
  }
}
Then you also need to configure the CapturingAppender and CapturingAppenderDelegate in src/test/resources/logback-test.xml:
source
<?xml version="1.0" encoding="UTF-8"?>
<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>INFO</level>
        </filter>
        <encoder>
            <pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg MDC: {%mdc}%n</pattern>
        </encoder>
    </appender>

    <!--
    Logging from tests is silenced by this appender. When there is a test failure
    the captured logging events are flushed to the appenders defined for the
    org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate logger.
    -->
    <appender name="CapturingAppender" class="org.apache.pekko.actor.testkit.typed.internal.CapturingAppender" />

    <!--
    The appenders defined for this CapturingAppenderDelegate logger are used
    when there is a test failure and all logging events from the test are
    flushed to these appenders.
    -->
    <logger name="org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate" >
        <appender-ref ref="STDOUT"/>
    </logger>

    <root level="DEBUG">
        <appender-ref ref="CapturingAppender"/>
    </root>
</configuration>