Testing
To simplify testing of streaming integrations with Apache Pekko Connectors Kafka, it provides the Apache Pekko Connectors Kafka testkit. It provides help for
- Using Docker to launch a local Kafka cluster with testcontainers
- Mocking the Apache Pekko Connectors Kafka Consumers and Producers
Project Info: Apache Pekko Connectors Kafka testkit | |
---|---|
Artifact | org.apache.pekko
pekko-connectors-kafka-testkit
1.1.0
|
JDK versions | OpenJDK 8 OpenJDK 11 |
Scala versions | 2.12.20, 2.13.15, 3.3.4 |
JPMS module name | pekko.stream.connectors.kafka.testkit |
License | |
Home page | https://pekko.apache.org/docs/pekko-connectors-kafka/current/ |
Forums | |
Release notes | In the documentation |
Issues | Github issues |
Sources | https://github.com/apache/pekko-connectors-kafka |
- Maven
<properties> <pekko.version>1.1.1</pekko.version> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-connectors-kafka-testkit_${scala.binary.version}</artifactId> <version>1.1.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-stream-testkit_${scala.binary.version}</artifactId> <version>${pekko.version}</version> <scope>test</scope> </dependency> </dependencies>
- sbt
val PekkoVersion = "1.1.1" libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-connectors-kafka-testkit" % "1.1.0" % Test, "org.apache.pekko" %% "pekko-stream-testkit" % PekkoVersion % Test )
- Gradle
def versions = [ PekkoVersion: "1.1.1", ScalaBinary: "2.13" ] dependencies { testImplementation "org.apache.pekko:pekko-connectors-kafka-testkit_${versions.ScalaBinary}:1.1.0" testImplementation "org.apache.pekko:pekko-stream-testkit_${versions.ScalaBinary}:${versions.PekkoVersion}" }
Note that Apache Pekko testkits do not promise binary compatibility. The API might be changed even between patch releases.
The table below shows Apache Pekko Connectors Kafka testkit’s direct dependencies and the second tab shows all libraries it depends on transitively.
Running Kafka with your tests
The Testkit provides a variety of ways to test your application against a real Kafka broker or cluster using Testcontainers (Docker).
The table below helps guide you to the right Testkit implementation depending on your programming language, testing framework, and use (or not) of Docker containers. You must mix in or implement these types into your test classes to use them. See the documentation for each for more details.
Type | Test Framework | Cluster | Lang | Lifetime |
---|---|---|---|---|
org.apache.pekko.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test |
JUnit 4 | Yes | Java | All tests, Per class |
org.apache.pekko.kafka.testkit.javadsl.TestcontainersKafkaTest |
JUnit 5 | Yes | Java | All tests, Per class |
org.apache.pekko.kafka.testkit.scaladsl.TestcontainersKafkaLike |
ScalaTest | Yes | Scala | All tests |
org.apache.pekko.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike |
ScalaTest | Yes | Scala | Per class |
Alternative testing libraries
If using Maven and Java, an alternative library that provides running Kafka broker instance during testing is kafka-unit by salesforce. It has support for Junit 4 and 5 and supports many different versions of Kafka.
Mocking the Consumer or Producer
The testkit contains factories to create the messages emitted by Consumer sources in org.apache.pekko.kafka.testkit.ConsumerResultFactory
and Producer flows in org.apache.pekko.kafka.testkit.ProducerResultFactory
.
To create the materialized value of Consumer sources, ConsumerControlFactory
ConsumerControlFactory
offers a wrapped KillSwitch
KillSwitch
.
- Scala
-
source
import org.apache.pekko import pekko.kafka.testkit.scaladsl.ConsumerControlFactory import pekko.kafka.testkit.{ ConsumerResultFactory, ProducerResultFactory } // create elements emitted by the mocked Consumer val elements = (0 to 10).map { i => val nextOffset = startOffset + i ConsumerResultFactory.committableMessage( new ConsumerRecord(topic, partition, nextOffset, "key", s"value $i"), ConsumerResultFactory.committableOffset(groupId, topic, partition, nextOffset, s"metadata $i")) } // create a source imitating the Consumer.committableSource val mockedKafkaConsumerSource: Source[ConsumerMessage.CommittableMessage[String, String], Consumer.Control] = Source(elements).viaMat(ConsumerControlFactory.controlFlow())(Keep.right) // create a source imitating the Producer.flexiFlow val mockedKafkaProducerFlow: Flow[ProducerMessage.Envelope[String, String, CommittableOffset], ProducerMessage.Results[String, String, CommittableOffset], NotUsed] = Flow[ProducerMessage.Envelope[String, String, CommittableOffset]] .map { case msg: ProducerMessage.Message[String, String, CommittableOffset] => ProducerResultFactory.result(msg) case other => throw new Exception(s"excluded: $other") } // run the flow as if it was connected to a Kafka broker val (control, streamCompletion) = mockedKafkaConsumerSource .map(msg => ProducerMessage.Message( new ProducerRecord[String, String](targetTopic, msg.record.value), msg.committableOffset)) .via(mockedKafkaProducerFlow) .map(_.passThrough) .toMat(Committer.sink(committerSettings))(Keep.both) .run()
- Java
-
source
import org.apache.pekko.kafka.testkit.ConsumerResultFactory; import org.apache.pekko.kafka.testkit.ProducerResultFactory; import org.apache.pekko.kafka.testkit.javadsl.ConsumerControlFactory; // create elements emitted by the mocked Consumer List<ConsumerMessage.CommittableMessage<String, String>> elements = Arrays.asList( ConsumerResultFactory.committableMessage( new ConsumerRecord<>(topic, partition, startOffset, "key", "value 1"), ConsumerResultFactory.committableOffset( groupId, topic, partition, startOffset, "metadata")), ConsumerResultFactory.committableMessage( new ConsumerRecord<>(topic, partition, startOffset + 1, "key", "value 2"), ConsumerResultFactory.committableOffset( groupId, topic, partition, startOffset + 1, "metadata 2"))); // create a source imitating the Consumer.committableSource Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control> mockedKafkaConsumerSource = Source.cycle(elements::iterator) .viaMat(ConsumerControlFactory.controlFlow(), Keep.right()); // create a source imitating the Producer.flexiFlow Flow< ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>, ProducerMessage.Results<String, String, ConsumerMessage.CommittableOffset>, NotUsed> mockedKafkaProducerFlow = Flow .<ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>> create() .map( msg -> { if (msg instanceof ProducerMessage.Message) { ProducerMessage.Message<String, String, ConsumerMessage.CommittableOffset> msg2 = (ProducerMessage.Message< String, String, ConsumerMessage.CommittableOffset>) msg; return ProducerResultFactory.result(msg2); } else throw new RuntimeException("unexpected element: " + msg); }); // run the flow as if it was connected to a Kafka broker Pair<Consumer.Control, CompletionStage<Done>> stream = mockedKafkaConsumerSource .map( msg -> { ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset> message = new ProducerMessage.Message<>( new ProducerRecord<>( targetTopic, msg.record().key(), msg.record().value()), msg.committableOffset()); return message; }) .via(mockedKafkaProducerFlow) .map(ProducerMessage.Results::passThrough) .toMat(Committer.sink(committerSettings), Keep.both()) .run(sys);