AWS DynamoDB
The AWS DynamoDB connector provides a flow for streaming DynamoDB requests. For more information about DynamoDB please visit the official documentation.
Project Info: Apache Pekko Connectors DynamoDB | |
---|---|
Artifact | org.apache.pekko
pekko-connectors-dynamodb
1.1.0-M1+122-2a90cd8c-SNAPSHOT
|
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 OpenJDK 21 |
Scala versions | 2.13.15, 2.12.20, 3.3.4 |
JPMS module name | pekko.stream.connectors.aws.dynamodb |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts
- sbt
val PekkoVersion = "1.1.2" val PekkoHttpVersion = "1.1.0" libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-connectors-dynamodb" % "1.1.0-M1+122-2a90cd8c-SNAPSHOT", "org.apache.pekko" %% "pekko-stream" % PekkoVersion, "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion )
- Maven
<properties> <pekko.version>1.1.2</pekko.version> <pekko.http.version>1.1.0</pekko.http.version> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-connectors-dynamodb_${scala.binary.version}</artifactId> <version>1.1.0-M1+122-2a90cd8c-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-stream_${scala.binary.version}</artifactId> <version>${pekko.version}</version> </dependency> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-http_${scala.binary.version}</artifactId> <version>${pekko.http.version}</version> </dependency> </dependencies>
- Gradle
def versions = [ PekkoVersion: "1.1.2", PekkoHttpVersion: "1.1.0", ScalaBinary: "2.13" ] dependencies { implementation "org.apache.pekko:pekko-connectors-dynamodb_${versions.ScalaBinary}:1.1.0-M1+122-2a90cd8c-SNAPSHOT" implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}" implementation "org.apache.pekko:pekko-http_${versions.ScalaBinary}:${versions.PekkoHttpVersion}" }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Setup
This connector requires a DynamoDbAsyncClient
instance to communicate with AWS DynamoDB.
It is your code’s responsibility to call close
to free any resources held by the client. In this example it will be called when the actor system is terminated.
- Scala
-
source
import pekko.stream.connectors.awsspi.PekkoHttpClient import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider } import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient // Don't encode credentials in your source code! // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html private val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")) implicit val client: DynamoDbAsyncClient = DynamoDbAsyncClient .builder() .region(Region.AWS_GLOBAL) .credentialsProvider(credentialsProvider) .httpClient(PekkoHttpClient.builder().withActorSystem(system).build()) // Possibility to configure the retry policy // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html // .overrideConfiguration(...) .build() system.registerOnTermination(client.close()) - Java
-
source
import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.japi.Pair; import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient; import scala.util.Try; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; final ActorSystem system = ActorSystem.create(); // Don't encode credentials in your source code! // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")); final DynamoDbAsyncClient client = DynamoDbAsyncClient.builder() .credentialsProvider(credentialsProvider) .region(Region.AWS_GLOBAL) .httpClient(PekkoHttpClient.builder().withActorSystem(system).build()) // Possibility to configure the retry policy // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html // .overrideConfiguration(...) .build(); system.registerOnTermination(() -> client.close());
The example above uses Apache Pekko HTTP as the default HTTP client implementation. For more details about the HTTP client, configuring request retrying and best practices for credentials, see AWS client configuration for more details.
Sending requests and receiving responses
For simple operations you can issue a single request, and get back the result in a Future
CompletionStage
.
- Scala
-
source
val listTablesResult: Future[ListTablesResponse] = DynamoDb.single(ListTablesRequest.builder().build())
- Java
-
source
final CompletionStage<ListTablesResponse> listTables = DynamoDb.single( client, DynamoDbOp.listTables(), ListTablesRequest.builder().build(), system);
You can also get the response to a request as an element emitted from a Flow:
- Scala
-
source
val source: Source[DescribeTableResponse, NotUsed] = Source .single(CreateTableRequest.builder().tableName("testTable").build()) .via(DynamoDb.flow(parallelism = 1)) .map(response => DescribeTableRequest.builder().tableName(response.tableDescription.tableName).build()) .via(DynamoDb.flow(parallelism = 1))
- Java
-
source
Source<DescribeTableResponse, NotUsed> tableArnSource = Source.single(CreateTableRequest.builder().tableName("testTable").build()) .via(DynamoDb.flow(client, DynamoDbOp.createTable(), 1)) .map( result -> DescribeTableRequest.builder() .tableName(result.tableDescription().tableName()) .build()) .via(DynamoDb.flow(client, DynamoDbOp.describeTable(), 1));
Flow with context
The flowWithContext
allows to send an arbitrary value, such as commit handles for JMS or Kafka, past the DynamoDb operation. The responses are wrapped in a Try
to differentiate between successful operations and errors in-stream.
- Scala
-
source
val source: SourceWithContext[PutItemRequest, SomeContext, NotUsed] = // ??? val flow: FlowWithContext[PutItemRequest, SomeContext, Try[PutItemResponse], SomeContext, NotUsed] = DynamoDb.flowWithContext(parallelism = 1) val writtenSource: SourceWithContext[PutItemResponse, SomeContext, NotUsed] = source .via(flow) .map { case Success(response) => response case Failure(exception) => throw exception }
- Java
-
source
SourceWithContext<PutItemRequest, SomeContext, NotUsed> source = // ??? FlowWithContext<PutItemRequest, SomeContext, Try<PutItemResponse>, SomeContext, NotUsed> flow = DynamoDb.flowWithContext(client, DynamoDbOp.putItem(), 1); SourceWithContext<PutItemResponse, SomeContext, NotUsed> writtenSource = source .via(flow) .map( result -> { if (result.isSuccess()) return result.get(); else throw (Exception) result.failed().get(); });
Pagination
The DynamoDB operations BatchGetItem
, ListTables
, Query
and Scan
allow paginating of results. The requests with paginated results can be used as source or in a flow with flowPaginated
:
- Scala
-
source
val scanRequest = ScanRequest.builder().tableName("testTable").build() val scanPages: Source[ScanResponse, NotUsed] = DynamoDb.source(scanRequest) val scanPageInFlow: Source[ScanResponse, NotUsed] = Source .single(scanRequest) .via(DynamoDb.flowPaginated())
- Java
-
source
ScanRequest scanRequest = ScanRequest.builder().tableName("testTable").build(); Source<ScanResponse, NotUsed> scanPages = DynamoDb.source(client, DynamoDbOp.scan(), scanRequest); Source<ScanResponse, NotUsed> scanPageInFlow = Source.single(scanRequest).via(DynamoDb.flowPaginated(client, DynamoDbOp.scan()));
Error Retries and Exponential Backoff
The AWS SDK 2 implements error retrying with exponential backoff which is configurable via the DynamoDbAsyncClient
configuration by using the RetryStrategy
in overrideConfiguration
.
See AWS Retry configuration for more details.
- Scala
-
source
implicit val client: DynamoDbAsyncClient = DynamoDbAsyncClient .builder() .region(Region.AWS_GLOBAL) .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))) .httpClient(PekkoHttpClient.builder().withActorSystem(system).build()) .overrideConfiguration( ClientOverrideConfiguration .builder() .retryStrategy( // See https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/retries/api/RetryStrategy.html DefaultRetryStrategy.legacyStrategyBuilder() .treatAsThrottling(_ => true) .build()) .build()) .build()
- Java
-
source
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.retries.DefaultRetryStrategy; final DynamoDbAsyncClient client = DynamoDbAsyncClient.builder() .region(Region.AWS_GLOBAL) .credentialsProvider( StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))) .httpClient(PekkoHttpClient.builder().withActorSystem(system).build()) .overrideConfiguration( ClientOverrideConfiguration.builder() .retryStrategy( // See https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/retries/api/RetryStrategy.html DefaultRetryStrategy.legacyStrategyBuilder() .treatAsThrottling(e -> true) .build()) .build()) .build(); system.registerOnTermination(client::close);