Asynchronous testing¶
You are viewing the documentation for the new actor APIs. To view the Pekko Classic documentation, see Classic Testing.
Asynchronous testing uses a real ActorSystem that allows you to test your Actors in a more realistic environment.
The minimal setup consists of the test procedure, which provides the desired stimuli, the actor under test, and an actor receiving replies. Bigger systems replace the actor under test with a network of actors, apply stimuli at varying injection points and arrange results to be sent from different emission points, but the basic principle stays the same in that a single procedure drives the test.
Basic example¶
Actor under test:
sourceobject Echo {
case class Ping(message: String, response: ActorRef[Pong])
case class Pong(message: String)
def apply(): Behavior[Ping] = Behaviors.receiveMessage {
case Ping(m, replyTo) =>
replyTo ! Pong(m)
Behaviors.same
}
}
sourcepublic static class Echo {
public static class Ping {
public final String message;
public final ActorRef<Pong> replyTo;
public Ping(String message, ActorRef<Pong> replyTo) {
this.message = message;
this.replyTo = replyTo;
}
}
public static class Pong {
public final String message;
public Pong(String message) {
this.message = message;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Pong)) return false;
Pong pong = (Pong) o;
return message.equals(pong.message);
}
@Override
public int hashCode() {
return Objects.hash(message);
}
}
public static Behavior<Ping> create() {
return Behaviors.receive(Ping.class)
.onMessage(
Ping.class,
ping -> {
ping.replyTo.tell(new Pong(ping.message));
return Behaviors.same();
})
.build();
}
}
Tests create an instance of ActorTestKit. This provides access to:
- An ActorSystem
- Methods for spawning Actors. These are created under the special testkit user guardian
- A method to shut down the ActorSystem from the test suite
This first example uses the “raw” ActorTestKit, but if you are using ScalaTest you can simplify the tests by using the Test framework integration. It’s still good to read this section to understand how it works.
sourceimport org.apache.pekko.actor.testkit.typed.scaladsl.ActorTestKit
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
class AsyncTestingExampleSpec
extends AnyWordSpec
with BeforeAndAfterAll
with Matchers {
val testKit = ActorTestKit()
}
sourceimport org.apache.pekko.actor.testkit.typed.javadsl.ActorTestKit;
public class AsyncTestingExampleTest
{
static final ActorTestKit testKit = ActorTestKit.create();
}
Your test is responsible for shutting down the ActorSystem, e.g. using BeforeAndAfterAll when using ScalaTest or an @AfterClass method when using JUnit.
sourceoverride def afterAll(): Unit = testKit.shutdownTestKit()
source@AfterClass
public static void cleanup() {
testKit.shutdownTestKit();
}
The following demonstrates:
- Creating an actor from the TestKit’s system using spawn
- Creating a TestProbe
- Verifying that the actor under test responds via the TestProbe
Note that it is possible to use a TestProbe directly as a RecipientRef (a common supertype of ActorRef and Cluster Sharding EntityRef), in cases where a message protocol uses RecipientRef instead of specifying ActorRef or EntityRef.
sourceval pinger = testKit.spawn(Echo(), "ping")
val probe = testKit.createTestProbe[Echo.Pong]()
pinger ! Echo.Ping("hello", probe.ref)
probe.expectMessage(Echo.Pong("hello"))
sourceActorRef<Echo.Ping> pinger = testKit.spawn(Echo.create(), "ping");
TestProbe<Echo.Pong> probe = testKit.createTestProbe();
pinger.tell(new Echo.Ping("hello", probe.ref()));
probe.expectMessage(new Echo.Pong("hello"));
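To make the role of the probe concrete, here is a minimal sketch in plain Java of what a probe does conceptually: it collects messages that arrive asynchronously and lets the test thread block, up to a timeout, until the expected message shows up. ToyProbe and ToyProbeDemo are hypothetical names used only for illustration. This is not how Pekko's TestProbe is implemented, and the sketch needs no Pekko dependency.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a test probe; not part of Pekko.
final class ToyProbe<T> {
  private final BlockingQueue<T> inbox = new LinkedBlockingQueue<>();

  // Called by the code under test, possibly from another thread.
  void tell(T message) {
    inbox.add(message);
  }

  // Blocks until a message arrives or the timeout elapses, then compares it.
  T expectMessage(T expected, Duration timeout) {
    T received;
    try {
      received = inbox.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new AssertionError("interrupted while waiting for " + expected);
    }
    if (received == null) {
      throw new AssertionError("timeout (" + timeout + ") waiting for " + expected);
    }
    if (!received.equals(expected)) {
      throw new AssertionError("expected " + expected + " but got " + received);
    }
    return received;
  }
}

final class ToyProbeDemo {
  public static void main(String[] args) throws InterruptedException {
    ToyProbe<String> probe = new ToyProbe<>();
    // The "actor under test": replies asynchronously on its own thread.
    Thread echo = new Thread(() -> probe.tell("hello"));
    echo.start();
    // The test procedure waits for the reply instead of sleeping.
    probe.expectMessage("hello", Duration.ofSeconds(3));
    echo.join();
  }
}
```

The timeout is what keeps an asynchronous test from hanging forever when the reply never arrives: the expectation fails with an assertion error instead.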
Actors can also be spawned anonymously:
sourceval pinger = testKit.spawn(Echo())
sourceActorRef<Echo.Ping> pinger = testKit.spawn(Echo.create());
Note that you can add import testKit._ to get access to the spawn and createTestProbe methods at the top level without prefixing them with testKit.
Stopping actors¶
The stop method of the ActorTestKit waits until the actor stops, or throws an assertion error if the timeout expires first.
sourceval pinger1 = testKit.spawn(Echo(), "pinger")
pinger1 ! Echo.Ping("hello", probe.ref)
probe.expectMessage(Echo.Pong("hello"))
testKit.stop(pinger1) // Uses default timeout
// Immediately creating an actor with the same name
val pinger2 = testKit.spawn(Echo(), "pinger")
pinger2 ! Echo.Ping("hello", probe.ref)
probe.expectMessage(Echo.Pong("hello"))
testKit.stop(pinger2, 10.seconds) // Custom timeout
sourceActorRef<Echo.Ping> pinger1 = testKit.spawn(Echo.create(), "pinger");
pinger1.tell(new Echo.Ping("hello", probe.ref()));
probe.expectMessage(new Echo.Pong("hello"));
testKit.stop(pinger1);
// Immediately creating an actor with the same name
ActorRef<Echo.Ping> pinger2 = testKit.spawn(Echo.create(), "pinger");
pinger2.tell(new Echo.Ping("hello", probe.ref()));
probe.expectMessage(new Echo.Pong("hello"));
testKit.stop(pinger2, Duration.ofSeconds(10));
The stop method can only be used for actors that were spawned by the same ActorTestKit. Other actors will not be stopped by that method.
Observing mocked behavior¶
When testing a component (which may or may not be an actor) that interacts with other actors, it can be useful not to have to run the other actors it depends on. Instead, you can create mock behaviors that accept and possibly respond to messages in the same way the other actor would, but without executing any actual logic. It can also be useful to observe those interactions to assert that the component under test sent the expected messages. This allows the same kinds of tests as classic TestActor/Autopilot.
As an example, let’s assume we’d like to test the following component:
sourcecase class Message(i: Int, replyTo: ActorRef[Try[Int]])
class Producer(publisher: ActorRef[Message])(implicit scheduler: Scheduler) {
def produce(messages: Int)(implicit timeout: Timeout): Unit = {
(0 until messages).foreach(publish)
}
private def publish(i: Int)(implicit timeout: Timeout): Future[Try[Int]] = {
publisher.ask(ref => Message(i, ref))
}
}
source
static class Message {
int i;
ActorRef<Integer> replyTo;
Message(int i, ActorRef<Integer> replyTo) {
this.i = i;
this.replyTo = replyTo;
}
}
public static class Producer {
private Scheduler scheduler;
private ActorRef<Message> publisher;
Producer(Scheduler scheduler, ActorRef<Message> publisher) {
this.scheduler = scheduler;
this.publisher = publisher;
}
public void produce(int messages) {
IntStream.range(0, messages).forEach(this::publish);
}
private CompletionStage<Integer> publish(int i) {
return AskPattern.ask(
publisher,
(ActorRef<Integer> ref) -> new Message(i, ref),
Duration.ofSeconds(3),
scheduler);
}
}
In our test, we create a mocked publisher actor. Additionally we use Behaviors.monitor with a TestProbe in order to be able to verify the interaction of the producer with the publisher:
sourceimport testKit._
// simulate the happy path
val mockedBehavior = Behaviors.receiveMessage[Message] { msg =>
msg.replyTo ! Success(msg.i)
Behaviors.same
}
val probe = testKit.createTestProbe[Message]()
val mockedPublisher = testKit.spawn(Behaviors.monitor(probe.ref, mockedBehavior))
// test our component
val producer = new Producer(mockedPublisher)
val messages = 3
producer.produce(messages)
// verify expected behavior
for (i <- 0 until messages) {
val msg = probe.expectMessageType[Message]
msg.i shouldBe i
}
source// simulate the happy path
Behavior<Message> mockedBehavior =
Behaviors.receiveMessage(
message -> {
message.replyTo.tell(message.i);
return Behaviors.same();
});
TestProbe<Message> probe = testKit.createTestProbe();
ActorRef<Message> mockedPublisher =
testKit.spawn(Behaviors.monitor(Message.class, probe.ref(), mockedBehavior));
// test our component
Producer producer = new Producer(testKit.scheduler(), mockedPublisher);
int messages = 3;
producer.produce(messages);
// verify expected behavior
IntStream.range(0, messages)
.forEach(
i -> {
Message msg = probe.expectMessageClass(Message.class);
assertEquals(i, msg.i);
});
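The idea behind monitoring can be sketched without any actors: a monitored handler passes every incoming message to an observer first and then to the wrapped logic. The sketch below uses plain Java functional interfaces. ToyMonitor and its monitor method are hypothetical illustrations, not the Behaviors.monitor implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of the monitoring idea; not part of Pekko.
final class ToyMonitor {
  // Returns a handler that copies each message to `observer` before
  // handing it to the `wrapped` handler.
  static <T> Consumer<T> monitor(Consumer<T> observer, Consumer<T> wrapped) {
    return message -> {
      observer.accept(message);
      wrapped.accept(message);
    };
  }

  public static void main(String[] args) {
    List<Integer> observed = new ArrayList<>();
    List<Integer> replies = new ArrayList<>();
    // The mocked logic simply echoes the number back as a "reply".
    Consumer<Integer> mocked = i -> replies.add(i);
    Consumer<Integer> monitored = ToyMonitor.<Integer>monitor(observed::add, mocked);
    for (int i = 0; i < 3; i++) {
      monitored.accept(i);
    }
    System.out.println(observed); // prints [0, 1, 2]
  }
}
```

Because the observer sees the messages independently of what the mocked logic does with them, the test can assert on the interaction without inspecting the mock itself.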
Test framework integration¶
If you are using ScalaTest you can extend ScalaTestWithActorTestKit to have the async test kit automatically shut down when the test is complete. This is done in afterAll from the BeforeAndAfterAll trait. If you override that method you should call super.afterAll to shut down the test kit.
Note that the dependency on ScalaTest is marked as optional from the test kit module, so your project must explicitly include a dependency on ScalaTest to use this.
sourceimport org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.wordspec.AnyWordSpecLike
class ScalaTestIntegrationExampleSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {
"Something" must {
"behave correctly" in {
val pinger = testKit.spawn(Echo(), "ping")
val probe = testKit.createTestProbe[Echo.Pong]()
pinger ! Echo.Ping("hello", probe.ref)
probe.expectMessage(Echo.Pong("hello"))
}
}
}
sourceimport org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource;
import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe;
import org.apache.pekko.actor.typed.ActorRef;
import org.junit.ClassRule;
import org.junit.Test;
public class JunitIntegrationExampleTest {
@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource();
@Test
public void testSomething() {
ActorRef<Echo.Ping> pinger = testKit.spawn(Echo.create(), "ping");
TestProbe<Echo.Pong> probe = testKit.createTestProbe();
pinger.tell(new Echo.Ping("hello", probe.ref()));
probe.expectMessage(new Echo.Pong("hello"));
}
}
As you may have noticed, ScalaTestWithActorTestKit is an abstract class, which is problematic if you want to treat a given test suite as a value and extend it in multiple ways (for example, you are using testcontainers-scala and want to run the same test against each type of database you support).
If you find yourself in this situation you can instead define your tests within a trait that extends ScalaTestWithActorTestKitBase. Since this is a trait, you can then have different classes that extend it along with ScalaTestWithActorTestKit.
sourceimport org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKitBase
import org.scalatest.wordspec.AnyWordSpecLike
trait ExtendTestMultipleTimes extends ScalaTestWithActorTestKitBase with AnyWordSpecLike with LogCapturing {
"ScalaTestWithActorTestKitBase" must {
"behave when extended in different ways" in {
val pinger = testKit.spawn(Echo(), "ping")
val probe = testKit.createTestProbe[Echo.Pong]()
val message = this.getClass.getSimpleName
pinger ! Echo.Ping(message, probe.ref)
val returnedMessage = probe.expectMessage(Echo.Pong(message))
returnedMessage.message.contains("ExtendTestMultipleTimes") shouldBe false
}
}
}
class TestWithOneImplementation extends ScalaTestWithActorTestKit with ExtendTestMultipleTimes
class TestWithAnotherImplementation extends ScalaTestWithActorTestKit with ExtendTestMultipleTimes
Configuration¶
By default the ActorTestKit loads configuration from application-test.conf if that exists; otherwise it uses the default configuration from the reference.conf resources that ship with the Pekko libraries. The application.conf of your project is not used in this case. A specific configuration can be given as a parameter when creating the TestKit.
If you prefer to use application.conf you can pass that as the configuration parameter to the TestKit. It’s loaded with:
sourceimport com.typesafe.config.ConfigFactory
ConfigFactory.load()
sourceimport com.typesafe.config.ConfigFactory;
ConfigFactory.load()
It’s often convenient to define configuration for a specific test as a String in the test itself and use that as the configuration parameter to the TestKit. ConfigFactory.parseString can be used for that:
sourceConfigFactory.parseString("""
pekko.loglevel = DEBUG
pekko.log-config-on-start = on
""")
sourceConfigFactory.parseString("pekko.loglevel = DEBUG \n" + "pekko.log-config-on-start = on \n")
The two approaches can be combined using withFallback:
sourceConfigFactory.parseString("""
pekko.loglevel = DEBUG
pekko.log-config-on-start = on
""").withFallback(ConfigFactory.load())
sourceConfigFactory.parseString("pekko.loglevel = DEBUG \n" + "pekko.log-config-on-start = on \n")
.withFallback(ConfigFactory.load())
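The precedence rule behind withFallback is that a setting defined in the receiver wins, and the fallback only supplies settings the receiver lacks. The sketch below models that rule with flat string maps in plain Java. ToyConfig is a hypothetical class, not the Typesafe Config API, and the key my-app.timeout is made up for the example. The real library also merges nested objects, which this sketch ignores.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical flat key/value model of a configuration; not the Typesafe Config API.
final class ToyConfig {
  private final Map<String, String> values;

  ToyConfig(Map<String, String> values) {
    this.values = new HashMap<>(values);
  }

  // Keys present in this config win; missing keys are taken from the fallback.
  ToyConfig withFallback(ToyConfig fallback) {
    Map<String, String> merged = new HashMap<>(fallback.values);
    merged.putAll(values);
    return new ToyConfig(merged);
  }

  Optional<String> get(String path) {
    return Optional.ofNullable(values.get(path));
  }

  public static void main(String[] args) {
    // Settings written inline in the test.
    ToyConfig testSpecific = new ToyConfig(Map.of("pekko.loglevel", "DEBUG"));
    // Settings that would otherwise come from the application's own file.
    ToyConfig application =
        new ToyConfig(Map.of("pekko.loglevel", "INFO", "my-app.timeout", "5s"));

    ToyConfig combined = testSpecific.withFallback(application);
    System.out.println(combined.get("pekko.loglevel")); // prints Optional[DEBUG]
    System.out.println(combined.get("my-app.timeout")); // prints Optional[5s]
  }
}
```

Putting the test-specific settings on the left of withFallback is what lets a test override one or two values while still inheriting everything else.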
More information can be found in the documentation of the configuration library.
Note that reference.conf files are intended for libraries to define default values and shouldn’t be used in an application. It’s not supported to override a config property owned by one library in a reference.conf of another library.
Controlling the scheduler¶
It can be hard to reliably unit test specific scenarios when your actor relies on timing: especially when running many tests in parallel it can be hard to get the timing just right. Making such tests more reliable by using generous timeouts makes the tests take a long time to run.
For such situations, we provide a scheduler where you can manually, explicitly advance the clock.
sourceimport scala.concurrent.duration._
import org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.testkit.typed.scaladsl.ManualTime
import pekko.actor.testkit.typed.scaladsl.TestProbe
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.typed.scaladsl.Behaviors
import org.scalatest.wordspec.AnyWordSpecLike
class ManualTimerExampleSpec
extends ScalaTestWithActorTestKit(ManualTime.config)
with AnyWordSpecLike
with LogCapturing {
val manualTime: ManualTime = ManualTime()
"A timer" must {
"schedule non-repeated ticks" in {
case object Tick
case object Tock
val probe = TestProbe[Tock.type]()
val behavior = Behaviors.withTimers[Tick.type] { timer =>
timer.startSingleTimer(Tick, 10.millis)
Behaviors.receiveMessage { _ =>
probe.ref ! Tock
Behaviors.same
}
}
spawn(behavior)
manualTime.expectNoMessageFor(9.millis, probe)
manualTime.timePasses(2.millis)
probe.expectMessage(Tock)
manualTime.expectNoMessageFor(10.seconds, probe)
}
}
}
source
import org.apache.pekko.actor.testkit.typed.javadsl.LogCapturing;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.testkit.typed.javadsl.ManualTime;
import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource;
import org.junit.ClassRule;
import org.junit.Rule;
import org.scalatestplus.junit.JUnitSuite;
import java.time.Duration;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.junit.Test;
import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe;
public class ManualTimerExampleTest extends JUnitSuite {
@ClassRule
public static final TestKitJunitResource testKit = new TestKitJunitResource(ManualTime.config());
@Rule public final LogCapturing logCapturing = new LogCapturing();
private final ManualTime manualTime = ManualTime.get(testKit.system());
static final class Tick {
private Tick() {}
static final Tick INSTANCE = new Tick();
}
static final class Tock {}
@Test
public void testScheduleNonRepeatedTicks() {
TestProbe<Tock> probe = testKit.createTestProbe();
Behavior<Tick> behavior =
Behaviors.withTimers(
timer -> {
timer.startSingleTimer(Tick.INSTANCE, Duration.ofMillis(10));
return Behaviors.receiveMessage(
tick -> {
probe.ref().tell(new Tock());
return Behaviors.same();
});
});
testKit.spawn(behavior);
manualTime.expectNoMessageFor(Duration.ofMillis(9), probe);
manualTime.timePasses(Duration.ofMillis(2));
probe.expectMessageClass(Tock.class);
manualTime.expectNoMessageFor(Duration.ofSeconds(10), probe);
}
}
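To see why a manually advanced clock makes timing tests deterministic, the following toy scheduler in plain Java keeps a virtual "now" and only runs tasks when the test tells it that time has passed. ToyManualScheduler, scheduleOnce and the demo are hypothetical and only illustrate the idea. This is not Pekko's ManualTime implementation.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical toy scheduler driven by a virtual clock; not Pekko's ManualTime.
final class ToyManualScheduler {
  private static final class Task {
    final long dueAtMillis;
    final Runnable action;

    Task(long dueAtMillis, Runnable action) {
      this.dueAtMillis = dueAtMillis;
      this.action = action;
    }
  }

  // Tasks ordered by the virtual time at which they become due.
  private final PriorityQueue<Task> tasks =
      new PriorityQueue<>(Comparator.comparingLong((Task t) -> t.dueAtMillis));
  private long nowMillis = 0;

  void scheduleOnce(Duration delay, Runnable action) {
    tasks.add(new Task(nowMillis + delay.toMillis(), action));
  }

  // Advances the virtual clock and runs every task that has become due.
  void timePasses(Duration amount) {
    nowMillis += amount.toMillis();
    while (!tasks.isEmpty() && tasks.peek().dueAtMillis <= nowMillis) {
      tasks.poll().action.run();
    }
  }

  public static void main(String[] args) {
    ToyManualScheduler scheduler = new ToyManualScheduler();
    List<String> received = new ArrayList<>();
    scheduler.scheduleOnce(Duration.ofMillis(10), () -> received.add("Tock"));

    scheduler.timePasses(Duration.ofMillis(9));
    System.out.println(received.isEmpty()); // prints true: 9 ms is not enough

    scheduler.timePasses(Duration.ofMillis(2));
    System.out.println(received); // prints [Tock]: 11 ms have now passed
  }
}
```

No real time elapses in this sketch, so even a ten-second wait in virtual time returns immediately, and the outcome does not depend on how busy the machine is.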
Test of logging¶
To verify that certain logging events are emitted there is a utility called LoggingTestKit. You define criteria for the expected logging events and it will assert that the given number of occurrences of matching logging events are emitted within a block of code.
The LoggingTestKit implementation requires the Logback dependency.
For example, a criterion that verifies that an INFO level event with a message containing “Received message” is logged:
sourceimport org.apache.pekko.actor.testkit.typed.scaladsl.LoggingTestKit
// implicit ActorSystem is needed, but that is given by ScalaTestWithActorTestKit
// implicit val system: ActorSystem[_]
LoggingTestKit.info("Received message").expect {
ref ! Message("hello")
}
sourceimport org.apache.pekko.actor.testkit.typed.javadsl.LoggingTestKit;
LoggingTestKit.info("Received message")
.expect(
system,
() -> {
ref.tell(new Message("hello"));
return null;
});
More advanced criteria can be built by chaining conditions that all must be satisfied for a matching event.
sourceLoggingTestKit
.error[IllegalArgumentException]
.withMessageRegex(".*was rejected.*expecting ascii input.*")
.withCustom { event =>
event.marker match {
case Some(m) => m.getName == "validation"
case None => false
}
}
.withOccurrences(2)
.expect {
ref ! Message("hellö")
ref ! Message("hejdå")
}
sourceLoggingTestKit.error(IllegalArgumentException.class)
.withMessageRegex(".*was rejected.*expecting ascii input.*")
.withCustom(
event ->
event.getMarker().isPresent()
&& event.getMarker().get().getName().equals("validation"))
.withOccurrences(2)
.expect(
system,
() -> {
ref.tell(new Message("hellö"));
ref.tell(new Message("hejdå"));
return null;
});
See LoggingTestKit for more details.
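The underlying pattern, counting matching log events while a block of code runs and then asserting on the count, can be sketched with the JDK's own java.util.logging instead of Logback. ToyLogExpectation and expectInfo are hypothetical names, and this is a different, much simpler technique than LoggingTestKit: it checks synchronously and does not wait for events that are logged asynchronously.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical helper built on java.util.logging; not Pekko's LoggingTestKit.
final class ToyLogExpectation {
  // Runs `block` and fails unless exactly `occurrences` INFO records whose
  // message contains `fragment` were logged to `logger` while it ran.
  static void expectInfo(Logger logger, String fragment, int occurrences, Runnable block) {
    AtomicInteger matches = new AtomicInteger();
    Handler counter =
        new Handler() {
          @Override
          public void publish(LogRecord record) {
            String message = record.getMessage();
            if (Level.INFO.equals(record.getLevel())
                && message != null
                && message.contains(fragment)) {
              matches.incrementAndGet();
            }
          }

          @Override
          public void flush() {}

          @Override
          public void close() {}
        };
    logger.addHandler(counter);
    try {
      block.run();
    } finally {
      // Always detach the handler so later tests are not affected.
      logger.removeHandler(counter);
    }
    if (matches.get() != occurrences) {
      throw new AssertionError(
          "expected " + occurrences + " matching events but saw " + matches.get());
    }
  }

  public static void main(String[] args) {
    Logger logger = Logger.getLogger("toy.example");
    logger.setUseParentHandlers(false); // keep the demo output quiet
    expectInfo(logger, "Received message", 1, () -> logger.info("Received message hello"));
  }
}
```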
Silence logging output from tests¶
When running tests, it’s typically preferred to send log output to standard out, together with the output from the testing framework (ScalaTest or JUnit). On the one hand you want the output to be clean, without logging noise; on the other hand you want as much information as possible when a test fails (for example in CI builds).
The Pekko TestKit provides a LogCapturing utility to support this with ScalaTest or JUnit. It will buffer log events instead of emitting them to the ConsoleAppender immediately (or whatever Logback appender that is configured). When there is a test failure the buffered events are flushed to the target appenders, typically a ConsoleAppender.
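The buffer-then-flush behavior described above can be illustrated with a small plain-Java sketch: events are held in memory while a test runs, discarded if it passes, and forwarded to the real output only if it fails. ToyLogCapture and its methods are hypothetical names for illustration, not Pekko's LogCapturing or CapturingAppender.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical buffer-then-flush helper; not Pekko's LogCapturing.
final class ToyLogCapture {
  private final List<String> buffer = new ArrayList<>();
  private final Consumer<String> target;

  // `target` plays the role of the real appender, e.g. the console.
  ToyLogCapture(Consumer<String> target) {
    this.target = target;
  }

  // Log events are buffered instead of being emitted immediately.
  void log(String event) {
    buffer.add(event);
  }

  // Runs one test: drops the buffered events on success, flushes them on failure.
  void runTest(Runnable test) {
    try {
      test.run();
    } catch (RuntimeException | Error failure) {
      buffer.forEach(target);
      throw failure;
    } finally {
      buffer.clear();
    }
  }

  public static void main(String[] args) {
    ToyLogCapture capture = new ToyLogCapture(System.out::println);
    // A passing test: nothing is printed.
    capture.runTest(() -> capture.log("debug details nobody needs"));
    // A failing test: the buffered event is printed before the failure propagates.
    try {
      capture.runTest(
          () -> {
            capture.log("state just before the failure");
            throw new IllegalStateException("boom");
          });
    } catch (IllegalStateException expected) {
      // The failure still reaches the caller, as it would reach the test framework.
    }
  }
}
```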
The LogCapturing utility requires the Logback dependency.
Mix the LogCapturing trait into your ScalaTest suite, or add a LogCapturing rule to your JUnit test, like this:
sourceimport org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.wordspec.AnyWordSpecLike
class LogCapturingExampleSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing {
"Something" must {
"behave correctly" in {
val pinger = testKit.spawn(Echo(), "ping")
val probe = testKit.createTestProbe[Echo.Pong]()
pinger ! Echo.Ping("hello", probe.ref)
probe.expectMessage(Echo.Pong("hello"))
}
}
}
sourceimport org.apache.pekko.actor.testkit.typed.javadsl.LogCapturing;
import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource;
import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe;
import org.apache.pekko.actor.typed.ActorRef;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
public class LogCapturingExampleTest {
@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource();
@Rule public final LogCapturing logCapturing = new LogCapturing();
@Test
public void testSomething() {
ActorRef<Echo.Ping> pinger = testKit.spawn(Echo.create(), "ping");
TestProbe<Echo.Pong> probe = testKit.createTestProbe();
pinger.tell(new Echo.Ping("hello", probe.ref()));
probe.expectMessage(new Echo.Pong("hello"));
}
}
Then you also need to configure the CapturingAppender and CapturingAppenderDelegate in src/test/resources/logback-test.xml:
source<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<encoder>
<pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg MDC: {%mdc}%n</pattern>
</encoder>
</appender>
<!--
Logging from tests is silenced by this appender. When there is a test failure
the captured logging events are flushed to the appenders defined for the
org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate logger.
-->
<appender name="CapturingAppender" class="org.apache.pekko.actor.testkit.typed.internal.CapturingAppender" />
<!--
The appenders defined for this CapturingAppenderDelegate logger are used
when there is a test failure and all logging events from the test are
flushed to these appenders.
-->
<logger name="org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate" >
<appender-ref ref="STDOUT"/>
</logger>
<root level="DEBUG">
<appender-ref ref="CapturingAppender"/>
</root>
</configuration>