Testing

Apache Pekko Connectors Kafka ships a testkit to simplify testing of streaming integrations. It helps with running a real Kafka broker alongside your tests and with mocking the Consumer or Producer.

Project Info: Apache Pekko Connectors Kafka testkit
Artifact: org.apache.pekko / pekko-connectors-kafka-testkit / 1.1.0
JDK versions: OpenJDK 8, OpenJDK 11
Scala versions: 2.12.20, 2.13.15, 3.3.4
JPMS module name: pekko.stream.connectors.kafka.testkit
License
Home page: https://pekko.apache.org/docs/pekko-connectors-kafka/current/
Forums
Release notes: In the documentation
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors-kafka
Maven
<properties>
  <pekko.version>1.1.1</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-kafka-testkit_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream-testkit_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
    <scope>test</scope>
  </dependency>
</dependencies>
sbt
val PekkoVersion = "1.1.1"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-kafka-testkit" % "1.1.0" % Test,
  "org.apache.pekko" %% "pekko-stream-testkit" % PekkoVersion % Test
)
Gradle
def versions = [
  PekkoVersion: "1.1.1",
  ScalaBinary: "2.13"
]
dependencies {
  testImplementation "org.apache.pekko:pekko-connectors-kafka-testkit_${versions.ScalaBinary}:1.1.0"
  testImplementation "org.apache.pekko:pekko-stream-testkit_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

Note that Apache Pekko testkits do not promise binary compatibility. The API might be changed even between patch releases.

The table below shows Apache Pekko Connectors Kafka testkit’s direct dependencies and the second tab shows all libraries it depends on transitively.

Running Kafka with your tests

The Testkit provides a variety of ways to test your application against a real Kafka broker or cluster using Testcontainers (Docker).

The table below helps guide you to the right Testkit implementation depending on your programming language, testing framework, and use (or not) of Docker containers. You must mix in or implement these types into your test classes to use them. See the documentation for each for more details.

Type | Test Framework | Cluster | Lang | Lifetime
org.apache.pekko.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test | JUnit 4 | Yes | Java | All tests, Per class
org.apache.pekko.kafka.testkit.javadsl.TestcontainersKafkaTest | JUnit 5 | Yes | Java | All tests, Per class
org.apache.pekko.kafka.testkit.scaladsl.TestcontainersKafkaLike | ScalaTest | Yes | Scala | All tests
org.apache.pekko.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike | ScalaTest | Yes | Scala | Per class
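
As an illustration of mixing one of these types into a test class, the following is a minimal sketch of a ScalaTest spec that uses TestcontainersKafkaLike. It assumes ScalaTest is on the test classpath and that Docker is available. SpecBase is a hypothetical helper defined only for this sketch, the class and test names are made up, and the members createTopic() and bootstrapServers are assumed to be inherited from the testkit's Scala base classes. Check the API documentation of your testkit version before relying on them.

```scala
import org.apache.pekko.kafka.testkit.scaladsl.{ ScalatestKafkaSpec, TestcontainersKafkaLike }
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

// Hypothetical base class for this sketch. The port -1 is a placeholder
// (assumption): the Testcontainers traits supply the real broker address.
abstract class SpecBase(kafkaPort: Int)
    extends ScalatestKafkaSpec(kafkaPort)
    with AnyWordSpecLike
    with Matchers {

  protected def this() = this(kafkaPort = -1)
}

// Mixing in TestcontainersKafkaLike shares one Kafka container across all tests
// (the "All tests" lifetime in the table above).
class TestcontainersSampleSpec extends SpecBase with TestcontainersKafkaLike {

  "A Kafka broker started by Testcontainers" should {
    "be reachable from the test" in {
      // createTopic() is assumed to create a uniquely named topic and return its name
      val topic = createTopic()
      topic should not be empty
      // bootstrapServers is assumed to point at the containerized broker
      bootstrapServers should not be empty
    }
  }
}
```

Swapping TestcontainersKafkaLike for TestcontainersKafkaPerClassLike should give each test class its own container, matching the "Per class" lifetime listed in the table.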

Alternative testing libraries

If you use Maven and Java, an alternative library that provides a running Kafka broker instance during testing is kafka-junit by Salesforce. It supports JUnit 4 and 5 and many different versions of Kafka.

Mocking the Consumer or Producer

The testkit contains factories to create the messages emitted by Consumer sources in org.apache.pekko.kafka.testkit.ConsumerResultFactory and Producer flows in org.apache.pekko.kafka.testkit.ProducerResultFactory.

To create the materialized value of Consumer sources, ConsumerControlFactory offers a wrapped KillSwitch.

Scala
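import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.pekko
import pekko.NotUsed
import pekko.kafka.{ ConsumerMessage, ProducerMessage }
import pekko.kafka.ConsumerMessage.CommittableOffset
import pekko.kafka.scaladsl.{ Committer, Consumer }
import pekko.kafka.testkit.scaladsl.ConsumerControlFactory
import pekko.kafka.testkit.{ ConsumerResultFactory, ProducerResultFactory }
import pekko.stream.scaladsl.{ Flow, Keep, Source }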

// create elements emitted by the mocked Consumer
val elements = (0 to 10).map { i =>
  val nextOffset = startOffset + i
  ConsumerResultFactory.committableMessage(
    new ConsumerRecord(topic, partition, nextOffset, "key", s"value $i"),
    ConsumerResultFactory.committableOffset(groupId, topic, partition, nextOffset, s"metadata $i"))
}

// create a source imitating the Consumer.committableSource
val mockedKafkaConsumerSource: Source[ConsumerMessage.CommittableMessage[String, String], Consumer.Control] =
  Source(elements).viaMat(ConsumerControlFactory.controlFlow())(Keep.right)

// create a flow imitating the Producer.flexiFlow
val mockedKafkaProducerFlow: Flow[ProducerMessage.Envelope[String, String, CommittableOffset],
  ProducerMessage.Results[String, String, CommittableOffset], NotUsed] =
  Flow[ProducerMessage.Envelope[String, String, CommittableOffset]]
    .map {
      case msg: ProducerMessage.Message[String, String, CommittableOffset] =>
        ProducerResultFactory.result(msg)
      case other => throw new Exception(s"excluded: $other")
    }

// run the flow as if it was connected to a Kafka broker
val (control, streamCompletion) = mockedKafkaConsumerSource
  .map(msg =>
    ProducerMessage.Message(
      new ProducerRecord[String, String](targetTopic, msg.record.value),
      msg.committableOffset))
  .via(mockedKafkaProducerFlow)
  .map(_.passThrough)
  .toMat(Committer.sink(committerSettings))(Keep.both)
  .run()
Java
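import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.kafka.ConsumerMessage;
import org.apache.pekko.kafka.ProducerMessage;
import org.apache.pekko.kafka.javadsl.Committer;
import org.apache.pekko.kafka.javadsl.Consumer;
import org.apache.pekko.kafka.testkit.ConsumerResultFactory;
import org.apache.pekko.kafka.testkit.ProducerResultFactory;
import org.apache.pekko.kafka.testkit.javadsl.ConsumerControlFactory;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Source;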

// create elements emitted by the mocked Consumer
List<ConsumerMessage.CommittableMessage<String, String>> elements =
    Arrays.asList(
        ConsumerResultFactory.committableMessage(
            new ConsumerRecord<>(topic, partition, startOffset, "key", "value 1"),
            ConsumerResultFactory.committableOffset(
                groupId, topic, partition, startOffset, "metadata")),
        ConsumerResultFactory.committableMessage(
            new ConsumerRecord<>(topic, partition, startOffset + 1, "key", "value 2"),
            ConsumerResultFactory.committableOffset(
                groupId, topic, partition, startOffset + 1, "metadata 2")));

// create a source imitating the Consumer.committableSource
Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control>
    mockedKafkaConsumerSource =
        Source.cycle(elements::iterator)
            .viaMat(ConsumerControlFactory.controlFlow(), Keep.right());

// create a flow imitating the Producer.flexiFlow
Flow<
        ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>,
        ProducerMessage.Results<String, String, ConsumerMessage.CommittableOffset>,
        NotUsed>
    mockedKafkaProducerFlow =
        Flow
            .<ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>>
                create()
            .map(
                msg -> {
                  if (msg instanceof ProducerMessage.Message) {
                    ProducerMessage.Message<String, String, ConsumerMessage.CommittableOffset>
                        msg2 =
                            (ProducerMessage.Message<
                                    String, String, ConsumerMessage.CommittableOffset>)
                                msg;
                    return ProducerResultFactory.result(msg2);
                  } else throw new RuntimeException("unexpected element: " + msg);
                });

// run the flow as if it was connected to a Kafka broker
Pair<Consumer.Control, CompletionStage<Done>> stream =
    mockedKafkaConsumerSource
        .map(
            msg -> {
              ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>
                  message =
                      new ProducerMessage.Message<>(
                          new ProducerRecord<>(
                              targetTopic, msg.record().key(), msg.record().value()),
                          msg.committableOffset());
              return message;
            })
        .via(mockedKafkaProducerFlow)
        .map(ProducerMessage.Results::passThrough)
        .toMat(Committer.sink(committerSettings), Keep.both())
        .run(sys);
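
The wrapped KillSwitch behind the mocked Consumer.Control can be exercised on its own. The following Scala sketch feeds an endless mocked source through ConsumerControlFactory.controlFlow() and stops it via the control. The object name, topic, group id, and timeouts are hypothetical values chosen for illustration. It also assumes that shutting down the mocked control completes the stream, which is the expected behavior of a kill switch but should be verified against your testkit version.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.testkit.ConsumerResultFactory
import org.apache.pekko.kafka.testkit.scaladsl.ConsumerControlFactory
import org.apache.pekko.stream.scaladsl.{ Keep, Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical object name; all record values below are illustrative only.
object MockedControlSketch {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("mocked-control-sketch")

    // One mocked committable message, built with the testkit factory
    val message = ConsumerResultFactory.committableMessage(
      new ConsumerRecord("topic", 0, 0L, "key", "value"),
      ConsumerResultFactory.committableOffset("group", "topic", 0, 0L, "metadata"))

    // An endless mocked source: it never completes by itself,
    // so only the mocked control can stop it
    val (control, done) = Source
      .repeat(message)
      .viaMat(ConsumerControlFactory.controlFlow())(Keep.right)
      .toMat(Sink.ignore)(Keep.both)
      .run()

    // Shutting down the control triggers the wrapped kill switch
    // (assumption: this completes the stream)
    Await.result(control.shutdown(), 5.seconds)
    Await.result(done, 5.seconds)

    Await.result(system.terminate(), 5.seconds)
  }
}
```

A test that drives business logic with a mocked source can use the same pattern to end the stream deterministically instead of relying on a finite list of elements.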