Service discovery

By using Apache Pekko Discovery, Apache Pekko Connectors Kafka can read the Kafka bootstrap server addresses from any Apache Pekko Discovery-compatible service discovery mechanism.

Apache Pekko Discovery supports Configuration (HOCON), DNS (SRV records), and aggregation of multiple discovery methods out of the box. Implementations for the Kubernetes API, AWS API (EC2 tag-based discovery and ECS discovery), and Consul are available in Apache Pekko Management.

Dependency

The Apache Pekko Discovery version must match the Apache Pekko version used in your build. To use the implementations provided by Apache Pekko Management, you need to add the desired dependency.

Maven
<properties>
  <pekko.version>1.0.2</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-discovery_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
sbt
val PekkoVersion = "1.0.2"
libraryDependencies += "org.apache.pekko" %% "pekko-discovery" % PekkoVersion
Gradle
def versions = [
  PekkoVersion: "1.0.2",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-discovery_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

Configure consumer settings

To use Apache Pekko Discovery with Apache Pekko Connectors Kafka consumers, configure a section for your consumer settings that inherits the default settings (by using ${pekko.kafka.consumer}) and add a service name and a timeout for the service lookup. Setting the service-name directly in the pekko.kafka.consumer config also works if all your consumers connect to the same Kafka broker.

The service name must match the one configured with the discovery technology you use. Override the resolve-timeout to suit the discovery technology and your environment.

Note that consumers and producers may share a service (as shown in the examples on this page).

application.conf
discovery-consumer: ${pekko.kafka.consumer} {
  service-name = "kafkaService1"
}
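
The example above sets only the service name. As a minimal sketch of the lookup timeout mentioned earlier, the same section could also override resolve-timeout. The value of 5 seconds is an arbitrary illustration, not a recommendation:

```hocon
discovery-consumer: ${pekko.kafka.consumer} {
  service-name = "kafkaService1"
  # Illustrative value (assumption): tune it to how long your
  # discovery backend typically takes to answer.
  resolve-timeout = 5 seconds
}
```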

Mount the DiscoverySupport in your consumer settings:

Scala
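import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.pekko.kafka.ConsumerSettings
import org.apache.pekko.kafka.scaladsl.DiscoverySupport

// `config` is the application config, e.g. system.settings.config;
// an implicit ActorSystem must be in scope for the lookup.
val consumerConfig = config.getConfig("discovery-consumer")
val settings = ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
  .withEnrichAsync(DiscoverySupport.consumerBootstrapServers(consumerConfig))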
Java
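import com.typesafe.config.Config;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pekko.kafka.ConsumerSettings;
import org.apache.pekko.kafka.javadsl.DiscoverySupport;

// `system` is the ActorSystem of your application.
Config consumerConfig = system.settings().config().getConfig("discovery-consumer");
ConsumerSettings<String, String> settings =
    ConsumerSettings.create(consumerConfig, new StringDeserializer(), new StringDeserializer())
        .withEnrichCompletionStage(
            DiscoverySupport.consumerBootstrapServers(consumerConfig, system));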

Configure producer settings

To use Apache Pekko Discovery with Apache Pekko Connectors Kafka producers, configure a section for your producer settings that inherits the default settings (by using ${pekko.kafka.producer}) and add a service name and a timeout for the service lookup. Setting the service-name directly in the pekko.kafka.producer config also works if all your producers connect to the same Kafka broker.

The service name must match the one configured with the discovery technology you use. Override the resolve-timeout to suit the discovery technology and your environment.

Note that consumers and producers may share a service (as shown in the examples on this page).

application.conf
discovery-producer: ${pekko.kafka.producer} {
  service-name = "kafkaService1"
}

Mount the DiscoverySupport in your producer settings:

Scala
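import org.apache.kafka.common.serialization.StringSerializer
import org.apache.pekko.kafka.ProducerSettings
import org.apache.pekko.kafka.scaladsl.DiscoverySupport

// `config` is the application config, e.g. system.settings.config;
// an implicit ActorSystem must be in scope for the lookup.
val producerConfig = config.getConfig("discovery-producer")
val settings = ProducerSettings(producerConfig, new StringSerializer, new StringSerializer)
  .withEnrichAsync(DiscoverySupport.producerBootstrapServers(producerConfig))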
Java
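import com.typesafe.config.Config;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pekko.kafka.ProducerSettings;
import org.apache.pekko.kafka.javadsl.DiscoverySupport;

// `system` is the ActorSystem of your application.
Config producerConfig = system.settings().config().getConfig("discovery-producer");
ProducerSettings<String, String> settings =
    ProducerSettings.create(producerConfig, new StringSerializer(), new StringSerializer())
        .withEnrichCompletionStage(
            DiscoverySupport.producerBootstrapServers(producerConfig, system));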

Provide a service name via environment variables

To set the service name used to look up the Kafka brokers' bootstrap addresses via environment variables, use the built-in substitution support in Typesafe Config as shown below. This example uses the value of the environment variable KAFKA_SERVICE_NAME and, if that variable is not defined, defaults to kafkaServiceDefault.

application.conf
pekko.kafka.producer {
  service-name = "kafkaServiceDefault"
  service-name = ${?KAFKA_SERVICE_NAME}
}

pekko.kafka.consumer {
  service-name = "kafkaServiceDefault"
  service-name = ${?KAFKA_SERVICE_NAME}
}
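
The same fallback pattern should also work inside a dedicated settings section, for instance when consumers and producers need to resolve different services. The following is a sketch under that assumption; the variable names KAFKA_CONSUMER_SERVICE_NAME and KAFKA_PRODUCER_SERVICE_NAME and the default service names are hypothetical:

```hocon
discovery-consumer: ${pekko.kafka.consumer} {
  # Default used when the environment variable is not set.
  service-name = "kafkaConsumerServiceDefault"
  # Optional substitution: overrides the default only if the variable exists.
  service-name = ${?KAFKA_CONSUMER_SERVICE_NAME}
}

discovery-producer: ${pekko.kafka.producer} {
  service-name = "kafkaProducerServiceDefault"
  service-name = ${?KAFKA_PRODUCER_SERVICE_NAME}
}
```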

Specify a different service discovery mechanism

The Actor System-wide service discovery is used by default. To choose a different Apache Pekko Discovery implementation, set the discovery-method setting in the producer and consumer configurations accordingly.

application.conf
discovery-producer: ${pekko.kafka.producer} {
  discovery-method = "kubernetes-api"
  service-name = "kafkaService1"
  resolve-timeout = 3 seconds
}
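
The example above covers only the producer. A matching consumer section might look like the following sketch, which assumes the consumer should resolve the same service through the same discovery method:

```hocon
discovery-consumer: ${pekko.kafka.consumer} {
  # Must name a discovery method that is available on the classpath.
  discovery-method = "kubernetes-api"
  service-name = "kafkaService1"
  resolve-timeout = 3 seconds
}
```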

Use Config (HOCON) to describe the bootstrap servers

The setup below uses the built-in Apache Pekko Discovery implementation that reads from Config (HOCON) files. That might be a good choice for development and testing. You may use the Aggregate implementation to try another discovery technology first and fall back to the config file.

application.conf
pekko.discovery.method = config
pekko.discovery.config.services = {
  kafkaService1 = {
    endpoints = [
      { host = "cat", port = 1233 }
      { host = "dog", port = 1234 }
    ]
  }
}
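
The Aggregate fallback mentioned above could be configured roughly as follows. This is a sketch that assumes DNS is the primary discovery technology and that it is registered under the method name pekko-dns; replace it with the method you actually use:

```hocon
pekko.discovery {
  method = aggregate
  aggregate {
    # Methods are tried in the order listed; "config" acts as the fallback.
    discovery-methods = ["pekko-dns", "config"]
  }
  config.services = {
    kafkaService1 = {
      endpoints = [
        { host = "cat", port = 1233 }
        { host = "dog", port = 1234 }
      ]
    }
  }
}
```

With this setup, the service name kafkaService1 only needs to be resolvable by one of the listed methods.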