Asynchronous testing

You are viewing the documentation for the new actor APIs, to view the Pekko Classic documentation, see Classic Testing.

Asynchronous testing uses a real ActorSystemActorSystem that allows you to test your Actors in a more realistic environment.

The minimal setup consists of the test procedure, which provides the desired stimuli, the actor under test, and an actor receiving replies. Bigger systems replace the actor under test with a network of actors, apply stimuli at varying injection points and arrange results to be sent from different emission points, but the basic principle stays the same in that a single procedure drives the test.

Basic example

Actor under test:

Scala
sourceobject Echo {
  case class Ping(message: String, response: ActorRef[Pong])
  case class Pong(message: String)

  def apply(): Behavior[Ping] = Behaviors.receiveMessage {
    case Ping(m, replyTo) =>
      replyTo ! Pong(m)
      Behaviors.same
  }
}
Java
sourcepublic static class Echo {
  public static class Ping {
    public final String message;
    public final ActorRef<Pong> replyTo;

    public Ping(String message, ActorRef<Pong> replyTo) {
      this.message = message;
      this.replyTo = replyTo;
    }
  }

  public static class Pong {
    public final String message;

    public Pong(String message) {
      this.message = message;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof Pong)) return false;
      Pong pong = (Pong) o;
      return message.equals(pong.message);
    }

    @Override
    public int hashCode() {
      return Objects.hash(message);
    }
  }

  public static Behavior<Ping> create() {
    return Behaviors.receive(Ping.class)
        .onMessage(
            Ping.class,
            ping -> {
              ping.replyTo.tell(new Pong(ping.message));
              return Behaviors.same();
            })
        .build();
  }
}

Tests create an instance of ActorTestKitActorTestKit. This provides access to:

  • An ActorSystem
  • Methods for spawning Actors. These are created under the special testkit user guardian
  • A method to shut down the ActorSystem from the test suite

This first example is using the “raw” ActorTestKit but if you are using ScalaTestJUnit you can simplify the tests by using the Test framework integration. It’s still good to read this section to understand how it works.

Scala
sourceimport org.apache.pekko.actor.testkit.typed.scaladsl.ActorTestKit

import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class AsyncTestingExampleSpec
    extends AnyWordSpec
    with BeforeAndAfterAll
    with Matchers {
  val testKit = ActorTestKit()
}
Java
sourceimport org.apache.pekko.actor.testkit.typed.javadsl.ActorTestKit;

public class AsyncTestingExampleTest
{
  static final ActorTestKit testKit = ActorTestKit.create();
}

Your test is responsible for shutting down the ActorSystemActorSystem e.g. using BeforeAndAfterAll when using ScalaTest@AfterClass when using JUnit.

Scala
sourceoverride def afterAll(): Unit = testKit.shutdownTestKit()
Java
source@AfterClass
public static void cleanup() {
  testKit.shutdownTestKit();
}

The following demonstrates:

  • Creating an actor from the TestKit’s system using spawn
  • Creating a TestProbe
  • Verifying that the actor under test responds via the TestProbe

Note that it is possible to use a TestProbe directly as a RecipientRefRecipientRef (a common supertype of ActorRef and Cluster Sharding EntityRef), in cases where a message protocol uses RecipientRef instead of specifying ActorRef or EntityRef.

Scala
sourceval pinger = testKit.spawn(Echo(), "ping")
val probe = testKit.createTestProbe[Echo.Pong]()
pinger ! Echo.Ping("hello", probe.ref)
probe.expectMessage(Echo.Pong("hello"))
Java
sourceActorRef<Echo.Ping> pinger = testKit.spawn(Echo.create(), "ping");
TestProbe<Echo.Pong> probe = testKit.createTestProbe();
pinger.tell(new Echo.Ping("hello", probe.ref()));
probe.expectMessage(new Echo.Pong("hello"));

Actors can also be spawned anonymously:

Scala
sourceval pinger = testKit.spawn(Echo())
Java
sourceActorRef<Echo.Ping> pinger = testKit.spawn(Echo.create());

Note that you can add import testKit._ to get access to the spawn and createTestProbe methods at the top level without prefixing them with testKit.

Stopping actors

The method will wait until the actor stops or throw an assertion error in case of a timeout.

Scala
sourceval pinger1 = testKit.spawn(Echo(), "pinger")
pinger1 ! Echo.Ping("hello", probe.ref)
probe.expectMessage(Echo.Pong("hello"))
testKit.stop(pinger1) // Uses default timeout

// Immediately creating an actor with the same name
val pinger2 = testKit.spawn(Echo(), "pinger")
pinger2 ! Echo.Ping("hello", probe.ref)
probe.expectMessage(Echo.Pong("hello"))
testKit.stop(pinger2, 10.seconds) // Custom timeout
Java
sourceActorRef<Echo.Ping> pinger1 = testKit.spawn(Echo.create(), "pinger");
pinger1.tell(new Echo.Ping("hello", probe.ref()));
probe.expectMessage(new Echo.Pong("hello"));
testKit.stop(pinger1);

// Immediately creating an actor with the same name
ActorRef<Echo.Ping> pinger2 = testKit.spawn(Echo.create(), "pinger");
pinger2.tell(new Echo.Ping("hello", probe.ref()));
probe.expectMessage(new Echo.Pong("hello"));
testKit.stop(pinger2, Duration.ofSeconds(10));

The stop method can only be used for actors that were spawned by the same ActorTestKitActorTestKit. Other actors will not be stopped by that method.

Observing mocked behavior

When testing a component (which may be an actor or not) that interacts with other actors it can be useful to not have to run the other actors it depends on. Instead, you might want to create mock behaviors that accept and possibly respond to messages in the same way the other actor would do but without executing any actual logic. In addition to this it can also be useful to observe those interactions to assert that the component under test did send the expected messages. This allows the same kinds of tests as classic TestActor/Autopilot.

As an example, let’s assume we’d like to test the following component:

Scala
sourcecase class Message(i: Int, replyTo: ActorRef[Try[Int]])

class Producer(publisher: ActorRef[Message])(implicit scheduler: Scheduler) {

  def produce(messages: Int)(implicit timeout: Timeout): Unit = {
    (0 until messages).foreach(publish)
  }

  private def publish(i: Int)(implicit timeout: Timeout): Future[Try[Int]] = {
    publisher.ask(ref => Message(i, ref))
  }

}
Java
source
static class Message { int i; ActorRef<Integer> replyTo; Message(int i, ActorRef<Integer> replyTo) { this.i = i; this.replyTo = replyTo; } } public static class Producer { private Scheduler scheduler; private ActorRef<Message> publisher; Producer(Scheduler scheduler, ActorRef<Message> publisher) { this.scheduler = scheduler; this.publisher = publisher; } public void produce(int messages) { IntStream.range(0, messages).forEach(this::publish); } private CompletionStage<Integer> publish(int i) { return AskPattern.ask( publisher, (ActorRef<Integer> ref) -> new Message(i, ref), Duration.ofSeconds(3), scheduler); } }

In our test, we create a mocked publisher actor. Additionally we use Behaviors.monitor with a TestProbe in order to be able to verify the interaction of the producer with the publisher:

Scala
sourceimport testKit._

// simulate the happy path
val mockedBehavior = Behaviors.receiveMessage[Message] { msg =>
  msg.replyTo ! Success(msg.i)
  Behaviors.same
}
val probe = testKit.createTestProbe[Message]()
val mockedPublisher = testKit.spawn(Behaviors.monitor(probe.ref, mockedBehavior))

// test our component
val producer = new Producer(mockedPublisher)
val messages = 3
producer.produce(messages)

// verify expected behavior
for (i <- 0 until messages) {
  val msg = probe.expectMessageType[Message]
  msg.i shouldBe i
}
Java
source// simulate the happy path
Behavior<Message> mockedBehavior =
    Behaviors.receiveMessage(
        message -> {
          message.replyTo.tell(message.i);
          return Behaviors.same();
        });
TestProbe<Message> probe = testKit.createTestProbe();
ActorRef<Message> mockedPublisher =
    testKit.spawn(Behaviors.monitor(Message.class, probe.ref(), mockedBehavior));

// test our component
Producer producer = new Producer(testKit.scheduler(), mockedPublisher);
int messages = 3;
producer.produce(messages);

// verify expected behavior
IntStream.range(0, messages)
    .forEach(
        i -> {
          Message msg = probe.expectMessageClass(Message.class);
          assertEquals(i, msg.i);
        });

Test framework integration

If you are using JUnit you can use TestKitJunitResource to have the async test kit automatically shutdown when the test is complete.

Note that the dependency on JUnit is marked as optional from the test kit module, so your project must explicitly include a dependency on JUnit to use this.

If you are using ScalaTest you can extend ScalaTestWithActorTestKit to have the async test kit automatically shutdown when the test is complete. This is done in afterAll from the BeforeAndAfterAll trait. If you override that method you should call super.afterAll to shutdown the test kit.

Note that the dependency on ScalaTest is marked as optional from the test kit module, so your project must explicitly include a dependency on ScalaTest to use this.

Scala
sourceimport pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.wordspec.AnyWordSpecLike
class ScalaTestIntegrationExampleSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {

  "Something" must {
    "behave correctly" in {
      val pinger = testKit.spawn(Echo(), "ping")
      val probe = testKit.createTestProbe[Echo.Pong]()
      pinger ! Echo.Ping("hello", probe.ref)
      probe.expectMessage(Echo.Pong("hello"))
    }
  }
}
Java
sourceimport org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource;
import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe;
import org.apache.pekko.actor.typed.ActorRef;
import org.junit.ClassRule;
import org.junit.Test;

public class JunitIntegrationExampleTest {

  @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource();


  @Test
  public void testSomething() {
    ActorRef<Echo.Ping> pinger = testKit.spawn(Echo.create(), "ping");
    TestProbe<Echo.Pong> probe = testKit.createTestProbe();
    pinger.tell(new Echo.Ping("hello", probe.ref()));
    probe.expectMessage(new Echo.Pong("hello"));
  }
}

As you may have noticed ScalaTestWithActorTestKit is an abstract class which means its problematic if you want treat a given test suite as a value and extend it in multiple ways (i.e. as an example you happen to be using testcontainers-scala and hypothetically you want to extend the same test for each different type of database you support).

If you find yourself in this situation you can instead define your tests within a trait that extends ScalaTestWithActorTestKitBase. Since this is a trait you can then have different classes which extend this along with ScalaTestWithActorTestKit.

Scala
sourceimport org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKitBase
import org.scalatest.wordspec.AnyWordSpecLike

trait ExtendTestMultipleTimes extends ScalaTestWithActorTestKitBase with AnyWordSpecLike with LogCapturing {
  "ScalaTestWithActorTestKitBase" must {
    "behave when extended in different ways" in {
      val pinger = testKit.spawn(Echo(), "ping")
      val probe = testKit.createTestProbe[Echo.Pong]()
      val message = this.getClass.getSimpleName
      pinger ! Echo.Ping(message, probe.ref)
      val returnedMessage = probe.expectMessage(Echo.Pong(message))
      returnedMessage.message.contains("ExtendTestMultipleTimes") shouldBe false
    }
  }

}

class TestWithOneImplementation extends ScalaTestWithActorTestKit with ExtendTestMultipleTimes
class TestWithAnotherImplementation extends ScalaTestWithActorTestKit with ExtendTestMultipleTimes

Configuration

By default the ActorTestKit loads configuration from application-test.conf if that exists, otherwise it is using default configuration from the reference.conf resources that ship with the Pekko libraries. The application.conf of your project is not used in this case. A specific configuration can be given as parameter when creating the TestKit.

If you prefer to use application.conf you can pass that as the configuration parameter to the TestKit. It’s loaded with:

Scala
sourceimport com.typesafe.config.ConfigFactory

ConfigFactory.load()
Java
sourceimport com.typesafe.config.ConfigFactory;

ConfigFactory.load()

It’s often convenient to define configuration for a specific test as a String in the test itself and use that as the configuration parameter to the TestKit. ConfigFactory.parseString can be used for that:

Scala
sourceConfigFactory.parseString("""
  pekko.loglevel = DEBUG
  pekko.log-config-on-start = on
  """)
Java
sourceConfigFactory.parseString("pekko.loglevel = DEBUG \n" + "pekko.log-config-on-start = on \n")

Combining those approaches using withFallback:

Scala
sourceConfigFactory.parseString("""
  pekko.loglevel = DEBUG
  pekko.log-config-on-start = on
  """).withFallback(ConfigFactory.load())
Java
sourceConfigFactory.parseString("pekko.loglevel = DEBUG \n" + "pekko.log-config-on-start = on \n")
    .withFallback(ConfigFactory.load())

More information can be found in the documentation of the configuration library.

Note

Note that reference.conf files are intended for libraries to define default values and shouldn’t be used in an application. It’s not supported to override a config property owned by one library in a reference.conf of another library.

Controlling the scheduler

It can be hard to reliably unit test specific scenario’s when your actor relies on timing: especially when running many tests in parallel it can be hard to get the timing just right. Making such tests more reliable by using generous timeouts make the tests take a long time to run.

For such situations, we provide a scheduler where you can manually, explicitly advance the clock.

Scala
sourceimport scala.concurrent.duration._
import org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.testkit.typed.scaladsl.ManualTime
import pekko.actor.testkit.typed.scaladsl.TestProbe
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.typed.scaladsl.Behaviors
import org.scalatest.wordspec.AnyWordSpecLike

class ManualTimerExampleSpec
    extends ScalaTestWithActorTestKit(ManualTime.config)
    with AnyWordSpecLike
    with LogCapturing {

  val manualTime: ManualTime = ManualTime()

  "A timer" must {
    "schedule non-repeated ticks" in {
      case object Tick
      case object Tock

      val probe = TestProbe[Tock.type]()
      val behavior = Behaviors.withTimers[Tick.type] { timer =>
        timer.startSingleTimer(Tick, 10.millis)
        Behaviors.receiveMessage { _ =>
          probe.ref ! Tock
          Behaviors.same
        }
      }

      spawn(behavior)

      manualTime.expectNoMessageFor(9.millis, probe)

      manualTime.timePasses(2.millis)
      probe.expectMessage(Tock)

      manualTime.expectNoMessageFor(10.seconds, probe)
    }
  }
}
Java
source
import org.apache.pekko.actor.testkit.typed.javadsl.LogCapturing; import org.apache.pekko.actor.typed.Behavior; import org.apache.pekko.actor.testkit.typed.javadsl.ManualTime; import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource; import org.junit.ClassRule; import org.junit.Rule; import org.scalatestplus.junit.JUnitSuite; import java.time.Duration; import org.apache.pekko.actor.typed.javadsl.Behaviors; import org.junit.Test; import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe; public class ManualTimerExampleTest extends JUnitSuite { @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(ManualTime.config()); @Rule public final LogCapturing logCapturing = new LogCapturing(); private final ManualTime manualTime = ManualTime.get(testKit.system()); static final class Tick { private Tick() {} static final Tick INSTANCE = new Tick(); } static final class Tock {} @Test public void testScheduleNonRepeatedTicks() { TestProbe<Tock> probe = testKit.createTestProbe(); Behavior<Tick> behavior = Behaviors.withTimers( timer -> { timer.startSingleTimer(Tick.INSTANCE, Duration.ofMillis(10)); return Behaviors.receiveMessage( tick -> { probe.ref().tell(new Tock()); return Behaviors.same(); }); }); testKit.spawn(behavior); manualTime.expectNoMessageFor(Duration.ofMillis(9), probe); manualTime.timePasses(Duration.ofMillis(2)); probe.expectMessageClass(Tock.class); manualTime.expectNoMessageFor(Duration.ofSeconds(10), probe); } }

Test of logging

To verify that certain logging events are emitted there is a utility called LoggingTestKitLoggingTestKit . You define a criteria of the expected logging events and it will assert that the given number of occurrences of matching logging events are emitted within a block of code.

For example, a criteria that verifies that an INFO level event with a message containing “Received message” is logged:

Scala
sourceimport org.apache.pekko.actor.testkit.typed.scaladsl.LoggingTestKit

// implicit ActorSystem is needed, but that is given by ScalaTestWithActorTestKit
// implicit val system: ActorSystem[_]

LoggingTestKit.info("Received message").expect {
  ref ! Message("hello")
}
Java
sourceimport org.apache.pekko.actor.testkit.typed.javadsl.LoggingTestKit;

LoggingTestKit.info("Received message")
    .expect(
        system,
        () -> {
          ref.tell(new Message("hello"));
          return null;
        });

More advanced criteria can be built by chaining conditions that all must be satisfied for a matching event.

Scala
sourceLoggingTestKit
  .error[IllegalArgumentException]
  .withMessageRegex(".*was rejected.*expecting ascii input.*")
  .withCustom { event =>
    event.marker match {
      case Some(m) => m.getName == "validation"
      case None    => false
    }
  }
  .withOccurrences(2)
  .expect {
    ref ! Message("hellö")
    ref ! Message("hejdå")
  }
Java
sourceLoggingTestKit.error(IllegalArgumentException.class)
    .withMessageRegex(".*was rejected.*expecting ascii input.*")
    .withCustom(
        event ->
            event.getMarker().isPresent()
                && event.getMarker().get().getName().equals("validation"))
    .withOccurrences(2)
    .expect(
        system,
        () -> {
          ref.tell(new Message("hellö"));
          ref.tell(new Message("hejdå"));
          return null;
        });

See LoggingTestKitLoggingTestKit for more details.

Silence logging output from tests

When running tests, it’s typically preferred to have the output to standard out, together with the output from the testing framework (ScalaTestJUnit). On one hand you want the output to be clean without logging noise, but on the other hand you want as much information as possible if there is a test failure (for example in CI builds).

The Pekko TestKit provides a LogCapturing utility to support this with ScalaTest or JUnit. It will buffer log events instead of emitting them to the ConsoleAppender immediately (or whatever Logback appender that is configured). When there is a test failure the buffered events are flushed to the target appenders, typically a ConsoleAppender.

Note

The LogCapturing utility requires Logback dependency.

Mix LogCapturing trait into the ScalaTestAdd a LogCapturing @Rule in the JUnit test like this:

Scala
sourceimport org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
class LogCapturingExampleSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing {

  "Something" must {
    "behave correctly" in {
      val pinger = testKit.spawn(Echo(), "ping")
      val probe = testKit.createTestProbe[Echo.Pong]()
      pinger ! Echo.Ping("hello", probe.ref)
      probe.expectMessage(Echo.Pong("hello"))
    }
  }
}
Java
sourceimport org.apache.pekko.actor.testkit.typed.javadsl.LogCapturing;
import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource;
import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe;
import org.apache.pekko.actor.typed.ActorRef;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

public class LogCapturingExampleTest {

  @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource();

  @Rule public final LogCapturing logCapturing = new LogCapturing();

  @Test
  public void testSomething() {
    ActorRef<Echo.Ping> pinger = testKit.spawn(Echo.create(), "ping");
    TestProbe<Echo.Pong> probe = testKit.createTestProbe();
    pinger.tell(new Echo.Ping("hello", probe.ref()));
    probe.expectMessage(new Echo.Pong("hello"));
  }
}

Then you also need to configure the CapturingAppender and CapturingAppenderDelegate in src/test/resources/logback-test.xml:

source<?xml version="1.0" encoding="UTF-8"?>
<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>INFO</level>
        </filter>
        <encoder>
            <pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg MDC: {%mdc}%n</pattern>
        </encoder>
    </appender>

    <!--
    Logging from tests are silenced by this appender. When there is a test failure
    the captured logging events are flushed to the appenders defined for the
    org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate logger.
    -->
    <appender name="CapturingAppender" class="org.apache.pekko.actor.testkit.typed.internal.CapturingAppender" />

    <!--
    The appenders defined for this CapturingAppenderDelegate logger are used
    when there is a test failure and all logging events from the test are
    flushed to these appenders.
    -->
    <logger name="org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate" >
      <appender-ref ref="STDOUT"/>
    </logger>

    <root level="DEBUG">
        <appender-ref ref="CapturingAppender"/>
    </root>
</configuration>