AWS Kinesis and Firehose

The AWS Kinesis connector provides flows for streaming data to and from Kinesis Data Streams and to Kinesis Firehose streams.

For more information about Kinesis please visit the Kinesis documentation.

Project Info: Apache Pekko Connectors Kinesis
Artifact: org.apache.pekko : pekko-connectors-kinesis : 1.0.2
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17
Scala versions: 2.13.14, 2.12.20, 3.3.3
JPMS module name: pekko.stream.connectors.aws.kinesis
License
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.0.3"
val PekkoHttpVersion = "1.0.1"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-kinesis" % "1.0.2",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
  "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion
)
Maven
<properties>
  <pekko.version>1.0.3</pekko.version>
  <pekko.http.version>1.0.1</pekko.http.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-kinesis_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-http_${scala.binary.version}</artifactId>
    <version>${pekko.http.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.0.3",
  PekkoHttpVersion: "1.0.1",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-kinesis_${versions.ScalaBinary}:1.0.2"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
  implementation "org.apache.pekko:pekko-http_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Kinesis Data Streams

Create the Kinesis client

Sources and Flows provided by this connector need a KinesisAsyncClient instance to consume messages from a shard.

Note

The KinesisAsyncClient instance you supply is thread-safe and can be shared amongst multiple GraphStages. As a result, individual GraphStages will not automatically shut down the supplied client when they complete. It is recommended to shut the client instance down on Actor system termination.

Scala
import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
import org.apache.pekko.actor.ActorSystem
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient

implicit val system: ActorSystem = ActorSystem()

implicit val amazonKinesisAsync: software.amazon.awssdk.services.kinesis.KinesisAsyncClient =
  KinesisAsyncClient
    .builder()
    .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
    // Possibility to configure the retry policy
    // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
    // .overrideConfiguration(...)
    .build()

system.registerOnTermination(amazonKinesisAsync.close())
Java
import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
import org.apache.pekko.actor.ActorSystem;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

final ActorSystem system = ActorSystem.create();

final software.amazon.awssdk.services.kinesis.KinesisAsyncClient amazonKinesisAsync =
    KinesisAsyncClient.builder()
        .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
        // Possibility to configure the retry policy
        // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
        // .overrideConfiguration(...)
        .build();

system.registerOnTermination(amazonKinesisAsync::close);

The example above uses Apache Pekko HTTP as the default HTTP client implementation. For more details about the HTTP client, configuring request retrying and best practices for credentials, see AWS client configuration.

Kinesis as Source

The KinesisSource creates one GraphStage per shard. Reading from a shard requires an instance of ShardSettings.

Scala
val settings =
  ShardSettings(streamName = "myStreamName", shardId = "shard-id")
    .withRefreshInterval(1.second)
    .withLimit(500)
    .withShardIterator(ShardIterator.TrimHorizon)
Java
final ShardSettings settings =
    ShardSettings.create("streamName", "shard-id")
        .withRefreshInterval(Duration.ofSeconds(1))
        .withLimit(500)
        .withShardIterator(ShardIterators.trimHorizon());

You have the choice of reading from a single shard, or reading from multiple shards. In the case of multiple shards the results of running a separate GraphStage for each shard will be merged together.

Warning

The GraphStage associated with a shard will remain open until the graph is stopped, or a GetRecords result returns an empty shard iterator indicating that the shard has been closed. This means that if you wish to continue processing records after a merge or reshard, you will need to recreate the source with the results of a new DescribeStream request, which can be done by simply creating a new KinesisSource. You can read more about adapting to a reshard in the AWS documentation.

For a single shard you simply provide the settings for a single shard.

Scala
val source: Source[software.amazon.awssdk.services.kinesis.model.Record, NotUsed] =
  KinesisSource.basic(settings, amazonKinesisAsync)
Java
final Source<software.amazon.awssdk.services.kinesis.model.Record, NotUsed> source =
    KinesisSource.basic(settings, amazonKinesisAsync);

You can merge multiple shards by providing a list of settings.

Scala
val mergeSettings = List(
  ShardSettings("myStreamName", "shard-id-1"),
  ShardSettings("myStreamName", "shard-id-2"))
val mergedSource: Source[Record, NotUsed] = KinesisSource.basicMerge(mergeSettings, amazonKinesisAsync)
Java
final List<ShardSettings> mergeSettings =
    Arrays.asList(
        ShardSettings.create("streamName", "shard-id-1"),
        ShardSettings.create("streamName", "shard-id-2"));
final Source<Record, NotUsed> mergedSource =
    KinesisSource.basicMerge(mergeSettings, amazonKinesisAsync);

The constructed Source will return Record objects by calling GetRecords at the specified interval and according to the downstream demand.

Kinesis Put via Flow or as Sink

The KinesisFlow (or KinesisSink) publishes messages into a Kinesis stream using its partition key and message body. It uses dynamic size batches and can perform several requests in parallel; retrying failed requests is left to the underlying KinesisAsyncClient. These features are necessary to achieve the best possible write throughput to the stream. The Flow outputs the result of publishing each record.

Warning

Batching has a drawback: message order cannot be guaranteed, as some records within a single batch may fail to be published. That also means that the Flow output may not match the same input order.

More information can be found in the AWS documentation and the AWS API reference.
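To make the idea of dynamic size batches more concrete, here is a minimal plain-Java sketch that groups records into batches bounded by both a record count and a total byte size. It is not the connector's implementation: the class `BatchSketch`, its method and the limit values used in `main` are hypothetical, and the actual PutRecords limits should be taken from the AWS API reference.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BatchSketch {
    /**
     * Splits record sizes (in bytes) into consecutive batches so that no batch
     * exceeds maxRecords entries or maxBytes in total. A single record larger
     * than maxBytes still ends up alone in its own batch.
     */
    static List<List<Integer>> batch(List<Integer> recordSizes, int maxRecords, long maxBytes) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentBytes = 0;
        for (int size : recordSizes) {
            boolean full = current.size() == maxRecords || currentBytes + size > maxBytes;
            if (full && !current.isEmpty()) {
                // Close the current batch before adding the next record.
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Illustrative limits only: at most 500 records and 1000 bytes per batch.
        System.out.println(batch(Arrays.asList(400, 400, 400, 100), 500, 1000));
    }
}
```

Because batch boundaries depend on the sizes of the incoming records, the number of records per request varies, which is one reason per-record results matter more than per-request results.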

In order to correlate the results with the original message, an optional user context object of arbitrary type can be associated with every message and will be returned with the corresponding result. This allows keeping track of which messages have been successfully sent to Kinesis even if the message order gets mixed up.
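As an illustration of what such a context makes possible, the following plain-Java sketch tracks which messages are still unconfirmed when results arrive in a different order than the inputs. The class `PendingTrackerSketch` and the use of a `String` message id as context are assumptions made for this example; in a real stream the context would travel through the flow created with the connector's context-aware API.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Not thread-safe; intended to be used from a single stream stage or thread. */
final class PendingTrackerSketch {
    private final Set<String> pending = new LinkedHashSet<>();

    /** Remember the context of a message handed to the flow. */
    void sent(String context) {
        pending.add(context);
    }

    /** Called with the context returned alongside a result; errorCode is null on success. */
    void completed(String context, String errorCode) {
        if (errorCode == null) {
            pending.remove(context);
        }
    }

    /** Contexts of messages that have not been confirmed as published. */
    Set<String> outstanding() {
        return new LinkedHashSet<>(pending);
    }

    public static void main(String[] args) {
        PendingTrackerSketch tracker = new PendingTrackerSketch();
        tracker.sent("msg-1");
        tracker.sent("msg-2");
        // Results may arrive in any order; the context identifies the message.
        tracker.completed("msg-2", null);
        System.out.println(tracker.outstanding());
    }
}
```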

Publishing to a Kinesis stream requires an instance of KinesisFlowSettings, although a default instance with sane values and a method that returns settings based on the stream shard number are also available:

Scala
val flowSettings = KinesisFlowSettings
  .create()
  .withParallelism(1)
  .withMaxBatchSize(500)
  .withMaxRecordsPerSecond(1000)
  .withMaxBytesPerSecond(1000000)

val defaultFlowSettings = KinesisFlowSettings.Defaults

val fourShardFlowSettings = KinesisFlowSettings.byNumberOfShards(4)
Java
final KinesisFlowSettings flowSettings =
    KinesisFlowSettings.create()
        .withParallelism(1)
        .withMaxBatchSize(500)
        .withMaxRecordsPerSecond(1_000)
        .withMaxBytesPerSecond(1_000_000);

final KinesisFlowSettings defaultFlowSettings = KinesisFlowSettings.create();

final KinesisFlowSettings fourShardFlowSettings = KinesisFlowSettings.byNumberOfShards(4);
Warning

Note that throughput settings maxRecordsPerSecond and maxBytesPerSecond are vital to minimize server errors (like ProvisionedThroughputExceededException) and retries, and thus achieve a higher publication rate.
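When choosing these values by hand, a reasonable starting point is to scale the per-shard write limits by the number of shards. The sketch below assumes the per-shard limits documented by AWS (1,000 records per second and 1 MB per second for writes); the class `ShardThroughputSketch` is hypothetical and only illustrates the arithmetic, it is not how the connector computes its settings.

```java
final class ShardThroughputSketch {
    // Assumed per-shard write limits, taken from the AWS Kinesis Data Streams documentation.
    static final int RECORDS_PER_SECOND_PER_SHARD = 1_000;
    static final int BYTES_PER_SECOND_PER_SHARD = 1_000_000;

    static int maxRecordsPerSecond(int shards) {
        requirePositive(shards);
        // multiplyExact throws ArithmeticException instead of silently overflowing.
        return Math.multiplyExact(shards, RECORDS_PER_SECOND_PER_SHARD);
    }

    static int maxBytesPerSecond(int shards) {
        requirePositive(shards);
        return Math.multiplyExact(shards, BYTES_PER_SECOND_PER_SHARD);
    }

    private static void requirePositive(int shards) {
        if (shards <= 0) {
            throw new IllegalArgumentException("shards must be positive");
        }
    }

    public static void main(String[] args) {
        int shards = 4;
        System.out.println(maxRecordsPerSecond(shards) + " records/s, "
            + maxBytesPerSecond(shards) + " bytes/s");
    }
}
```

The resulting numbers could then be passed to withMaxRecordsPerSecond and withMaxBytesPerSecond; if several producers write to the same stream, each one needs a correspondingly smaller share.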

The Flow/Sink can now be created.

Scala
val flow1: Flow[PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed] = KinesisFlow("myStreamName")

val flow2: Flow[PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed] = KinesisFlow("myStreamName", flowSettings)

val flow3: FlowWithContext[PutRecordsRequestEntry, String, PutRecordsResultEntry, String, NotUsed] =
  KinesisFlow.withContext("myStreamName")

val flow4: FlowWithContext[PutRecordsRequestEntry, String, PutRecordsResultEntry, String, NotUsed] =
  KinesisFlow.withContext("myStreamName", flowSettings)

val flow5: Flow[(String, ByteString), PutRecordsResultEntry, NotUsed] =
  KinesisFlow.byPartitionAndBytes("myStreamName")

val flow6: Flow[(String, ByteBuffer), PutRecordsResultEntry, NotUsed] =
  KinesisFlow.byPartitionAndData("myStreamName")

val sink1: Sink[PutRecordsRequestEntry, NotUsed] = KinesisSink("myStreamName")
val sink2: Sink[PutRecordsRequestEntry, NotUsed] = KinesisSink("myStreamName", flowSettings)
val sink3: Sink[(String, ByteString), NotUsed] = KinesisSink.byPartitionAndBytes("myStreamName")
val sink4: Sink[(String, ByteBuffer), NotUsed] = KinesisSink.byPartitionAndData("myStreamName")
Java
final Flow<PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed> flow =
    KinesisFlow.create("streamName", flowSettings, amazonKinesisAsync);

final Flow<PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed> defaultSettingsFlow =
    KinesisFlow.create("streamName", amazonKinesisAsync);

final FlowWithContext<PutRecordsRequestEntry, String, PutRecordsResultEntry, String, NotUsed>
    flowWithStringContext =
        KinesisFlow.createWithContext("streamName", flowSettings, amazonKinesisAsync);

final FlowWithContext<PutRecordsRequestEntry, String, PutRecordsResultEntry, String, NotUsed>
    defaultSettingsFlowWithStringContext =
        KinesisFlow.createWithContext("streamName", amazonKinesisAsync);

final Sink<PutRecordsRequestEntry, NotUsed> sink =
    KinesisSink.create("streamName", flowSettings, amazonKinesisAsync);

final Sink<PutRecordsRequestEntry, NotUsed> defaultSettingsSink =
    KinesisSink.create("streamName", amazonKinesisAsync);
Warning

As of version 2, the library will not retry failed requests: this is handled by the underlying KinesisAsyncClient (see client configuration). This means that you may have to inspect individual responses to make sure they have been successful:

Scala
val flowWithErrors: Flow[PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed] = KinesisFlow("myStreamName")
  .map { response =>
    if (response.errorCode() ne null) {
      throw new RuntimeException(response.errorCode())
    }

    response
  }
Java
final Flow<PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed> flowWithErrors =
    KinesisFlow.create("streamName", flowSettings, amazonKinesisAsync)
        .map(
            response -> {
              if (response.errorCode() != null) {
                throw new RuntimeException(response.errorCode());
              }

              return response;
            });
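Failing the stream on the first error is only one option. As a hedged alternative, the plain-Java sketch below tallies failures per error code so that an application could log them or decide what to resubmit. The class `ResultTallySketch` is hypothetical, and the list of strings stands in for the values returned by errorCode() on each result entry, where null means the record was accepted.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ResultTallySketch {
    /** Counts failures per error code; a null entry means the record was accepted. */
    static Map<String, Integer> failuresByCode(List<String> errorCodes) {
        Map<String, Integer> failures = new TreeMap<>();
        for (String code : errorCodes) {
            if (code != null) {
                // Start at 1 for a new code, otherwise add 1 to the existing count.
                failures.merge(code, 1, Integer::sum);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> codes =
            Arrays.asList(null, "InternalFailure", "ProvisionedThroughputExceededException", null);
        System.out.println(failuresByCode(codes));
    }
}
```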
Note

By default, KinesisFlow and KinesisSink batch according to the provided KinesisFlowSettings and propagate any error thrown by the Kinesis client. If batching, errors or successful results need special handling, the methods KinesisFlow.batchingFlow and KinesisFlow.batchWritingFlow can be used and combined in other ways than the default.

AWS KCL Scheduler Source & checkpointer

The KCL Source can read from several shards and rebalance automatically when other Schedulers are started or stopped. It also handles record sequence checkpoints.

For more information about KCL please visit the official documentation.

Usage

The KCL Scheduler Source needs to create and manage Scheduler instances in order to consume records from Kinesis Streams.

In order to use it, you need to provide a Scheduler builder and the Source settings:

Scala
val schedulerSourceSettings = KinesisSchedulerSourceSettings(bufferSize = 1000, backpressureTimeout = 1.minute)

val builder: ShardRecordProcessorFactory => Scheduler =
  recordProcessorFactory => {

    val streamName = "myStreamName"

    val configsBuilder = new ConfigsBuilder(
      streamName,
      "myApp",
      kinesisClient,
      dynamoClient,
      cloudWatchClient,
      s"${
          import scala.sys.process._
          "hostname".!!.trim()
        }:${UUID.randomUUID()}",
      recordProcessorFactory)

    new Scheduler(
      configsBuilder.checkpointConfig,
      configsBuilder.coordinatorConfig,
      configsBuilder.leaseManagementConfig,
      configsBuilder.lifecycleConfig,
      configsBuilder.metricsConfig,
      configsBuilder.processorConfig,
      configsBuilder.retrievalConfig)
  }
Java
final KinesisSchedulerSource.SchedulerBuilder schedulerBuilder =
    new KinesisSchedulerSource.SchedulerBuilder() {
      @Override
      public Scheduler build(ShardRecordProcessorFactory r) {
        return null; // build your own Scheduler here
      }
    };
final KinesisSchedulerSourceSettings schedulerSettings =
    KinesisSchedulerSourceSettings.create(1000, Duration.of(1L, ChronoUnit.SECONDS));

Then the Source can be created as usual:

Scala
val source = KinesisSchedulerSource(builder, schedulerSourceSettings)
  .log("kinesis-records", "Consumed record " + _.sequenceNumber)
Java
final Source<CommittableRecord, CompletionStage<Scheduler>> schedulerSource =
    KinesisSchedulerSource.create(schedulerBuilder, schedulerSettings);

Committing records

The KCL Scheduler Source publishes messages downstream that can be committed in order to mark progression of consumers by shard. This process can be done manually or using the provided checkpointer Flow/Sink.

In order to use the Flow/Sink you must provide additional checkpoint settings:

Scala
val checkpointSettings = KinesisSchedulerCheckpointSettings(100, 30.seconds)

source
  .via(KinesisSchedulerSource.checkpointRecordsFlow(checkpointSettings))
  .to(Sink.ignore)
source
  .to(KinesisSchedulerSource.checkpointRecordsSink(checkpointSettings))
Java
final KinesisSchedulerCheckpointSettings checkpointSettings =
    KinesisSchedulerCheckpointSettings.create(1000, Duration.of(30L, ChronoUnit.SECONDS));
final Flow<CommittableRecord, KinesisClientRecord, NotUsed> checkpointFlow =
    KinesisSchedulerSource.checkpointRecordsFlow(checkpointSettings);

Note that the checkpointer Flow may not maintain the input order of records from different shards.
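Since checkpoints mark progress per shard, the relative order of records from different shards does not affect what gets checkpointed. The plain-Java sketch below illustrates this idea by reducing a batch of consumed records to the highest sequence number seen for each shard. It is an illustration under assumptions, not the connector's implementation: `CheckpointBatchSketch` and `Seen` are hypothetical names, and sequence numbers are compared as large decimal integers rather than as strings.

```java
import java.math.BigInteger;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CheckpointBatchSketch {
    /** Hypothetical stand-in for a consumed record: its shard and sequence number. */
    static final class Seen {
        final String shardId;
        final String sequenceNumber;

        Seen(String shardId, String sequenceNumber) {
            this.shardId = shardId;
            this.sequenceNumber = sequenceNumber;
        }
    }

    /** Returns, for each shard in the batch, the highest sequence number seen. */
    static Map<String, String> latestPerShard(List<Seen> batch) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Seen s : batch) {
            // Compare numerically: as strings, "9" would sort after "10".
            latest.merge(s.shardId, s.sequenceNumber,
                (a, b) -> new BigInteger(a).compareTo(new BigInteger(b)) >= 0 ? a : b);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Seen> batch = Arrays.asList(
            new Seen("shard-1", "10"), new Seen("shard-2", "7"), new Seen("shard-1", "9"));
        System.out.println(latestPerShard(batch));
    }
}
```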

Kinesis Firehose Streams

Create the Kinesis Firehose client

Flows provided by this connector need a FirehoseAsyncClient instance to publish messages.

Note

The FirehoseAsyncClient instance you supply is thread-safe and can be shared amongst multiple GraphStages. As a result, individual GraphStages will not automatically shut down the supplied client when they complete. It is recommended to shut the client instance down on Actor system termination.

Scala
import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
import org.apache.pekko.actor.ActorSystem
import software.amazon.awssdk.services.firehose.FirehoseAsyncClient

implicit val system: ActorSystem = ActorSystem()

implicit val amazonKinesisFirehoseAsync: software.amazon.awssdk.services.firehose.FirehoseAsyncClient =
  FirehoseAsyncClient
    .builder()
    .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
    // Possibility to configure the retry policy
    // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
    // .overrideConfiguration(...)
    .build()

system.registerOnTermination(amazonKinesisFirehoseAsync.close())
Java
import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
import org.apache.pekko.actor.ActorSystem;
import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;

final ActorSystem system = ActorSystem.create();

final software.amazon.awssdk.services.firehose.FirehoseAsyncClient amazonFirehoseAsync =
    FirehoseAsyncClient.builder()
        .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
        // Possibility to configure the retry policy
        // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
        // .overrideConfiguration(...)
        .build();

system.registerOnTermination(amazonFirehoseAsync::close);

The example above uses Apache Pekko HTTP as the default HTTP client implementation. For more details about the HTTP client, configuring request retrying and best practices for credentials, see AWS client configuration.

Kinesis Firehose Put via Flow or as Sink

The KinesisFirehoseFlow (or KinesisFirehoseSink) publishes messages into a Kinesis Firehose stream using its message body. It uses dynamic size batches and can perform several requests in parallel. These features are necessary to achieve the best possible write throughput to the stream. The Flow outputs the result of publishing each record.

Warning

Batching has a drawback: message order cannot be guaranteed, as some records within a single batch may fail to be published. That also means that the Flow output may not match the same input order.

More information can be found in the AWS API reference.

Publishing to a Kinesis Firehose stream requires an instance of KinesisFirehoseFlowSettings, although a default instance with sane values is available:

Scala
val flowSettings = KinesisFirehoseFlowSettings
  .create()
  .withParallelism(1)
  .withMaxBatchSize(500)
  .withMaxRecordsPerSecond(5000)
  .withMaxBytesPerSecond(4000000)

val defaultFlowSettings = KinesisFirehoseFlowSettings.Defaults
Java
final KinesisFirehoseFlowSettings flowSettings =
    KinesisFirehoseFlowSettings.create()
        .withParallelism(1)
        .withMaxBatchSize(500)
        .withMaxRecordsPerSecond(1_000)
        .withMaxBytesPerSecond(1_000_000);

final KinesisFirehoseFlowSettings defaultFlowSettings = KinesisFirehoseFlowSettings.create();
Warning

Note that throughput settings maxRecordsPerSecond and maxBytesPerSecond are vital to minimize server errors (like ProvisionedThroughputExceededException) and retries, and thus achieve a higher publication rate.

The Flow/Sink can now be created.

Scala
val flow1: Flow[Record, PutRecordBatchResponseEntry, NotUsed] = KinesisFirehoseFlow("myStreamName")

val flow2: Flow[Record, PutRecordBatchResponseEntry, NotUsed] = KinesisFirehoseFlow("myStreamName", flowSettings)

val sink1: Sink[Record, NotUsed] = KinesisFirehoseSink("myStreamName")
val sink2: Sink[Record, NotUsed] = KinesisFirehoseSink("myStreamName", flowSettings)
Java
final Flow<Record, PutRecordBatchResponseEntry, NotUsed> flow =
    KinesisFirehoseFlow.apply("streamName", flowSettings, amazonFirehoseAsync);

final Flow<Record, PutRecordBatchResponseEntry, NotUsed> defaultSettingsFlow =
    KinesisFirehoseFlow.apply("streamName", amazonFirehoseAsync);

final Sink<Record, NotUsed> sink =
    KinesisFirehoseSink.apply("streamName", flowSettings, amazonFirehoseAsync);

final Sink<Record, NotUsed> defaultSettingsSink =
    KinesisFirehoseSink.apply("streamName", amazonFirehoseAsync);
Warning

As of version 2, the library will not retry failed requests. See AWS Retry Configuration for how to configure retries on the FirehoseAsyncClient.

This means that you may have to inspect individual responses to make sure they have been successful:

Scala
val flowWithErrors: Flow[Record, PutRecordBatchResponseEntry, NotUsed] = KinesisFirehoseFlow("streamName")
  .map { response =>
    if (response.errorCode() != null) {
      throw new RuntimeException(response.errorCode())
    }
    response
  }
Java
final Flow<Record, PutRecordBatchResponseEntry, NotUsed> flowWithErrors =
    KinesisFirehoseFlow.apply("streamName", flowSettings, amazonFirehoseAsync)
        .map(
            response -> {
              if (response.errorCode() != null) {
                throw new RuntimeException(response.errorCode());
              }
              return response;
            });