Serialization
The general recommendation for de-/serialization of messages is to use byte arrays (or Strings) as the value and to do the de-/serialization in a map operator in the Apache Pekko Stream instead of implementing it directly in Kafka de-/serializers. When deserialization is handled explicitly within the Apache Pekko Stream, it is easier to implement the desired error handling strategy, as the examples below show.
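The examples below elide the actual settings with `// ...`. A minimal sketch of how such byte-array-based consumer and producer settings could be created, assuming an actor system `system` and values `bootstrapServers` and `group` defined elsewhere:

```scala
import org.apache.kafka.common.serialization.{
  ByteArrayDeserializer,
  ByteArraySerializer,
  StringDeserializer,
  StringSerializer
}
import org.apache.pekko.kafka.{ ConsumerSettings, ProducerSettings }

// Kafka only sees byte arrays as values; the real de-/serialization
// happens later in a map operator of the stream
val consumerSettings: ConsumerSettings[String, Array[Byte]] =
  ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId(group)

val producerSettings: ProducerSettings[String, Array[Byte]] =
  ProducerSettings(system, new StringSerializer, new ByteArraySerializer)
    .withBootstrapServers(bootstrapServers)
```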
Protocol buffers
Protocol Buffers offer a language-neutral, platform-neutral, extensible mechanism for serializing structured data and allow consumers and producers to rely on the message format.
The easiest way to use Protocol Buffers with Apache Pekko Connectors Kafka is to serialize and deserialize the Kafka message payload as a byte array and call the Protocol Buffers serialization and deserialization in a regular map operator. To serialize the Protobuf-defined type Order into a byte array, use the .toByteArray() method which gets generated by the Protobuf compiler.
- Scala

```scala
// the Protobuf generated class
import docs.scaladsl.proto.Order

val producerSettings: ProducerSettings[String, Array[Byte]] = // ...

val producerCompletion =
  Source(samples)
    .map(order => new ProducerRecord(topic, order.id, order.toByteArray))
    .runWith(Producer.plainSink(producerSettings))
```
- Java

```java
// the Protobuf generated class
import docs.javadsl.proto.OrderMessages;

import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

ProducerSettings<String, byte[]> producerSettings = // ...

CompletionStage<Done> producerCompletion =
    Source.from(samples)
        .map(order -> new ProducerRecord<>(topic, order.getId(), order.toByteArray()))
        .runWith(Producer.plainSink(producerSettings), sys);
```
To de-serialize a Protocol Buffers message in a map operator, convert the received byte array to the designated type with the generated parseFrom() method. This example uses a resuming supervision strategy to react to data which can't be deserialized and ignores the faulty elements.
- Scala

```scala
// the Protobuf generated class
import docs.scaladsl.proto.Order

val resumeOnParsingException = ActorAttributes.supervisionStrategy {
  case _: com.google.protobuf.InvalidProtocolBufferException => Supervision.Resume
  case _ => Supervision.Stop
}

val consumerSettings: ConsumerSettings[String, Array[Byte]] = // ...

val consumer = Consumer
  .plainSource(consumerSettings, Subscriptions.topics(topic))
  .map { consumerRecord =>
    Order.parseFrom(consumerRecord.value())
  }
  .withAttributes(resumeOnParsingException)
  .toMat(Sink.seq)(DrainingControl.apply)
  .run()
```
- Java

```java
// the Protobuf generated class
import docs.javadsl.proto.OrderMessages;

import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

final Attributes resumeOnParseException =
    ActorAttributes.withSupervisionStrategy(
        exception -> {
          if (exception instanceof com.google.protobuf.InvalidProtocolBufferException) {
            return Supervision.resume();
          } else {
            return Supervision.stop();
          }
        });

ConsumerSettings<String, byte[]> consumerSettings = // ...

Consumer.DrainingControl<List<OrderMessages.Order>> control =
    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
        .map(ConsumerRecord::value)
        .map(OrderMessages.Order::parseFrom)
        .withAttributes(resumeOnParseException) // drop faulty elements
        .toMat(Sink.seq(), Consumer::createDrainingControl)
        .run(sys);
```
Jackson JSON
Serializing data to JSON text with Jackson in a map operator will turn the object instance into a String which is used as the value in the ProducerRecord.
- Java

```java
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;

final ObjectMapper mapper = new ObjectMapper();
final ObjectWriter sampleDataWriter = mapper.writerFor(SampleData.class);

CompletionStage<Done> producerCompletion =
    Source.from(samples)
        .map(sampleDataWriter::writeValueAsString)
        .map(json -> new ProducerRecord<String, String>(topic, json))
        .runWith(Producer.plainSink(producerDefaults()), sys);
```
To de-serialize a JSON String with Jackson in a map operator, extract the String and apply the Jackson object reader in a map operator. Specify the target type on the map operator (e.g. .<SampleData>map(...)), as the object reader is not generic. This example uses a resuming supervision strategy to react to data which can't be parsed correctly and ignores the faulty elements.
- Java

```java
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;

final ObjectMapper mapper = new ObjectMapper();
final ObjectReader sampleDataReader = mapper.readerFor(SampleData.class);

final Attributes resumeOnParseException =
    ActorAttributes.withSupervisionStrategy(
        exception -> {
          if (exception instanceof JsonParseException) {
            return Supervision.resume();
          } else {
            return Supervision.stop();
          }
        });

Consumer.DrainingControl<List<SampleData>> control =
    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
        .map(ConsumerRecord::value)
        .<SampleData>map(sampleDataReader::readValue)
        .withAttributes(resumeOnParseException) // drop faulty elements
        .toMat(Sink.seq(), Consumer::createDrainingControl)
        .run(sys);
```
Spray JSON
To de-serialize a JSON String with Spray JSON in a map operator, extract the String and use the Spray-provided implicits parseJson and convertTo in a map operator. This example uses a resuming supervision strategy to react to data which can't be parsed correctly and ignores the faulty elements.
- Scala

```scala
import spray.json._

final case class SampleData(name: String, value: Int)

object SampleDataSprayProtocol extends DefaultJsonProtocol {
  implicit val sampleDataProtocol: RootJsonFormat[SampleData] = jsonFormat2(SampleData)
}

import SampleDataSprayProtocol._

val resumeOnParsingException = ActorAttributes.supervisionStrategy {
  case _: spray.json.JsonParser.ParsingException => Supervision.Resume
  case _ => Supervision.Stop
}

val consumer = Consumer
  .plainSource(consumerSettings, Subscriptions.topics(topic))
  .map { consumerRecord =>
    val value = consumerRecord.value()
    val sampleData = value.parseJson.convertTo[SampleData]
    sampleData
  }
  .withAttributes(resumeOnParsingException)
  .toMat(Sink.seq)(DrainingControl.apply)
  .run()
```
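This section only shows the consuming side. A minimal sketch of the producing side, assuming the same SampleDataSprayProtocol plus a hypothetical samples collection of SampleData, a topic and producer settings with a String value serializer, could look like this:

```scala
import spray.json._
import SampleDataSprayProtocol._

// serialize each SampleData instance to a JSON String in a map operator
// and use that String as the value of the ProducerRecord
val producerCompletion =
  Source(samples)
    .map(sampleData => sampleData.toJson.compactPrint)
    .map(json => new ProducerRecord[String, String](topic, json))
    .runWith(Producer.plainSink(producerSettings))
```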
Avro with Schema Registry
If you want to use Confluent's Schema Registry, you need to include the dependency on kafka-avro-serializer as shown below. It is not available from Maven Central, which is why Confluent's repository has to be specified. These examples use kafka-avro-serializer version 7.7.2.
- Maven

```xml
<project>
  ...
  <dependencies>
    ...
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-avro-serializer</artifactId>
      <version>confluent.version (eg. 7.7.2)</version>
    </dependency>
    ...
  </dependencies>
  ...
  <repositories>
    <repository>
      <id>confluent-maven-repo</id>
      <name>Confluent Maven Repository</name>
      <url>https://packages.confluent.io/maven/</url>
    </repository>
  </repositories>
  ...
</project>
```
- sbt

```scala
libraryDependencies += "io.confluent" % "kafka-avro-serializer" % confluentAvroVersion, // eg. 7.7.2
resolvers += "Confluent Maven Repository" at "https://packages.confluent.io/maven/",
```
- Gradle

```gradle
dependencies {
  compile group: 'io.confluent', name: 'kafka-avro-serializer', version: confluentAvroVersion // eg. 7.7.2
}
repositories {
  maven {
    url "https://packages.confluent.io/maven/"
  }
}
```
Producer
To create serializers that use the Schema Registry, its URL needs to be provided as configuration AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to the serializer, and that serializer is used in the ProducerSettings.
- Scala

```scala
import io.confluent.kafka.serializers.{ AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer, KafkaAvroSerializer }
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.serialization._

val producerSettings: ProducerSettings[String, SpecificRecord] = {
  val kafkaAvroSerializer = new KafkaAvroSerializer()
  // kafkaAvroSerDeConfig is defined in the Consumer example below
  kafkaAvroSerializer.configure(kafkaAvroSerDeConfig.asJava, false)
  val serializer = kafkaAvroSerializer.asInstanceOf[Serializer[SpecificRecord]]

  ProducerSettings(system, new StringSerializer, serializer)
    .withBootstrapServers(bootstrapServers)
}

val samples = (1 to 3).map(i => SampleAvroClass(s"key_$i", s"name_$i"))
val producerCompletion =
  Source(samples)
    .map(n => new ProducerRecord[String, SpecificRecord](topic, n.key, n))
    .runWith(Producer.plainSink(producerSettings))
```
- Java

```java
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer();
// kafkaAvroSerDeConfig is defined in the Consumer example below
kafkaAvroSerializer.configure(kafkaAvroSerDeConfig, false);
Serializer<Object> serializer = kafkaAvroSerializer;

ProducerSettings<String, Object> producerSettings =
    ProducerSettings.create(sys, new StringSerializer(), serializer)
        .withBootstrapServers(bootstrapServers());

SampleAvroClass sample = new SampleAvroClass("key", "name");
List<SampleAvroClass> samples = Arrays.asList(sample, sample, sample);
CompletionStage<Done> producerCompletion =
    Source.from(samples)
        .map(n -> new ProducerRecord<String, Object>(topic, n.key(), n))
        .runWith(Producer.plainSink(producerSettings), sys);
```
Consumer
To create deserializers that use the Schema Registry, its URL needs to be provided as configuration AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to the deserializer, and that deserializer is used in the ConsumerSettings.
- Scala

```scala
import io.confluent.kafka.serializers.{
  AbstractKafkaAvroSerDeConfig,
  KafkaAvroDeserializer,
  KafkaAvroDeserializerConfig,
  KafkaAvroSerializer
}
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.serialization._

val kafkaAvroSerDeConfig = Map[String, Any](
  AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
  KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> true.toString)

val consumerSettings: ConsumerSettings[String, SpecificRecord] = {
  val kafkaAvroDeserializer = new KafkaAvroDeserializer()
  kafkaAvroDeserializer.configure(kafkaAvroSerDeConfig.asJava, false)
  val deserializer = kafkaAvroDeserializer.asInstanceOf[Deserializer[SpecificRecord]]

  ConsumerSettings(system, new StringDeserializer, deserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId(group)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
}

val (control, result) = Consumer
  .plainSource(consumerSettings, Subscriptions.topics(topic))
  .take(samples.size.toLong)
  .map(_.value())
  .toMat(Sink.seq)(Keep.both)
  .run()
```
- Java

```java
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

Map<String, Object> kafkaAvroSerDeConfig = new HashMap<>();
kafkaAvroSerDeConfig.put(
    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl());
KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer();
kafkaAvroDeserializer.configure(kafkaAvroSerDeConfig, false);
Deserializer<Object> deserializer = kafkaAvroDeserializer;

ConsumerSettings<String, Object> consumerSettings =
    ConsumerSettings.create(sys, new StringDeserializer(), deserializer)
        .withBootstrapServers(bootstrapServers())
        .withGroupId(group)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Consumer.DrainingControl<List<ConsumerRecord<String, Object>>> controlCompletionStagePair =
    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
        .take(samples.size())
        .toMat(Sink.seq(), Consumer::createDrainingControl)
        .run(sys);
```