AWS DynamoDB
The AWS DynamoDB connector provides a flow for streaming DynamoDB requests. For more information about DynamoDB please visit the official documentation.
Project Info: Apache Pekko Connectors DynamoDB | |
---|---|
Artifact | org.apache.pekko pekko-connectors-dynamodb 1.1.0 |
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 OpenJDK 21 |
Scala versions | 2.13.15, 2.12.20, 3.3.4 |
JPMS module name | pekko.stream.connectors.aws.dynamodb |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts
sbt
val PekkoVersion = "1.1.3"
val PekkoHttpVersion = "1.1.0"
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-connectors-dynamodb" % "1.1.0",
"org.apache.pekko" %% "pekko-stream" % PekkoVersion,
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion
)
Maven
<properties>
<pekko.version>1.1.3</pekko.version>
<pekko.http.version>1.1.0</pekko.http.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-connectors-dynamodb_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-http_${scala.binary.version}</artifactId>
<version>${pekko.http.version}</version>
</dependency>
</dependencies>
Gradle
def versions = [
PekkoVersion: "1.1.3",
PekkoHttpVersion: "1.1.0",
ScalaBinary: "2.13"
]
dependencies {
implementation "org.apache.pekko:pekko-connectors-dynamodb_${versions.ScalaBinary}:1.1.0"
implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
implementation "org.apache.pekko:pekko-http_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Setup
This connector requires a DynamoDbAsyncClient instance to communicate with AWS DynamoDB.
It is your code's responsibility to call close to free any resources held by the client. In this example it will be called when the actor system is terminated.
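Scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
// The actor system used by the HTTP client and the streams
implicit val system: ActorSystem = ActorSystem()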
// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
private val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))
implicit val client: DynamoDbAsyncClient = DynamoDbAsyncClient
.builder()
.region(Region.AWS_GLOBAL)
.credentialsProvider(credentialsProvider)
.httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
// Possibility to configure the retry policy
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
// .overrideConfiguration(...)
.build()
system.registerOnTermination(client.close())
Java
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
import scala.util.Try;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
final ActorSystem system = ActorSystem.create();
// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
StaticCredentialsProvider credentialsProvider =
StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"));
final DynamoDbAsyncClient client =
DynamoDbAsyncClient.builder()
.credentialsProvider(credentialsProvider)
.region(Region.AWS_GLOBAL)
.httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
// Possibility to configure the retry policy
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
// .overrideConfiguration(...)
.build();
system.registerOnTermination(() -> client.close());
The example above uses Apache Pekko HTTP as the default HTTP client implementation. For more details about the HTTP client, configuring request retries, and best practices for credentials, see AWS client configuration.
Sending requests and receiving responses
For simple operations you can issue a single request and get back the result in a Future (Scala) or a CompletionStage (Java).
Scala
val listTablesResult: Future[ListTablesResponse] =
DynamoDb.single(ListTablesRequest.builder().build())
Java
final CompletionStage<ListTablesResponse> listTables =
DynamoDb.single(
client, DynamoDbOp.listTables(), ListTablesRequest.builder().build(), system);
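The returned CompletionStage can be composed with the standard java.util.concurrent API. The following is a minimal sketch under stated assumptions: a plain CompletableFuture holding a list of table names stands in for the stage returned by DynamoDb.single, and the names SingleResultSketch and describe are illustrative, not part of the connector.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

// Hypothetical stand-in: in real code the stage would come from DynamoDb.single(...).
final class SingleResultSketch {

  // Maps the eventual table names to a summary string and
  // converts a failed stage into an error message instead of rethrowing.
  static CompletionStage<String> describe(CompletionStage<List<String>> tableNames) {
    return tableNames
        .thenApply(names -> "tables: " + String.join(", ", names))
        .exceptionally(
            error -> {
              // Dependent stages see the original failure wrapped in a CompletionException.
              Throwable cause =
                  (error instanceof CompletionException && error.getCause() != null)
                      ? error.getCause()
                      : error;
              return "request failed: " + cause.getMessage();
            });
  }

  public static void main(String[] args) {
    CompletionStage<List<String>> ok =
        CompletableFuture.completedFuture(Arrays.asList("testTable"));
    System.out.println(describe(ok).toCompletableFuture().join());
  }
}
```

Handling the failure with exceptionally keeps error handling in the same asynchronous chain, so the calling thread never has to block on the request.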
You can also get the response to a request as an element emitted from a Flow:
Scala
val source: Source[DescribeTableResponse, NotUsed] = Source
.single(CreateTableRequest.builder().tableName("testTable").build())
.via(DynamoDb.flow(parallelism = 1))
.map(response => DescribeTableRequest.builder().tableName(response.tableDescription.tableName).build())
.via(DynamoDb.flow(parallelism = 1))
Java
Source<DescribeTableResponse, NotUsed> tableArnSource =
Source.single(CreateTableRequest.builder().tableName("testTable").build())
.via(DynamoDb.flow(client, DynamoDbOp.createTable(), 1))
.map(
result ->
DescribeTableRequest.builder()
.tableName(result.tableDescription().tableName())
.build())
.via(DynamoDb.flow(client, DynamoDbOp.describeTable(), 1));
Flow with context
The flowWithContext lets you pass an arbitrary value, such as a commit handle for JMS or Kafka, past the DynamoDB operation. The responses are wrapped in a Try to distinguish successful operations from errors in-stream.
Scala
val source: SourceWithContext[PutItemRequest, SomeContext, NotUsed] = // ???
val flow: FlowWithContext[PutItemRequest, SomeContext, Try[PutItemResponse], SomeContext, NotUsed] =
DynamoDb.flowWithContext(parallelism = 1)
val writtenSource: SourceWithContext[PutItemResponse, SomeContext, NotUsed] = source
.via(flow)
.map {
case Success(response) => response
case Failure(exception) => throw exception
}
Java
SourceWithContext<PutItemRequest, SomeContext, NotUsed> source = // ???
FlowWithContext<PutItemRequest, SomeContext, Try<PutItemResponse>, SomeContext, NotUsed> flow =
DynamoDb.flowWithContext(client, DynamoDbOp.putItem(), 1);
SourceWithContext<PutItemResponse, SomeContext, NotUsed> writtenSource =
source
.via(flow)
.map(
result -> {
if (result.isSuccess()) return result.get();
else throw (Exception) result.failed().get();
});
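To see why wrapping the response matters, it can help to look at the idea outside of a stream. The sketch below is plain Java and deliberately does not use the connector: ContextSketch, Outcome, runWithContext and fakePut are hypothetical names, and the "operation" is an in-memory function. It only illustrates that capturing a failure per element keeps that element's context available, so a commit handle can still be handled afterwards.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Conceptual sketch only: plain Java, not the connector's flowWithContext.
final class ContextSketch {

  // Result of one operation together with the context that accompanied its request.
  static final class Outcome<R, C> {
    final R response;               // null if the operation failed
    final RuntimeException failure; // null if the operation succeeded
    final C context;

    Outcome(R response, RuntimeException failure, C context) {
      this.response = response;
      this.failure = failure;
      this.context = context;
    }

    boolean isSuccess() {
      return failure == null;
    }
  }

  // Applies the operation to each request. A failure is captured in the outcome
  // instead of aborting the loop, so the context is still available afterwards.
  static <Q, R, C> List<Outcome<R, C>> runWithContext(
      List<Map.Entry<Q, C>> requestsWithContext, Function<Q, R> operation) {
    List<Outcome<R, C>> outcomes = new ArrayList<>();
    for (Map.Entry<Q, C> entry : requestsWithContext) {
      try {
        outcomes.add(new Outcome<>(operation.apply(entry.getKey()), null, entry.getValue()));
      } catch (RuntimeException e) {
        outcomes.add(new Outcome<>(null, e, entry.getValue()));
      }
    }
    return outcomes;
  }

  // Hypothetical operation: rejects empty items, otherwise acknowledges the write.
  static String fakePut(String item) {
    if (item.isEmpty()) {
      throw new IllegalArgumentException("empty item");
    }
    return "stored " + item;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> input = new ArrayList<>();
    input.add(new SimpleEntry<>("item-1", 100)); // 100 and 101 play the role of commit handles
    input.add(new SimpleEntry<>("", 101));
    for (Outcome<String, Integer> outcome : runWithContext(input, ContextSketch::fakePut)) {
      String status = outcome.isSuccess() ? outcome.response : "failed: " + outcome.failure.getMessage();
      System.out.println(outcome.context + " -> " + status);
    }
  }
}
```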
Pagination
The DynamoDB operations BatchGetItem, ListTables, Query and Scan allow paginating of results. Requests with paginated results can be used as a source or in a flow with flowPaginated:
Scala
val scanRequest = ScanRequest.builder().tableName("testTable").build()
val scanPages: Source[ScanResponse, NotUsed] =
DynamoDb.source(scanRequest)
val scanPageInFlow: Source[ScanResponse, NotUsed] =
Source
.single(scanRequest)
.via(DynamoDb.flowPaginated())
Java
ScanRequest scanRequest = ScanRequest.builder().tableName("testTable").build();
Source<ScanResponse, NotUsed> scanPages =
DynamoDb.source(client, DynamoDbOp.scan(), scanRequest);
Source<ScanResponse, NotUsed> scanPageInFlow =
Source.single(scanRequest).via(DynamoDb.flowPaginated(client, DynamoDbOp.scan()));
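Conceptually, a paginated request is repeated with a resume key until the service stops returning one. For Scan, DynamoDB reports that key as LastEvaluatedKey and accepts it back as ExclusiveStartKey. The sketch below mimics that loop against an in-memory list; it is an illustration under assumptions, not the connector's implementation, and PaginationSketch, Page, scan and scanAllPages are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch only: an in-memory list plays the role of the table.
final class PaginationSketch {

  // One page of results plus the key to resume from (null on the last page).
  static final class Page {
    final List<String> items;
    final Integer lastEvaluatedKey;

    Page(List<String> items, Integer lastEvaluatedKey) {
      this.items = items;
      this.lastEvaluatedKey = lastEvaluatedKey;
    }
  }

  // Hypothetical "scan": returns up to pageSize items starting at startKey (null = beginning).
  static Page scan(List<String> table, Integer startKey, int pageSize) {
    if (pageSize <= 0) {
      throw new IllegalArgumentException("pageSize must be positive");
    }
    int from = startKey == null ? 0 : startKey;
    int to = Math.min(from + pageSize, table.size());
    Integer next = to < table.size() ? Integer.valueOf(to) : null;
    return new Page(new ArrayList<>(table.subList(from, to)), next);
  }

  // Requests page after page until no resume key is returned.
  static List<Page> scanAllPages(List<String> table, int pageSize) {
    List<Page> pages = new ArrayList<>();
    Integer startKey = null;
    do {
      Page page = scan(table, startKey, pageSize);
      pages.add(page);
      startKey = page.lastEvaluatedKey;
    } while (startKey != null);
    return pages;
  }

  public static void main(String[] args) {
    List<String> table = new ArrayList<>();
    for (int i = 1; i <= 5; i++) {
      table.add("item-" + i);
    }
    for (Page page : scanAllPages(table, 2)) {
      System.out.println(page.items + " next=" + page.lastEvaluatedKey);
    }
  }
}
```

Each Page here corresponds to one response element that a paginated source or flow would emit downstream.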
Error Retries and Exponential Backoff
The AWS SDK for Java 2.x implements error retries with exponential backoff. This is configurable on the DynamoDbAsyncClient by setting a RetryStrategy in overrideConfiguration.
See AWS Retry configuration for more details.
Scala
implicit val client: DynamoDbAsyncClient = DynamoDbAsyncClient
.builder()
.region(Region.AWS_GLOBAL)
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
.httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
.overrideConfiguration(
ClientOverrideConfiguration
.builder()
.retryStrategy(
// See https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/retries/api/RetryStrategy.html
DefaultRetryStrategy.legacyStrategyBuilder()
.treatAsThrottling(_ => true)
.build())
.build())
.build()
Java
import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.retries.DefaultRetryStrategy;
final DynamoDbAsyncClient client =
DynamoDbAsyncClient.builder()
.region(Region.AWS_GLOBAL)
.credentialsProvider(
StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
.httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
.overrideConfiguration(
ClientOverrideConfiguration.builder()
.retryStrategy(
// See https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/retries/api/RetryStrategy.html
DefaultRetryStrategy.legacyStrategyBuilder()
.treatAsThrottling(e -> true)
.build())
.build())
.build();
system.registerOnTermination(client::close);
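As a rough illustration of what exponential backoff means for the delay between retries, the sketch below doubles a base delay on every attempt and caps it at a maximum. It is not the AWS SDK's algorithm: BackoffSketch, delayMillis and the numbers used are assumptions chosen for the example, and production strategies typically add random jitter on top.

```java
// Conceptual sketch only: not the retry logic used by the AWS SDK.
final class BackoffSketch {

  // Delay before retry number `attempt` (0-based): base * 2^attempt, capped at maxDelayMillis.
  static long delayMillis(int attempt, long baseDelayMillis, long maxDelayMillis) {
    if (attempt < 0) {
      throw new IllegalArgumentException("attempt must be >= 0");
    }
    // Limit the exponent so the multiplication stays in range for typical base delays.
    int exponent = Math.min(attempt, 30);
    long uncapped = baseDelayMillis * (1L << exponent);
    return Math.min(uncapped, maxDelayMillis);
  }

  public static void main(String[] args) {
    // Prints the delay schedule for six attempts with a 100 ms base and a 2 s cap.
    for (int attempt = 0; attempt < 6; attempt++) {
      System.out.println("attempt " + attempt + ": " + delayMillis(attempt, 100, 2000) + " ms");
    }
  }
}
```

The cap keeps a long run of failures from producing unbounded waits, while the doubling gives a throttled service progressively more time to recover.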