Service discovery
By using Pekko Discovery Apache Pekko Connectors Kafka may read the Kafka bootstrap server addresses from any Apache Pekko Discovery-compatible service discovery mechanism.
Apache Pekko Discovery supports Configuration (HOCON), DNS (SRV records), and aggregation of multiple discovery methods out-of-the-box. Kubernetes API, AWS API: EC2 Tag-Based Discovery, AWS API: ECS Discovery and Consul implementations for Apache Pekko Discovery are available in Pekko Management.
Dependency
The Apache Pekko Discovery version must match the Apache Pekko version used in your build. To use the implementations provided by Apache Pekko Management, you need to add the desired dependency.
- Maven
<properties> <pekko.version>1.1.1</pekko.version> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-discovery_${scala.binary.version}</artifactId> <version>${pekko.version}</version> </dependency> </dependencies>
- sbt
val PekkoVersion = "1.1.1" libraryDependencies += "org.apache.pekko" %% "pekko-discovery" % PekkoVersion
- Gradle
def versions = [ PekkoVersion: "1.1.1", ScalaBinary: "2.13" ] dependencies { implementation "org.apache.pekko:pekko-discovery_${versions.ScalaBinary}:${versions.PekkoVersion}" }
Configure consumer settings
To use Apache Pekko Discovery with Apache Pekko Connectors Kafka consumers, configure a section for your consumer settings which inherits the default settings (by using ${pekko.kafka.consumer}
) and add a service name and a timeout for the service lookup. Setting the service-name
in the pekko.kafka.consumer
config will work, if all your consumers connect to the same Kafka broker.
The service name must match the one configured with the discovery technology you use. Overwrite the resolve-timeout
depending on the discovery technology used, and your environment.
Note that consumers and producers may share a service (as shown in the examples on this page).
- application.conf
-
discovery-consumer: ${pekko.kafka.consumer} { service-name = "kafkaService1" }
Mount the DiscoverySupport
DiscoverySupport
in your consumer settings:
- Scala
-
source
import org.apache.pekko.kafka.scaladsl.DiscoverySupport val consumerConfig = config.getConfig("discovery-consumer") val settings = ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer) .withEnrichAsync(DiscoverySupport.consumerBootstrapServers(consumerConfig))
- Java
-
source
import org.apache.pekko.kafka.javadsl.DiscoverySupport; Config consumerConfig = system.settings().config().getConfig("discovery-consumer"); ConsumerSettings<String, String> settings = ConsumerSettings.create(consumerConfig, new StringDeserializer(), new StringDeserializer()) .withEnrichCompletionStage( DiscoverySupport.consumerBootstrapServers(consumerConfig, system));
Configure producer settings
To use Apache Pekko Discovery with Apache Pekko Connectors Kafka producers, configure a section for your producer settings which inherits the default settings (by using ${pekko.kafka.producer}
) and add a service name and a timeout for the service lookup. Setting the service-name
in the pekko.kafka.producer
config will work, if all your producers connect to the same Kafka broker.
The service name must match the one configured with the discovery technology you use. Overwrite the resolve-timeout
depending on the discovery technology used, and your environment.
Note that consumers and producers may share a service (as shown in the examples on this page).
- application.conf
-
discovery-producer: ${pekko.kafka.producer} { service-name = "kafkaService1" }
Mount the DiscoverySupport
DiscoverySupport
in your producer settings:
- Scala
-
source
import org.apache.pekko.kafka.scaladsl.DiscoverySupport val producerConfig = config.getConfig("discovery-producer") val settings = ProducerSettings(producerConfig, new StringSerializer, new StringSerializer) .withEnrichAsync(DiscoverySupport.producerBootstrapServers(producerConfig))
- Java
-
source
import org.apache.pekko.kafka.javadsl.DiscoverySupport; Config producerConfig = system.settings().config().getConfig("discovery-producer"); ProducerSettings<String, String> settings = ProducerSettings.create(producerConfig, new StringSerializer(), new StringSerializer()) .withEnrichCompletionStage( DiscoverySupport.producerBootstrapServers(producerConfig, system));
Provide a service name via environment variables
To set the service name for lookup of the Kafka brokers bootstrap addresses via environment variables, use the built-in s support in Typesafe Config as below. This example will use the value from the environment variable KAFKA_SERVICE_NAME
and in case that is not defined default to kafkaServiceDefault
.
- application.conf
-
pekko.kafka.producer { service-name = "kafkaServiceDefault" service-name = ${?KAFKA_SERVICE_NAME} } pekko.kafka.consumer { service-name = "kafkaServiceDefault" service-name = ${?KAFKA_SERVICE_NAME} }
Specify a different service discovery mechanism
The Actor System-wide service discovery is used by default, to choose a different Apache Pekko Discovery implementation, set the discovery-method
setting in the producer and consumer configurations accordingly.
- application.conf
-
discovery-producer: ${pekko.kafka.producer} { discovery-method = "kubernetes-api" service-name = "kafkaService1" resolve-timeout = 3 seconds }
Use Config (HOCON) to describe the bootstrap servers
The setup below uses the built-in Apache Pekko Discovery implementation reading from Config (HOCON) files. That might be a good choice for development and testing. You may use the Aggregate implementation to first use another discovery technology, before falling back to the config file.
- application.conf
-
source
pekko.discovery.method = config pekko.discovery.config.services = { kafkaService1 = { endpoints = [ { host = "cat", port = 1233 } { host = "dog", port = 1234 } ] } }