Shared AWS client configuration

Underlying HTTP client

Scala
sourceimport com.github.pjfanning.pekkohttpspi.PekkoHttpClient
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest

// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))
implicit val awsSqsClient = SqsAsyncClient
  .builder()
  .credentialsProvider(credentialsProvider)
  .region(Region.EU_CENTRAL_1)
  .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
  // Possibility to configure the retry policy
  // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
  // .overrideConfiguration(...)
  .build()

system.registerOnTermination(awsSqsClient.close())
Java
sourceimport com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
StaticCredentialsProvider credentialsProvider =
    StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"));
SqsAsyncClient sqsClient =
    SqsAsyncClient.builder()
        .credentialsProvider(credentialsProvider)
        .region(Region.EU_CENTRAL_1)
        .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
        // Possibility to configure the retry policy
        // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
        // .overrideConfiguration(...)
        .build();

system.registerOnTermination(() -> sqsClient.close());

The example snippets show how the AWS clients are setup to use Apache Pekko HTTP as the default HTTP client implementation via the thin adapter library AWS Apache Pekko-Http SPI implementation. By setting the httpClient explicitly (as above) the Apache Pekko actor system is reused. If it is not set explicitly then a separate actor system will be created internally.

Using Netty

It is possible to configure the use of Netty instead, which is Amazon’s default. Add an appropriate Netty version to the dependencies and configure NettyNioAsyncHttpClient.

Scala
sourceimport software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
val customClient: SdkAsyncHttpClient = NettyNioAsyncHttpClient.builder().maxConcurrency(100).build()
  implicit val customSqsClient: SqsAsyncClient = SqsAsyncClient
    .builder()
    .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
    .region(Region.EU_CENTRAL_1)
    .httpClient(customClient)
    .build()
Java
sourceimport software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
SdkAsyncHttpClient customClient = NettyNioAsyncHttpClient.builder().maxConcurrency(100).build();
final SqsAsyncClient customSqsClient =
    SqsAsyncClient.builder()
        .credentialsProvider(
            StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
        .endpointOverride(URI.create(sqsEndpoint))
        .region(Region.US_WEST_2)
        .httpClient(customClient)
        .build();

system.registerOnTermination(() -> customSqsClient.close());

Please make sure to configure a big enough thread pool for the Netty client to avoid resource starvation. This is especially important, if you share the client between multiple Sources, Sinks and Flows. For the SQS Sinks and Sources the sum of all parallelism (Source) and maxInFlight (Sink) must be less than or equal to the thread pool size.

AWS retry configuration

The AWS SDK 2 supports request retrying with exponential backoff.

The request retry behaviour is configurable via the SdkDefaultClientBuilder.overrideConfiguration method by using the RetryPolicy.

Scala
sourceimport software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetrySetting
import software.amazon.awssdk.core.retry.RetryPolicy
import software.amazon.awssdk.core.retry.backoff.BackoffStrategy
import software.amazon.awssdk.core.retry.conditions.RetryCondition

.overrideConfiguration(
  ClientOverrideConfiguration
    .builder()
    .retryPolicy(
      // This example shows the AWS SDK 2 `RetryPolicy.defaultRetryPolicy()`
      // See https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/core/retry/RetryPolicy.html
      RetryPolicy.builder
        .backoffStrategy(BackoffStrategy.defaultStrategy)
        .throttlingBackoffStrategy(BackoffStrategy.defaultThrottlingStrategy)
        .numRetries(SdkDefaultRetrySetting.defaultMaxAttempts)
        .retryCondition(RetryCondition.defaultRetryCondition)
        .build)
    .build())
Java
sourceimport software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetrySetting;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.core.retry.backoff.BackoffStrategy;
import software.amazon.awssdk.core.retry.conditions.RetryCondition;

.overrideConfiguration(
    ClientOverrideConfiguration.builder()
        .retryPolicy(
            // This example shows the AWS SDK 2 `RetryPolicy.defaultRetryPolicy()`
            // See
            // https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/core/retry/RetryPolicy.html
            RetryPolicy.builder()
                .backoffStrategy(BackoffStrategy.defaultStrategy())
                .throttlingBackoffStrategy(BackoffStrategy.defaultThrottlingStrategy())
                .numRetries(SdkDefaultRetrySetting.defaultMaxAttempts())
                .retryCondition(RetryCondition.defaultRetryCondition())
                .build())
        .build())

AWS Access Keys

Do not encode AWS Access Keys in your source code or in static configuration. Please refer to Best Practices for Managing AWS Access Keys for proper AWS Access Key management.