AWS DynamoDB

The AWS DynamoDB connector provides a flow for streaming DynamoDB requests. For more information about DynamoDB, see the official documentation.

Project Info: Apache Pekko Connectors DynamoDB
Artifact: org.apache.pekko : pekko-connectors-dynamodb : 1.1.0-M1+122-2a90cd8c-SNAPSHOT
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21
Scala versions: 2.13.15, 2.12.20, 3.3.4
JPMS module name: pekko.stream.connectors.aws.dynamodb
License
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.1.2"
val PekkoHttpVersion = "1.1.0"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-dynamodb" % "1.1.0-M1+122-2a90cd8c-SNAPSHOT",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
  "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion
)
Maven
<properties>
  <pekko.version>1.1.2</pekko.version>
  <pekko.http.version>1.1.0</pekko.http.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-dynamodb_${scala.binary.version}</artifactId>
    <version>1.1.0-M1+122-2a90cd8c-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-http_${scala.binary.version}</artifactId>
    <version>${pekko.http.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.1.2",
  PekkoHttpVersion: "1.1.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-dynamodb_${versions.ScalaBinary}:1.1.0-M1+122-2a90cd8c-SNAPSHOT"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
  implementation "org.apache.pekko:pekko-http_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Setup

This connector requires a DynamoDbAsyncClient instance to communicate with AWS DynamoDB.

It is your code’s responsibility to call close to free any resources held by the client. In this example it will be called when the actor system is terminated.

Scala
import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient

// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
private val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))
implicit val client: DynamoDbAsyncClient = DynamoDbAsyncClient
  .builder()
  .region(Region.AWS_GLOBAL)
  .credentialsProvider(credentialsProvider)
  .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
  // Possibility to configure the retry policy
  // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
  // .overrideConfiguration(...)
  .build()

system.registerOnTermination(client.close())
Java
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.Pair;

import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
import scala.util.Try;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;

final ActorSystem system = ActorSystem.create();

// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
StaticCredentialsProvider credentialsProvider =
    StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"));
final DynamoDbAsyncClient client =
    DynamoDbAsyncClient.builder()
        .credentialsProvider(credentialsProvider)
        .region(Region.AWS_GLOBAL)
        .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
        // Possibility to configure the retry policy
        // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
        // .overrideConfiguration(...)
        .build();

system.registerOnTermination(() -> client.close());

The example above uses Apache Pekko HTTP as the default HTTP client implementation. For details about the HTTP client, configuring request retries, and best practices for credentials, see AWS client configuration.

Sending requests and receiving responses

For simple operations you can issue a single request and get the result back in a Future (Scala) or a CompletionStage (Java).

Scala
val listTablesResult: Future[ListTablesResponse] =
  DynamoDb.single(ListTablesRequest.builder().build())
Java
final CompletionStage<ListTablesResponse> listTables =
    DynamoDb.single(
        client, DynamoDbOp.listTables(), ListTablesRequest.builder().build(), system);
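
As a minimal sketch of how a result delivered in a CompletionStage can be consumed, the following JDK-only example maps a response and recovers from a failure. It does not call the connector: `listTableNames`, `tableCount`, the table names, and the fallback value -1 are hypothetical stand-ins chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class SingleResultSketch {
    /** Hypothetical stand-in for an asynchronous call such as a ListTables request. */
    static CompletionStage<List<String>> listTableNames(boolean fail) {
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("service unavailable"));
        } else {
            future.complete(Arrays.asList("orders", "users"));
        }
        return future;
    }

    /** Maps the response to a table count and falls back to -1 if the call failed. */
    static int tableCount(boolean fail) {
        return listTableNames(fail)
            .thenApply(List::size)          // runs only when the stage succeeds
            .exceptionally(error -> -1)     // runs only when the stage failed
            .toCompletableFuture()
            .join();                        // blocking is acceptable here: the stage is already completed
    }

    public static void main(String[] args) {
        System.out.println(tableCount(false)); // prints 2
        System.out.println(tableCount(true));  // prints -1
    }
}
```

In application code you would normally keep composing on the CompletionStage rather than blocking with `join`.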

You can also get the response to a request as an element emitted from a Flow:

Scala
val source: Source[DescribeTableResponse, NotUsed] = Source
  .single(CreateTableRequest.builder().tableName("testTable").build())
  .via(DynamoDb.flow(parallelism = 1))
  .map(response => DescribeTableRequest.builder().tableName(response.tableDescription.tableName).build())
  .via(DynamoDb.flow(parallelism = 1))
Java
Source<DescribeTableResponse, NotUsed> tableArnSource =
    Source.single(CreateTableRequest.builder().tableName("testTable").build())
        .via(DynamoDb.flow(client, DynamoDbOp.createTable(), 1))
        .map(
            result ->
                DescribeTableRequest.builder()
                    .tableName(result.tableDescription().tableName())
                    .build())
        .via(DynamoDb.flow(client, DynamoDbOp.describeTable(), 1));

Flow with context

flowWithContext lets you pass an arbitrary value, such as a commit handle for JMS or Kafka, alongside each request and past the DynamoDB operation. The responses are wrapped in a Try so that successful operations and errors can be told apart in-stream.

Scala
val source: SourceWithContext[PutItemRequest, SomeContext, NotUsed] = ??? // placeholder: supply your own source

val flow: FlowWithContext[PutItemRequest, SomeContext, Try[PutItemResponse], SomeContext, NotUsed] =
  DynamoDb.flowWithContext(parallelism = 1)

val writtenSource: SourceWithContext[PutItemResponse, SomeContext, NotUsed] = source
  .via(flow)
  .map {
    case Success(response)  => response
    case Failure(exception) => throw exception
  }
Java
SourceWithContext<PutItemRequest, SomeContext, NotUsed> source = // ???

FlowWithContext<PutItemRequest, SomeContext, Try<PutItemResponse>, SomeContext, NotUsed> flow =
    DynamoDb.flowWithContext(client, DynamoDbOp.putItem(), 1);

SourceWithContext<PutItemResponse, SomeContext, NotUsed> writtenSource =
    source
        .via(flow)
        .map(
            result -> {
              if (result.isSuccess()) return result.get();
              else throw (Exception) result.failed().get();
            });
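
The idea behind carrying a context next to a Try-wrapped result can be sketched without the connector. In the JDK-only example below, each item travels with an offset (the context), the outcome of a write is captured instead of thrown, and only the offsets of successful writes are reported as safe to commit. `Attempt`, `ContextSketch`, `write`, and `committableOffsets` are hypothetical names for illustration; `Attempt` is a simplified stand-in for `scala.util.Try`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;

/** Hypothetical stand-in for scala.util.Try: either a value or the exception that was thrown. */
final class Attempt<T> {
    final T value;
    final Exception error;

    private Attempt(T value, Exception error) {
        this.value = value;
        this.error = error;
    }

    /** Runs the operation and captures its outcome instead of letting the exception escape. */
    static <T> Attempt<T> of(Callable<T> operation) {
        try {
            return new Attempt<>(operation.call(), null);
        } catch (Exception e) {
            return new Attempt<>(null, e);
        }
    }

    boolean isSuccess() {
        return error == null;
    }
}

final class ContextSketch {
    /** Hypothetical write operation: rejects blank items, echoes everything else. */
    static String write(String item) {
        if (item.trim().isEmpty()) {
            throw new IllegalArgumentException("blank item");
        }
        return "stored:" + item;
    }

    /** Returns the offsets (the context) whose write succeeded, in input order. */
    static List<Long> committableOffsets(List<String> items, List<Long> offsets) {
        List<Long> committable = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            final String item = items.get(i);
            Attempt<String> result = Attempt.of(() -> write(item));
            // The context (offset) is still available next to the result,
            // whether the operation succeeded or failed.
            if (result.isSuccess()) {
                committable.add(offsets.get(i));
            }
        }
        return committable;
    }

    public static void main(String[] args) {
        List<Long> offsets =
            committableOffsets(Arrays.asList("a", " ", "c"), Arrays.asList(10L, 11L, 12L));
        System.out.println(offsets); // prints [10, 12]
    }
}
```

Because failures are values rather than stream errors, the caller decides per element whether to commit, retry, or fail, instead of losing the context when an exception propagates.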

Pagination

The DynamoDB operations BatchGetItem, ListTables, Query and Scan return paginated results. Requests for these operations can be used as a source, or in a flow with flowPaginated:

Scala
val scanRequest = ScanRequest.builder().tableName("testTable").build()

val scanPages: Source[ScanResponse, NotUsed] =
  DynamoDb.source(scanRequest)

val scanPageInFlow: Source[ScanResponse, NotUsed] =
  Source
    .single(scanRequest)
    .via(DynamoDb.flowPaginated())
Java
ScanRequest scanRequest = ScanRequest.builder().tableName("testTable").build();

Source<ScanResponse, NotUsed> scanPages =
    DynamoDb.source(client, DynamoDbOp.scan(), scanRequest);

Source<ScanResponse, NotUsed> scanPageInFlow =
    Source.single(scanRequest).via(DynamoDb.flowPaginated(client, DynamoDbOp.scan()));
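
To illustrate what following pages means, here is a JDK-only sketch of the general pattern for a Scan: a response that carries a LastEvaluatedKey indicates more data, and the next request resumes from that key via ExclusiveStartKey until no key is returned. The example simulates this against an in-memory list and makes no claim about the connector's internals. `PaginationSketch`, `Page`, `fetchPage`, and `scanAllPages` are hypothetical names, and an integer index stands in for the real key map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PaginationSketch {
    /** Hypothetical page: the items plus the key to resume from, or null on the last page. */
    static final class Page {
        final List<String> items;
        final Integer lastEvaluatedKey;

        Page(List<String> items, Integer lastEvaluatedKey) {
            this.items = items;
            this.lastEvaluatedKey = lastEvaluatedKey;
        }
    }

    /** Hypothetical in-memory "table" lookup standing in for one remote scan call. */
    static Page fetchPage(List<String> table, Integer exclusiveStartKey, int pageSize) {
        int from = exclusiveStartKey == null ? 0 : exclusiveStartKey + 1;
        int to = Math.min(from + pageSize, table.size());
        // Report a resume key only if items remain after this page.
        Integer next = to < table.size() ? Integer.valueOf(to - 1) : null;
        return new Page(new ArrayList<>(table.subList(from, to)), next);
    }

    /** Requests pages until a response comes back without a resume key. */
    static List<Page> scanAllPages(List<String> table, int pageSize) {
        List<Page> pages = new ArrayList<>();
        Integer startKey = null;
        do {
            Page page = fetchPage(table, startKey, pageSize);
            pages.add(page);
            startKey = page.lastEvaluatedKey;
        } while (startKey != null);
        return pages;
    }

    public static void main(String[] args) {
        List<Page> pages = scanAllPages(Arrays.asList("a", "b", "c", "d", "e"), 2);
        System.out.println(pages.size()); // prints 3
    }
}
```

Each element emitted by a paginated source or flow corresponds to one such page, so downstream stages see one response per page rather than one per item.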

Error Retries and Exponential Backoff

The AWS SDK for Java 2.x retries failed requests with exponential backoff. This behavior is configured on the DynamoDbAsyncClient by passing a RetryStrategy to overrideConfiguration.

See AWS Retry configuration for more details.

Scala
implicit val client: DynamoDbAsyncClient = DynamoDbAsyncClient
  .builder()
  .region(Region.AWS_GLOBAL)
  .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
  .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
  .overrideConfiguration(
    ClientOverrideConfiguration
      .builder()
      .retryStrategy(
        // See https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/retries/api/RetryStrategy.html
        DefaultRetryStrategy.legacyStrategyBuilder()
          .treatAsThrottling(_ => true)
          .build())
      .build())
  .build()
Java
import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.retries.DefaultRetryStrategy;

final DynamoDbAsyncClient client =
    DynamoDbAsyncClient.builder()
        .region(Region.AWS_GLOBAL)
        .credentialsProvider(
            StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
        .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
        .overrideConfiguration(
            ClientOverrideConfiguration.builder()
                .retryStrategy(
                    // See https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/retries/api/RetryStrategy.html
                    DefaultRetryStrategy.legacyStrategyBuilder()
                        .treatAsThrottling(e -> true)
                        .build())
                .build())
        .build();
system.registerOnTermination(client::close);
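
For readers unfamiliar with the term, exponential backoff means the wait before each retry doubles, up to a cap. The JDK-only sketch below computes such delays. It is not the SDK's algorithm: the base delay of 100 ms and the cap of 20 000 ms are assumptions for illustration, `BackoffSketch` and `delayMillis` are hypothetical names, and production strategies typically also add random jitter.

```java
final class BackoffSketch {
    /**
     * Returns the delay before the given retry attempt (0-based):
     * baseMillis * 2^attempt, capped at maxMillis.
     */
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        // Clamp the exponent so the shift cannot overflow or go negative.
        int exponent = Math.max(0, Math.min(attempt, 30));
        long factor = 1L << exponent;
        return Math.min(maxMillis, baseMillis * factor);
    }

    public static void main(String[] args) {
        // Assumed values for illustration only: 100 ms base, 20 s cap.
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println("attempt " + attempt + " -> " + delayMillis(attempt, 100, 20_000) + " ms");
        }
    }
}
```

With these assumed values the first five delays are 100, 200, 400, 800 and 1600 ms, and every delay from the ninth attempt onward is capped at 20 000 ms.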