Shared AWS client configuration
Underlying HTTP client
- Scala
  import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
  import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
  import software.amazon.awssdk.regions.Region
  import software.amazon.awssdk.services.sqs.SqsAsyncClient
  import software.amazon.awssdk.services.sqs.model.CreateQueueRequest

  // Don't encode credentials in your source code!
  // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
  val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))
  implicit val awsSqsClient = SqsAsyncClient
    .builder()
    .credentialsProvider(credentialsProvider)
    .region(Region.EU_CENTRAL_1)
    .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
    // Possibility to configure the retry policy
    // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
    // .overrideConfiguration(...)
    .build()

  system.registerOnTermination(awsSqsClient.close())
- Java
  import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
  import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
  import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
  import software.amazon.awssdk.regions.Region;
  import software.amazon.awssdk.services.sqs.SqsAsyncClient;

  // Don't encode credentials in your source code!
  // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
  StaticCredentialsProvider credentialsProvider =
      StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"));
  SqsAsyncClient sqsClient =
      SqsAsyncClient.builder()
          .credentialsProvider(credentialsProvider)
          .region(Region.EU_CENTRAL_1)
          .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
          // Possibility to configure the retry policy
          // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
          // .overrideConfiguration(...)
          .build();

  system.registerOnTermination(() -> sqsClient.close());
The example snippets show how the AWS clients are set up to use Apache Pekko HTTP as the default HTTP client implementation via the thin adapter library AWS Apache Pekko-Http SPI implementation. Setting the httpClient explicitly (as above) reuses the existing Apache Pekko actor system. If it is not set explicitly, a separate actor system is created internally.
Using Netty
It is possible to configure the use of Netty instead, which is Amazon’s default. Add an appropriate Netty version to the dependencies and configure NettyNioAsyncHttpClient.
- Scala
  // Credentials, Region and SqsAsyncClient imports are the same as in the Apache Pekko HTTP example above
  import software.amazon.awssdk.http.async.SdkAsyncHttpClient
  import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient

  val customClient: SdkAsyncHttpClient = NettyNioAsyncHttpClient.builder().maxConcurrency(100).build()
  implicit val customSqsClient: SqsAsyncClient = SqsAsyncClient
    .builder()
    .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
    .region(Region.EU_CENTRAL_1)
    .httpClient(customClient)
    .build()
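- Java
  // Credentials, Region and SqsAsyncClient imports are the same as in the Apache Pekko HTTP example above
  import java.net.URI;
  import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
  import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;

  SdkAsyncHttpClient customClient = NettyNioAsyncHttpClient.builder().maxConcurrency(100).build();
  // sqsEndpoint is a String holding the endpoint URL; it is defined outside this snippet
  final SqsAsyncClient customSqsClient =
      SqsAsyncClient.builder()
          .credentialsProvider(
              StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
          .endpointOverride(URI.create(sqsEndpoint))
          .region(Region.US_WEST_2)
          .httpClient(customClient)
          .build();

  system.registerOnTermination(() -> customSqsClient.close());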
Make sure to configure a large enough thread pool for the Netty client to avoid resource starvation. This is especially important if you share the client between multiple Sources, Sinks and Flows. For the SQS Sinks and Sources, the sum of all parallelism (Source) and maxInFlight (Sink) settings must be less than or equal to the thread pool size.
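The sizing rule above is plain arithmetic, and a small check at startup can catch a misconfiguration early. The following is a minimal sketch only: `SqsConcurrencyCheck` and its methods are hypothetical helpers, not part of Apache Pekko Connectors or the AWS SDK, and it assumes that the limit to compare against is the value passed to `maxConcurrency` in the Netty snippet above (the text itself only speaks of the thread pool size).

```java
import java.util.List;

// Hypothetical helper for illustration; not part of Pekko Connectors or the AWS SDK.
final class SqsConcurrencyCheck {

  // Adds up the parallelism of every source and the maxInFlight of every sink
  // that share one client.
  static int requiredConcurrency(List<Integer> sourceParallelism, List<Integer> sinkMaxInFlight) {
    int total = 0;
    for (int parallelism : sourceParallelism) {
      total += parallelism;
    }
    for (int maxInFlight : sinkMaxInFlight) {
      total += maxInFlight;
    }
    return total;
  }

  // True when the shared client's limit covers all sources and sinks.
  // Assumption: `clientLimit` is the value configured on the Netty client.
  static boolean fits(int clientLimit, List<Integer> sourceParallelism, List<Integer> sinkMaxInFlight) {
    return requiredConcurrency(sourceParallelism, sinkMaxInFlight) <= clientLimit;
  }

  public static void main(String[] args) {
    List<Integer> parallelism = List.of(4, 4);   // two sources
    List<Integer> maxInFlight = List.of(10, 20); // two sinks
    int clientLimit = 100;                       // as in maxConcurrency(100) above

    System.out.println("required = " + requiredConcurrency(parallelism, maxInFlight));
    System.out.println("fits = " + fits(clientLimit, parallelism, maxInFlight));
  }
}
```

With two sources of parallelism 4 and two sinks with maxInFlight 10 and 20, the required value is 4 + 4 + 10 + 20 = 38, which fits within a limit of 100.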
AWS retry configuration
The AWS SDK 2 supports request retrying with exponential backoff.
The request retry behaviour is configurable by setting a RetryPolicy on the ClientOverrideConfiguration passed to the SdkDefaultClientBuilder.overrideConfiguration method.
- Scala
  import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
  import software.amazon.awssdk.core.internal.retry.SdkDefaultRetrySetting
  import software.amazon.awssdk.core.retry.RetryPolicy
  import software.amazon.awssdk.core.retry.backoff.BackoffStrategy
  import software.amazon.awssdk.core.retry.conditions.RetryCondition

  .overrideConfiguration(
    ClientOverrideConfiguration
      .builder()
      .retryPolicy(
        // This example shows the AWS SDK 2 `RetryPolicy.defaultRetryPolicy()`
        // See https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/core/retry/RetryPolicy.html
        RetryPolicy.builder
          .backoffStrategy(BackoffStrategy.defaultStrategy)
          .throttlingBackoffStrategy(BackoffStrategy.defaultThrottlingStrategy)
          .numRetries(SdkDefaultRetrySetting.defaultMaxAttempts)
          .retryCondition(RetryCondition.defaultRetryCondition)
          .build)
      .build())
- Java
  import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
  import software.amazon.awssdk.core.internal.retry.SdkDefaultRetrySetting;
  import software.amazon.awssdk.core.retry.RetryPolicy;
  import software.amazon.awssdk.core.retry.backoff.BackoffStrategy;
  import software.amazon.awssdk.core.retry.conditions.RetryCondition;

  .overrideConfiguration(
      ClientOverrideConfiguration.builder()
          .retryPolicy(
              // This example shows the AWS SDK 2 `RetryPolicy.defaultRetryPolicy()`
              // See
              // https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/core/retry/RetryPolicy.html
              RetryPolicy.builder()
                  .backoffStrategy(BackoffStrategy.defaultStrategy())
                  .throttlingBackoffStrategy(BackoffStrategy.defaultThrottlingStrategy())
                  .numRetries(SdkDefaultRetrySetting.defaultMaxAttempts())
                  .retryCondition(RetryCondition.defaultRetryCondition())
                  .build())
          .build())
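To make the term "exponential backoff" concrete, the sketch below computes how the wait before each retry grows. It is an illustration of the general technique only, not the AWS SDK's implementation: `ExponentialBackoffSketch` is a hypothetical class, the 100 ms base delay and 20 s cap in `main` are illustrative values chosen for this example, and the SDK's own strategies may additionally randomise the delay (jitter), which is left out here.

```java
import java.time.Duration;

// Hypothetical illustration of capped exponential backoff; not the AWS SDK implementation.
final class ExponentialBackoffSketch {

  // Delay before the next retry: baseDelay * 2^retriesAttempted, capped at maxDelay.
  static Duration delayBeforeRetry(int retriesAttempted, Duration baseDelay, Duration maxDelay) {
    if (retriesAttempted < 0) {
      throw new IllegalArgumentException("retriesAttempted must not be negative");
    }
    // Limit the exponent so the shift below stays well inside the range of a long
    // for base delays of up to a few seconds.
    int exponent = Math.min(retriesAttempted, 30);
    long uncappedMillis = baseDelay.toMillis() * (1L << exponent);
    return Duration.ofMillis(Math.min(uncappedMillis, maxDelay.toMillis()));
  }

  public static void main(String[] args) {
    Duration base = Duration.ofMillis(100); // illustrative value
    Duration cap = Duration.ofSeconds(20);  // illustrative value
    for (int retry = 0; retry < 10; retry++) {
      System.out.println("retry " + retry + ": wait " + delayBeforeRetry(retry, base, cap).toMillis() + " ms");
    }
  }
}
```

The delay doubles with each attempt until it reaches the cap, so a briefly unavailable service is retried quickly while a persistently failing one is not flooded with requests.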
AWS Access Keys
Do not encode AWS Access Keys in your source code or in static configuration. Please refer to Best Practices for Managing AWS Access Keys for proper AWS Access Key management.