Serialization

The general recommendation for de-/serialization of messages is to use byte arrays (or Strings) as value and do the de-/serialization in a map operation in the Apache Pekko Stream instead of implementing it directly in Kafka de-/serializers. When deserialization is handled explicitly within the Apache Pekko Stream, it is easier to implement the desired error handling strategy as the examples below show.
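
To illustrate why decoding in a map step keeps error handling flexible, here is a minimal, dependency-free Java sketch. It does not use Pekko or Kafka: a plain loop stands in for the map operator, and the class name, the helper names, and the wire format (a UTF-8 encoded decimal integer) are assumptions made only for this illustration. Because decoding is ordinary code, the policy for bad payloads (here: set them aside for later inspection) is chosen by the application rather than hidden inside a Kafka deserializer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not part of the Pekko Connectors Kafka API.
class MapStageDeserialization {

  // Assumed wire format for this sketch only: a UTF-8 encoded decimal integer.
  static int decode(byte[] payload) {
    return Integer.parseInt(new String(payload, StandardCharsets.UTF_8).trim());
  }

  // Stand-in for a map operator: decode each payload, keep the good values,
  // and record the payloads that could not be decoded.
  static List<Integer> decodeAll(List<byte[]> payloads, List<String> rejected) {
    List<Integer> values = new ArrayList<>();
    for (byte[] payload : payloads) {
      try {
        values.add(decode(payload));
      } catch (NumberFormatException e) {
        rejected.add(new String(payload, StandardCharsets.UTF_8));
      }
    }
    return values;
  }

  static byte[] utf8(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    List<String> rejected = new ArrayList<>();
    List<Integer> values =
        decodeAll(Arrays.asList(utf8("1"), utf8("oops"), utf8("3")), rejected);
    System.out.println(values);   // prints [1, 3]
    System.out.println(rejected); // prints [oops]
  }
}
```

In a real stream the same decision would typically be expressed with a supervision strategy or by mapping to a result type, as the sections below show for the resume case.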

Protocol Buffers

Protocol Buffers offer a language-neutral, platform-neutral, extensible mechanism for serializing structured data and allow consumers and producers to rely on the message format.

The easiest way to use Protocol Buffers with Apache Pekko Connectors Kafka is to serialize and deserialize the Kafka message payload as a byte array and call the Protocol Buffers serialization and deserialization in a regular map operator. To serialize the Protobuf-defined type Order into a byte array, use the toByteArray() method, which is generated by the Protobuf compiler.

Scala
// the Protobuf generated class
import docs.scaladsl.proto.Order

val producerSettings: ProducerSettings[String, Array[Byte]] = // ...

val producerCompletion =
  Source(samples)
    .map(order => new ProducerRecord(topic, order.id, order.toByteArray))
    .runWith(Producer.plainSink(producerSettings))
Java
// the Protobuf generated class
import docs.javadsl.proto.OrderMessages;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

ProducerSettings<String, byte[]> producerSettings = // ...

CompletionStage<Done> producerCompletion =
    Source.from(samples)
        .map(order -> new ProducerRecord<>(topic, order.getId(), order.toByteArray()))
        .runWith(Producer.plainSink(producerSettings), sys);

To de-serialize a Protocol Buffers message in a map operator, convert the received byte array to the designated type with the generated parseFrom() method.

This example uses a resuming supervision strategy to react to data that can’t be deserialized: faulty elements are dropped and the stream continues.

Scala
// the Protobuf generated class
import docs.scaladsl.proto.Order

val resumeOnParsingException = ActorAttributes.supervisionStrategy {
  case _: com.google.protobuf.InvalidProtocolBufferException => Supervision.Resume
  case _                                                     => Supervision.Stop
}

val consumerSettings: ConsumerSettings[String, Array[Byte]] = // ...

val consumer = Consumer
  .plainSource(consumerSettings, Subscriptions.topics(topic))
  .map { consumerRecord =>
    Order.parseFrom(consumerRecord.value())
  }
  .withAttributes(resumeOnParsingException)
  .toMat(Sink.seq)(DrainingControl.apply)
  .run()
Java
// the Protobuf generated class
import docs.javadsl.proto.OrderMessages;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

final Attributes resumeOnParseException =
    ActorAttributes.withSupervisionStrategy(
        exception -> {
          if (exception instanceof com.google.protobuf.InvalidProtocolBufferException) {
            return Supervision.resume();
          } else {
            return Supervision.stop();
          }
        });

ConsumerSettings<String, byte[]> consumerSettings = // ...

Consumer.DrainingControl<List<OrderMessages.Order>> control =
    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
        .map(ConsumerRecord::value)
        .map(OrderMessages.Order::parseFrom)
        .withAttributes(resumeOnParseException) // drop faulty elements
        .toMat(Sink.seq(), Consumer::createDrainingControl)
        .run(sys);
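
To make the effect of the resuming strategy concrete, the following dependency-free Java sketch imitates the decider used above. It is not Pekko code: the Directive enum, the mapSupervised helper, and the class name are hypothetical, and NumberFormatException stands in for InvalidProtocolBufferException. The sketch shows the two outcomes: an expected parse failure drops only the offending element, while any other failure ends processing with that exception.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of resume/stop semantics; not the Pekko API.
class ResumeOnParseError {

  enum Directive { RESUME, STOP }

  // Imitates the decider: resume on the expected parse failure, stop otherwise.
  // NumberFormatException is a stand-in for InvalidProtocolBufferException.
  static Directive decide(Throwable failure) {
    return failure instanceof NumberFormatException ? Directive.RESUME : Directive.STOP;
  }

  // Stand-in for a supervised map operator.
  static <A, B> List<B> mapSupervised(List<A> input, Function<A, B> f) {
    List<B> output = new ArrayList<>();
    for (A element : input) {
      try {
        output.add(f.apply(element));
      } catch (RuntimeException e) {
        if (decide(e) == Directive.STOP) {
          throw e; // fail the whole run, like a stream failing
        }
        // RESUME: drop this element and continue with the next one
      }
    }
    return output;
  }

  public static void main(String[] args) {
    List<Integer> parsed = mapSupervised(Arrays.asList("1", "x", "3"), Integer::parseInt);
    System.out.println(parsed); // prints [1, 3]
  }
}
```

Note that elements dropped by a resuming strategy leave no trace downstream, so logging inside the decider or the map function may be worth adding when faulty data needs to be investigated.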

Jackson JSON

Serializing data to JSON text with Jackson in a map operator will turn the object instance into a String which is used as value in the ProducerRecord.

Java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.core.JsonParseException;

final ObjectMapper mapper = new ObjectMapper();
final ObjectWriter sampleDataWriter = mapper.writerFor(SampleData.class);

CompletionStage<Done> producerCompletion =
    Source.from(samples)
        .map(sampleDataWriter::writeValueAsString)
        .map(json -> new ProducerRecord<String, String>(topic, json))
        .runWith(Producer.plainSink(producerDefaults()), sys);

To de-serialize a JSON String with Jackson, extract the String value and apply the Jackson object reader in a map operator. Because the object reader is not generic, the map operator needs an explicit type argument for the target type (SampleData in the example).

This example uses a resuming supervision strategy to react to data that can’t be parsed correctly: faulty elements are dropped and the stream continues.

Java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.core.JsonParseException;

final ObjectMapper mapper = new ObjectMapper();
final ObjectReader sampleDataReader = mapper.readerFor(SampleData.class);

final Attributes resumeOnParseException =
    ActorAttributes.withSupervisionStrategy(
        exception -> {
          if (exception instanceof JsonParseException) {
            return Supervision.resume();
          } else {
            return Supervision.stop();
          }
        });

Consumer.DrainingControl<List<SampleData>> control =
    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
        .map(ConsumerRecord::value)
        .<SampleData>map(sampleDataReader::readValue)
        .withAttributes(resumeOnParseException) // drop faulty elements
        .toMat(Sink.seq(), Consumer::createDrainingControl)
        .run(sys);

Spray JSON

To de-serialize a JSON String with Spray JSON in a map operator, extract the String and use the Spray-provided implicits parseJson and convertTo in a map operator.

This example uses a resuming supervision strategy to react to data that can’t be parsed correctly: faulty elements are dropped and the stream continues.

Scala
import spray.json._

final case class SampleData(name: String, value: Int)

object SampleDataSprayProtocol extends DefaultJsonProtocol {
  implicit val sampleDataProtocol: RootJsonFormat[SampleData] = jsonFormat2(SampleData)
}

import SampleDataSprayProtocol._

val resumeOnParsingException = ActorAttributes.supervisionStrategy {
  case _: spray.json.JsonParser.ParsingException => Supervision.Resume
  case _                                         => Supervision.Stop
}

val consumer = Consumer
  .plainSource(consumerSettings, Subscriptions.topics(topic))
  .map { consumerRecord =>
    val value = consumerRecord.value()
    val sampleData = value.parseJson.convertTo[SampleData]
    sampleData
  }
  .withAttributes(resumeOnParsingException)
  .toMat(Sink.seq)(DrainingControl.apply)
  .run()

Avro with Schema Registry

If you want to use Confluent’s Schema Registry, you need to include the dependency on kafka-avro-serializer as shown below. It is not available from Maven Central, so Confluent’s repository has to be specified. These examples use kafka-avro-serializer version 7.7.2.

Maven
<project>
...
  <dependencies>
    ...
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-avro-serializer</artifactId>
      <version>confluent.version (e.g. 7.7.2)</version>
    </dependency>
    ...
  </dependencies>
  ...
  <repositories>
    <repository>
      <id>confluent-maven-repo</id>
      <name>Confluent Maven Repository</name>
      <url>https://packages.confluent.io/maven/</url>
    </repository>
  </repositories>
...
</project>
sbt
libraryDependencies += "io.confluent" % "kafka-avro-serializer" % confluentAvroVersion // e.g. 7.7.2
resolvers += "Confluent Maven Repository" at "https://packages.confluent.io/maven/"
Gradle
dependencies {
  implementation group: 'io.confluent', name: 'kafka-avro-serializer', version: confluentAvroVersion // e.g. 7.7.2
}
repositories {
  maven {
    url  "https://packages.confluent.io/maven/"
  }
}

Producer

To create serializers that use the Schema Registry, its URL needs to be provided as configuration AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to the serializer, and that serializer is then used in the ProducerSettings.

Scala
import io.confluent.kafka.serializers.{ AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer, KafkaAvroSerializer }
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.serialization._

// kafkaAvroSerDeConfig is the configuration map defined in the consumer example below
val producerSettings: ProducerSettings[String, SpecificRecord] = {
  val kafkaAvroSerializer = new KafkaAvroSerializer()
  kafkaAvroSerializer.configure(kafkaAvroSerDeConfig.asJava, false)
  val serializer = kafkaAvroSerializer.asInstanceOf[Serializer[SpecificRecord]]

  ProducerSettings(system, new StringSerializer, serializer)
    .withBootstrapServers(bootstrapServers)
}

val samples = (1 to 3).map(i => SampleAvroClass(s"key_$i", s"name_$i"))
val producerCompletion =
  Source(samples)
    .map(n => new ProducerRecord[String, SpecificRecord](topic, n.key, n))
    .runWith(Producer.plainSink(producerSettings))
Java
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

// kafkaAvroSerDeConfig is the configuration map built in the consumer example below
KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer();
kafkaAvroSerializer.configure(kafkaAvroSerDeConfig, false);
Serializer<Object> serializer = kafkaAvroSerializer;

ProducerSettings<String, Object> producerSettings =
    ProducerSettings.create(sys, new StringSerializer(), serializer)
        .withBootstrapServers(bootstrapServers());

SampleAvroClass sample = new SampleAvroClass("key", "name");
List<SampleAvroClass> samples = Arrays.asList(sample, sample, sample);
CompletionStage<Done> producerCompletion =
    Source.from(samples)
        .map(n -> new ProducerRecord<String, Object>(topic, n.key(), n))
        .runWith(Producer.plainSink(producerSettings), sys);

Consumer

To create deserializers that use the Schema Registry, its URL needs to be provided as configuration AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to the deserializer, and that deserializer is then used in the ConsumerSettings.

Scala
import io.confluent.kafka.serializers.{ AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer, KafkaAvroDeserializerConfig, KafkaAvroSerializer }
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.serialization._

val kafkaAvroSerDeConfig = Map[String, Any](
  AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
  KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> true.toString)
val consumerSettings: ConsumerSettings[String, SpecificRecord] = {
  val kafkaAvroDeserializer = new KafkaAvroDeserializer()
  kafkaAvroDeserializer.configure(kafkaAvroSerDeConfig.asJava, false)
  val deserializer = kafkaAvroDeserializer.asInstanceOf[Deserializer[SpecificRecord]]

  ConsumerSettings(system, new StringDeserializer, deserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId(group)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
}

val (control, result) =
  Consumer
    .plainSource(consumerSettings, Subscriptions.topics(topic))
    .take(samples.size.toLong)
    .map(_.value())
    .toMat(Sink.seq)(Keep.both)
    .run()
Java
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

Map<String, Object> kafkaAvroSerDeConfig = new HashMap<>();
kafkaAvroSerDeConfig.put(
    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl());
KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer();
kafkaAvroDeserializer.configure(kafkaAvroSerDeConfig, false);
Deserializer<Object> deserializer = kafkaAvroDeserializer;

ConsumerSettings<String, Object> consumerSettings =
    ConsumerSettings.create(sys, new StringDeserializer(), deserializer)
        .withBootstrapServers(bootstrapServers())
        .withGroupId(group)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Consumer.DrainingControl<List<ConsumerRecord<String, Object>>> control =
    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
        .take(samples.size())
        .toMat(Sink.seq(), Consumer::createDrainingControl)
        .run(sys);
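
The configuration map handed to configure(...) in the examples above is an ordinary map of string keys. The sketch below builds such a map without the Confluent dependency, spelling out the literal keys behind SCHEMA_REGISTRY_URL_CONFIG ("schema.registry.url") and SPECIFIC_AVRO_READER_CONFIG ("specific.avro.reader"). The class name, the helper, and the registry URL http://localhost:8081 are assumptions for illustration only. The Scala consumer above sets the specific-reader flag, whereas the Java consumer does not; with the flag left at its default, values are expected to be deserialized as generic Avro records rather than generated classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; in real code prefer the constants from the Confluent classes.
class SchemaRegistryConfigSketch {

  // Literal values of the Confluent configuration constants used above.
  static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
  static final String SPECIFIC_AVRO_READER = "specific.avro.reader";

  // Builds the map that would be passed to configure(config, false),
  // where false means the (de)serializer is used for values, not keys.
  static Map<String, Object> serDeConfig(String registryUrl, boolean specificReader) {
    Map<String, Object> config = new HashMap<>();
    config.put(SCHEMA_REGISTRY_URL, registryUrl);
    config.put(SPECIFIC_AVRO_READER, Boolean.toString(specificReader));
    return config;
  }

  public static void main(String[] args) {
    // The URL is an assumed local Schema Registry address.
    Map<String, Object> config = serDeConfig("http://localhost:8081", true);
    System.out.println(config.get(SCHEMA_REGISTRY_URL));  // prints http://localhost:8081
    System.out.println(config.get(SPECIFIC_AVRO_READER)); // prints true
  }
}
```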