Shared AWS client configuration
Underlying HTTP client¶
sourceimport software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest
// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))
implicit val awsSqsClient = SqsAsyncClient
.builder()
.credentialsProvider(credentialsProvider)
.region(Region.EU_CENTRAL_1)
.httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
// Possibility to configure the retry policy
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
// .overrideConfiguration(...)
.build()
system.registerOnTermination(awsSqsClient.close())
sourceimport software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
StaticCredentialsProvider credentialsProvider =
StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"));
SqsAsyncClient sqsClient =
SqsAsyncClient.builder()
.credentialsProvider(credentialsProvider)
.region(Region.EU_CENTRAL_1)
.httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
// Possibility to configure the retry policy
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
// .overrideConfiguration(...)
.build();
system.registerOnTermination(() -> sqsClient.close());
The example snippets show how the AWS clients are setup to use Apache Pekko HTTP as the default HTTP client implementation via the thin adapter library AWS Apache Pekko-Http SPI implementation. By setting the httpClient
explicitly (as above) the Apache Pekko actor system is reused. If it is not set explicitly then a separate actor system will be created internally.
Using Netty¶
It is possible to configure the use of Netty instead, which is Amazon’s default. Add an appropriate Netty version to the dependencies and configure NettyNioAsyncHttpClient
.
sourceimport software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
val customClient: SdkAsyncHttpClient = NettyNioAsyncHttpClient.builder().maxConcurrency(100).build()
implicit val customSqsClient: SqsAsyncClient = SqsAsyncClient
.builder()
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
.region(Region.EU_CENTRAL_1)
.httpClient(customClient)
.build()
sourceimport software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
SdkAsyncHttpClient customClient = NettyNioAsyncHttpClient.builder().maxConcurrency(100).build();
final SqsAsyncClient customSqsClient =
SqsAsyncClient.builder()
.credentialsProvider(
StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
.endpointOverride(URI.create(sqsEndpoint))
.region(Region.US_WEST_2)
.httpClient(customClient)
.build();
system.registerOnTermination(() -> customSqsClient.close());
Please make sure to configure a big enough thread pool for the Netty client to avoid resource starvation. This is especially important, if you share the client between multiple Sources, Sinks and Flows. For the SQS Sinks and Sources the sum of all parallelism
(Source) and maxInFlight
(Sink) must be less than or equal to the thread pool size.
AWS retry configuration¶
The AWS SDK 2 supports request retrying with exponential backoff.
The request retry behaviour is configurable via the SdkDefaultClientBuilder.overrideConfiguration
method by using the RetryStrategy
.
sourceimport software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
import software.amazon.awssdk.retries.DefaultRetryStrategy
.overrideConfiguration(
ClientOverrideConfiguration
.builder()
.retryStrategy(
// See https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/retries/api/RetryStrategy.html
DefaultRetryStrategy.legacyStrategyBuilder()
.treatAsThrottling(_ => true)
.build())
.build())
sourceimport software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.retries.DefaultRetryStrategy;
.overrideConfiguration(
ClientOverrideConfiguration.builder()
.retryStrategy(
// See https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/retries/api/RetryStrategy.html
DefaultRetryStrategy.legacyStrategyBuilder()
.treatAsThrottling(e -> true)
.build())
.build())
AWS Access Keys¶
Do not encode AWS Access Keys in your source code or in static configuration. Please refer to Best Practices for Managing AWS Access Keys for proper AWS Access Key management.