AWS SQS
Amazon Simple Queue Service (Amazon SQS) offers a secure, durable, and available hosted queue that lets you integrate and decouple distributed software systems and components. Amazon SQS offers common constructs such as dead-letter queues and cost allocation tags. It provides a generic web services API and it can be accessed by any programming language that the AWS SDK supports.
For more information about AWS SQS, please visit the official documentation.
The AWS SQS connector provides Apache Pekko Stream sources and sinks for AWS SQS queues.
| Project Info: Apache Pekko Connectors AWS SQS | |
|---|---|
| Artifact | org.apache.pekko : pekko-connectors-sqs : 1.0.2 |
| JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17 |
| Scala versions | 2.13.14, 2.12.20, 3.3.3 |
| JPMS module name | pekko.stream.connectors.aws.sqs |
| License | Apache-2.0 |
| API documentation | |
| Forums | |
| Release notes | GitHub releases |
| Issues | GitHub issues |
| Sources | https://github.com/apache/pekko-connectors |
Artifacts
- sbt
  val PekkoVersion = "1.0.3"
  val PekkoHttpVersion = "1.0.1"
  libraryDependencies ++= Seq(
    "org.apache.pekko" %% "pekko-connectors-sqs" % "1.0.2",
    "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
    "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion
  )
- Maven
  <properties>
    <pekko.version>1.0.3</pekko.version>
    <pekko.http.version>1.0.1</pekko.http.version>
    <scala.binary.version>2.13</scala.binary.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-connectors-sqs_${scala.binary.version}</artifactId>
      <version>1.0.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-stream_${scala.binary.version}</artifactId>
      <version>${pekko.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-http_${scala.binary.version}</artifactId>
      <version>${pekko.http.version}</version>
    </dependency>
  </dependencies>
- Gradle
  def versions = [
    PekkoVersion: "1.0.3",
    PekkoHttpVersion: "1.0.1",
    ScalaBinary: "2.13"
  ]
  dependencies {
    implementation "org.apache.pekko:pekko-connectors-sqs_${versions.ScalaBinary}:1.0.2"
    implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
    implementation "org.apache.pekko:pekko-http_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
  }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Setup
Prepare an ActorSystem.
- Scala
  implicit val system: ActorSystem = ActorSystem()
- Java
  ActorSystem system = ActorSystem.create();
This connector requires an SqsAsyncClient instance to communicate with AWS SQS. In Scala it is picked up implicitly; in Java it is passed explicitly to each factory method.
It is your code’s responsibility to call close to free any resources held by the client. In this example it will be called when the actor system is terminated.
- Scala
  import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
  import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
  import software.amazon.awssdk.regions.Region
  import software.amazon.awssdk.services.sqs.SqsAsyncClient
  // Don't encode credentials in your source code!
  // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
  val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))
  implicit val awsSqsClient: SqsAsyncClient = SqsAsyncClient
    .builder()
    .credentialsProvider(credentialsProvider)
    .region(Region.EU_CENTRAL_1)
    .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
    // Possibility to configure the retry policy
    // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
    // .overrideConfiguration(...)
    .build()
  // close the client when the actor system shuts down
  system.registerOnTermination(awsSqsClient.close())
- Java
  import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
  import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
  import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
  import software.amazon.awssdk.regions.Region;
  import software.amazon.awssdk.services.sqs.SqsAsyncClient;
  // Don't encode credentials in your source code!
  // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
  StaticCredentialsProvider credentialsProvider =
      StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"));
  SqsAsyncClient sqsClient =
      SqsAsyncClient.builder()
          .credentialsProvider(credentialsProvider)
          .region(Region.EU_CENTRAL_1)
          .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
          // Possibility to configure the retry policy
          // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
          // .overrideConfiguration(...)
          .build();
  system.registerOnTermination(() -> sqsClient.close());
The example above uses Apache Pekko HTTP as the default HTTP client implementation. For details about the HTTP client, configuring request retrying, and best practices for credentials, see AWS client configuration.
Read from an SQS queue
A source created with SqsSource reads AWS Java SDK SQS Message objects from the SQS queue identified by the given queue URL.
- Scala
  val messages: Future[immutable.Seq[Message]] =
    SqsSource(
      queueUrl,
      SqsSourceSettings().withCloseOnEmptyReceive(true).withWaitTime(10.millis)
    ).runWith(Sink.seq)
- Java
  final CompletionStage<List<Message>> messages =
      SqsSource.create(
              queueUrl,
              SqsSourceSettings.create()
                  .withCloseOnEmptyReceive(true)
                  .withWaitTime(Duration.ofMillis(10)),
              sqsClient)
          .runWith(Sink.seq(), system);
In this example we use closeOnEmptyReceive to let the stream complete when there are no more messages on the queue. In production scenarios, you should add a KillSwitch to the stream instead; see "Controlling stream completion with KillSwitch" in the Apache Pekko documentation.
Source configuration
- Scala
  val settings = SqsSourceSettings()
    .withWaitTime(20.seconds)
    .withMaxBufferSize(100)
    .withMaxBatchSize(10)
    .withAttributes(immutable.Seq(SenderId, SentTimestamp))
    .withMessageAttribute(MessageAttributeName.create("bar.*"))
    .withCloseOnEmptyReceive(true)
    .withVisibilityTimeout(10.seconds)
- Java
  SqsSourceSettings settings =
      SqsSourceSettings.create()
          .withWaitTime(Duration.ofSeconds(20))
          .withMaxBufferSize(100)
          .withMaxBatchSize(10)
          .withAttributes(
              Arrays.asList(
                  MessageSystemAttributeName.senderId(),
                  MessageSystemAttributeName.sentTimestamp()))
          .withMessageAttribute(MessageAttributeName.create("bar.*"))
          .withCloseOnEmptyReceive(true);
Options:
- maxBatchSize: the maximum number of messages to return per request (allowed values 1-10, see MaxNumberOfMessages in the AWS docs). Default: 10
- maxBufferSize: internal buffer size used by the Source. Default: 100 messages
- waitTimeSeconds: the duration for which the call waits for a message to arrive in the queue before returning (see WaitTimeSeconds in the AWS docs). Default: 20 seconds
- closeOnEmptyReceive: if true, the source completes when no messages are available. Default: false
More details are available in the AWS SQS Receive Message documentation.
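The limits behind maxBatchSize and waitTimeSeconds come from the SQS ReceiveMessage API: at most 10 messages per request, and a wait of 0 to 20 seconds. Below is a minimal sketch that rejects out-of-range values before SqsSourceSettings is built. ReceiveLimits is a hypothetical helper, not part of Pekko Connectors, and it uses only java.time types:

```java
import java.time.Duration;

// Hypothetical helper (not part of Pekko Connectors): checks receive options
// against the Amazon SQS ReceiveMessage limits before they are passed to
// SqsSourceSettings.withMaxBatchSize / withWaitTime.
final class ReceiveLimits {
  // MaxNumberOfMessages accepts 1 to 10 messages per request
  static final int MIN_BATCH = 1;
  static final int MAX_BATCH = 10;
  // WaitTimeSeconds accepts 0 (short polling) up to 20 seconds (long polling)
  static final Duration MAX_WAIT = Duration.ofSeconds(20);

  static boolean withinLimits(int maxBatchSize, Duration waitTime) {
    boolean batchOk = maxBatchSize >= MIN_BATCH && maxBatchSize <= MAX_BATCH;
    boolean waitOk = !waitTime.isNegative() && waitTime.compareTo(MAX_WAIT) <= 0;
    return batchOk && waitOk;
  }
}
```

For example, withinLimits(10, Duration.ofSeconds(20)) accepts the connector's defaults, while a batch size of 11 is rejected.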
An SqsSource can either provide an infinite stream of messages (the default), or can drain its source queue until no further messages are available. The latter behaviour is enabled by setting the closeOnEmptyReceive flag on creation. If set, the Source will receive messages until it encounters an empty reply from the server. It then continues to emit any remaining messages in its local buffer. The stage will complete once the last message has been sent downstream.
Note that for short-polling (waitTimeSeconds of 0), SQS may respond with an empty reply even if there are still messages in the queue. This behavior can be prevented by switching to long-polling (by setting waitTimeSeconds to a nonzero value).
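The drain-until-empty behaviour, and the way a false empty reply from short polling ends it early, can be modelled without SQS. This sketch is only an illustration. It issues one receive at a time against a hypothetical in-memory queue, whereas the real SqsSource runs several requests in parallel and buffers their results. It shows only when such a stream would complete:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.IntFunction;

// Simplified model of closeOnEmptyReceive; it uses neither the connector nor AWS.
final class DrainModel {
  /** Receives up to maxBatchSize messages per call and stops at the first empty reply. */
  static List<String> drain(IntFunction<List<String>> receive, int maxBatchSize) {
    List<String> emitted = new ArrayList<>();
    while (true) {
      List<String> reply = receive.apply(maxBatchSize);
      if (reply.isEmpty()) {
        // first empty reply: stop receiving and complete with what was emitted
        return emitted;
      }
      emitted.addAll(reply);
    }
  }

  /** Hypothetical in-memory queue: each call returns up to n of the remaining messages. */
  static IntFunction<List<String>> fakeQueue(List<String> messages) {
    Deque<String> queue = new ArrayDeque<>(messages);
    return n -> {
      List<String> batch = new ArrayList<>();
      while (batch.size() < n && !queue.isEmpty()) {
        batch.add(queue.poll());
      }
      return batch;
    };
  }

  /** Mimics a short-polling false empty reply: the first call returns nothing. */
  static IntFunction<List<String>> emptyFirstReply(IntFunction<List<String>> receive) {
    boolean[] first = {true};
    return n -> {
      if (first[0]) {
        first[0] = false;
        return new ArrayList<>();
      }
      return receive.apply(n);
    };
  }
}
```

With 25 queued messages and a batch size of 10, drain returns all 25 in order. Wrapped in emptyFirstReply, it returns nothing even though the queue is full, which is the short-polling pitfall described above.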
Be aware that SqsSource runs multiple requests to Amazon SQS in parallel. The maximum number of concurrent requests is limited by parallelism = maxBufferSize / maxBatchSize. For example, by default maxBatchSize is 10 and maxBufferSize is 100, so SqsSource runs at most 10 concurrent requests to Amazon SQS.
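Here is the parallelism formula as a tiny worked example. It assumes integer division, and SourceParallelism is a hypothetical name used only for illustration:

```java
// Worked form of parallelism = maxBufferSize / maxBatchSize (integer division assumed).
final class SourceParallelism {
  static int parallelism(int maxBufferSize, int maxBatchSize) {
    // parallelism(100, 10) == 10: the defaults
    // parallelism(100, 5)  == 20: smaller batches allow more concurrent requests
    // parallelism(200, 10) == 20: a larger buffer allows more concurrent requests
    return maxBufferSize / maxBatchSize;
  }
}
```

Raising maxBufferSize is therefore also a way to increase request concurrency, at the cost of holding more messages locally.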
Publish messages to an SQS queue
Create a String-accepting sink, publishing to an SQS queue.
- Scala
  Source
    .single("connectors")
    .runWith(SqsPublishSink(queueUrl))
- Java
  Source.single("connectors")
      .runWith(
          SqsPublishSink.create(queueUrl, SqsPublishSettings.create(), sqsClient),
          system);
Create a SendMessageRequest-accepting sink that publishes to an SQS queue.
- Scala
  // for a fixed SQS queue
  Source
    .single(SendMessageRequest.builder().messageBody("connectors").build())
    .runWith(SqsPublishSink.messageSink(queueUrl))
  // for dynamic SQS queues: the queue URL is taken from each request
  Source
    .single(SendMessageRequest.builder().messageBody("connectors").queueUrl(queueUrl).build())
    .runWith(SqsPublishSink.messageSink())
- Java
  // for a fixed SQS queue
  Source.single(SendMessageRequest.builder().messageBody("connectors").build())
      .runWith(
          SqsPublishSink.messageSink(queueUrl, SqsPublishSettings.create(), sqsClient),
          system);
  // for dynamic SQS queues: the queue URL is taken from each request
  Source.single(
          SendMessageRequest.builder().messageBody("connectors").queueUrl(queueUrl).build())
      .runWith(SqsPublishSink.messageSink(SqsPublishSettings.create(), sqsClient), system);
You can also build flow stages that publish messages to SQS queues, backpressure on the queue response, and then forward SqsPublishResult further down the stream.
- Scala
  // for a fixed SQS queue
  Source
    .single(SendMessageRequest.builder().messageBody("connectors").build())
    .via(SqsPublishFlow(queueUrl))
    .runWith(Sink.head)
  // for dynamic SQS queues
  Source
    .single(SendMessageRequest.builder().messageBody("connectors").queueUrl(queueUrl).build())
    .via(SqsPublishFlow())
    .runWith(Sink.head)
- Java
  // for a fixed SQS queue
  Source.single(SendMessageRequest.builder().messageBody("pekko-connectors-flow").build())
      .via(SqsPublishFlow.create(queueUrl, SqsPublishSettings.create(), sqsClient))
      .runWith(Sink.head(), system);
  // for dynamic SQS queues
  Source.single(
          SendMessageRequest.builder()
              .messageBody("pekko-connectors-flow")
              .queueUrl(queueUrl)
              .build())
      .via(SqsPublishFlow.create(SqsPublishSettings.create(), sqsClient))
      .runWith(Sink.head(), system);
Group messages and publish batches to an SQS queue
Create a sink that forwards Strings to the SQS queue. Unlike the previous use case, it groups items into batches, sends each batch as one request, and produces an SqsPublishResultEntry for each item processed.
Note: Another way to send messages to SQS in batches is AmazonSQSBufferedAsyncClient. This client buffers SendMessageRequests under the hood and sends them as a batch instead of one by one. However, beware that AmazonSQSBufferedAsyncClient does not support FIFO queues. See the documentation for client-side buffering.
- Scala
  val messages = for (i <- 0 until 10) yield s"Message - $i"
  val future = Source(messages)
    .runWith(SqsPublishSink.grouped(queueUrl, SqsPublishGroupedSettings.Defaults.withMaxBatchSize(2)))
- Java
  List<String> messagesToSend = new ArrayList<>();
  for (int i = 0; i < 20; i++) {
    messagesToSend.add("message - " + i);
  }
  CompletionStage<Done> done =
      Source.from(messagesToSend)
          .runWith(
              SqsPublishSink.grouped(queueUrl, SqsPublishGroupedSettings.create(), sqsClient),
              system);
Grouping configuration
- Scala
  val batchSettings = SqsPublishGroupedSettings()
    .withMaxBatchSize(10)
    .withMaxBatchWait(500.millis)
    .withConcurrentRequests(1)
- Java
  SqsPublishGroupedSettings batchSettings =
      SqsPublishGroupedSettings.create()
          .withMaxBatchSize(10)
          .withMaxBatchWait(Duration.ofMillis(500))
          .withConcurrentRequests(1);
Options:
- maxBatchSize: the maximum number of messages in a batch sent to SQS. Default: 10
- maxBatchWait: the maximum duration for which the stage waits for maxBatchSize messages to arrive. At the end of this period it sends what it has collected, even if maxBatchSize has not been reached. Default: 500 milliseconds
- concurrentRequests: the number of batches sent to SQS concurrently.
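To make the interaction of maxBatchSize and maxBatchWait concrete, here is a simplified, hypothetical model that splits message arrival times (in milliseconds) into batches. It assumes the wait window starts at the first message of each batch. That may not exactly match the timing of the stream operators the connector uses internally:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified size-or-time batching model (hypothetical; not the connector's code).
final class SizeOrTimeBatching {
  /** Groups ascending arrival times into batches closed by size or by elapsed wait. */
  static List<List<Long>> batch(List<Long> arrivalsMs, int maxBatchSize, long maxBatchWaitMs) {
    List<List<Long>> batches = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long windowStart = 0;
    for (long t : arrivalsMs) {
      // the wait expired before this message arrived: send what was collected so far
      if (!current.isEmpty() && t - windowStart >= maxBatchWaitMs) {
        batches.add(current);
        current = new ArrayList<>();
      }
      if (current.isEmpty()) {
        windowStart = t; // a new batch starts its wait window
      }
      current.add(t);
      // size limit reached: send immediately without waiting
      if (current.size() == maxBatchSize) {
        batches.add(current);
        current = new ArrayList<>();
      }
    }
    if (!current.isEmpty()) {
      batches.add(current); // sent once the window of the last batch expires
    }
    return batches;
  }
}
```

For arrivals at 0, 100 and 700 ms with maxBatchSize 10 and maxBatchWait 500 ms, the model sends [0, 100] and then [700]. A full batch is sent immediately without waiting.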
Publish lists as batches to an SQS queue
Create a sink that publishes Iterable[String] (Iterable<String> in Java) to the SQS queue.
- Scala
  val messages = for (i <- 0 until 10) yield s"Message - $i"
  val future = Source
    .single(messages)
    .runWith(SqsPublishSink.batch(queueUrl))
- Java
  List<String> messagesToSend = new ArrayList<>();
  for (int i = 0; i < 10; i++) {
    messagesToSend.add("Message - " + i);
  }
  CompletionStage<Done> done =
      Source.single(messagesToSend)
          .runWith(
              SqsPublishSink.batch(queueUrl, SqsPublishBatchSettings.create(), sqsClient),
              system);
Create a sink that publishes Iterable[SendMessageRequest] (Iterable<SendMessageRequest> in Java) to the SQS queue.
Be aware that each batch must contain at most 10 entries, because Amazon SQS limits a batch request to 10 messages. If a batch has more than 10 entries, the request will fail.
- Scala
  val messages = for (i <- 0 until 10)
    yield SendMessageRequest.builder().messageBody(s"Message - $i").build()
  val future = Source
    .single(messages)
    .runWith(SqsPublishSink.batchedMessageSink(queueUrl))
- Java
  List<SendMessageRequest> messagesToSend = new ArrayList<>();
  for (int i = 0; i < 10; i++) {
    messagesToSend.add(SendMessageRequest.builder().messageBody("Message - " + i).build());
  }
  CompletionStage<Done> done =
      Source.single(messagesToSend)
          .runWith(
              SqsPublishSink.batchedMessageSink(
                  queueUrl, SqsPublishBatchSettings.create(), sqsClient),
              system);
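Because an Iterable with more than 10 entries makes the request fail, a longer list has to be split before it reaches SqsPublishSink.batch or SqsPublishSink.batchedMessageSink. Here is a minimal sketch of such a split, using a hypothetical SqsBatches helper and plain Java collections:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a list into chunks of at most 10 entries,
// the Amazon SQS limit for a single batch request.
final class SqsBatches {
  static final int MAX_ENTRIES = 10;

  static <T> List<List<T>> chunk(List<T> items) {
    List<List<T>> chunks = new ArrayList<>();
    for (int from = 0; from < items.size(); from += MAX_ENTRIES) {
      int to = Math.min(from + MAX_ENTRIES, items.size());
      // copy the subList view so later changes to items do not affect the chunk
      chunks.add(new ArrayList<>(items.subList(from, to)));
    }
    return chunks;
  }
}
```

A list of 25 messages becomes chunks of 10, 10 and 5. Inside a stream, Pekko's grouped(10) operator serves the same purpose by emitting lists of at most 10 elements.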
Batch configuration
- Scala
  val batchSettings = SqsPublishBatchSettings()
    .withConcurrentRequests(1)
- Java
  SqsPublishBatchSettings batchSettings =
      SqsPublishBatchSettings.create().withConcurrentRequests(1);
Options:
- concurrentRequests: the number of batches sent to SQS concurrently.
Updating message statuses
SqsAckSink and SqsAckFlow let you acknowledge (delete), ignore, or postpone messages on an SQS queue. They accept MessageAction subclasses to select the action to be taken.
For every message, you decide which action to take, wrap the message in that action, and pass it to SqsAckSink or SqsAckFlow:
- Delete: delete the message from the queue
- Ignore: don't change the message, and let it reappear in the queue after the visibility timeout
- ChangeMessageVisibility(visibilityTimeout): postpone the message, or make it immediately visible to other consumers. See the official documentation for more details.
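The choice among the three actions is usually made per message, based on the processing outcome. The following sketch models such a policy with hypothetical Outcome, Kind and Decision types. Its retry rules (three attempts, 30-second steps) are made up for illustration. In a real stream, each Decision would become MessageAction.delete(m), MessageAction.ignore(m) or MessageAction.changeMessageVisibility(m, seconds), as in the Java examples in this section. The receiveCount argument could, for instance, come from the SQS ApproximateReceiveCount attribute; that is an assumption, not something this page shows:

```java
// Hypothetical per-message acknowledgement policy, modelled with plain Java types.
final class AckDecision {
  enum Outcome { SUCCEEDED, BUSY, FAILED }

  enum Kind { DELETE, IGNORE, CHANGE_VISIBILITY }

  static final class Decision {
    final Kind kind;
    final int visibilityTimeoutSeconds; // only meaningful for CHANGE_VISIBILITY

    Decision(Kind kind, int visibilityTimeoutSeconds) {
      this.kind = kind;
      this.visibilityTimeoutSeconds = visibilityTimeoutSeconds;
    }
  }

  static final int MAX_RETRIES = 3; // made-up limit for this sketch

  static Decision decide(Outcome outcome, int receiveCount) {
    switch (outcome) {
      case SUCCEEDED:
        // processed: remove it from the queue
        return new Decision(Kind.DELETE, 0);
      case BUSY:
        // cannot handle it now: make it visible to other consumers immediately
        return new Decision(Kind.CHANGE_VISIBILITY, 0);
      default:
        if (receiveCount < MAX_RETRIES) {
          // postpone the retry, waiting longer on each attempt
          return new Decision(Kind.CHANGE_VISIBILITY, 30 * receiveCount);
        }
        // leave it unchanged: it reappears after the visibility timeout, and a queue
        // with a redrive policy can move it to a dead-letter queue
        return new Decision(Kind.IGNORE, 0);
    }
  }
}
```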
Acknowledge (delete) messages
- Scala
  SqsSource(queueUrl, sqsSourceSettings)
    .take(1)
    .map(MessageAction.Delete(_))
    .runWith(SqsAckSink(queueUrl))
- Java
  // source: a Source<Message, NotUsed>, for example one created with SqsSource.create
  source
      .map(m -> MessageAction.delete(m))
      .runWith(SqsAckSink.create(queueUrl, SqsAckSettings.create(), sqsClient), system);
Ignore messages
- Scala
  SqsSource(queueUrl, sqsSourceSettings)
    .take(1)
    .map(MessageAction.Ignore(_))
    .runWith(SqsAckSink(queueUrl))
- Java
  source
      .map(m -> MessageAction.ignore(m))
      .via(SqsAckFlow.create(queueUrl, SqsAckSettings.create(), sqsClient))
      .runWith(Sink.seq(), system);
Change Visibility Timeout of messages
- Scala
  SqsSource(queueUrl, sqsSourceSettings)
    .take(1)
    .map(MessageAction.ChangeMessageVisibility(_, 5.minutes))
    .runWith(SqsAckSink(queueUrl))
- Java
  source
      // new visibility timeout in seconds
      .map(m -> MessageAction.changeMessageVisibility(m, 12))
      .runWith(SqsAckSink.create(queueUrl, SqsAckSettings.create(), sqsClient), system);
Update message status in a flow
The SqsAckFlow forwards an SqsAckResult subclass down the stream:
- DeleteResult to acknowledge message deletion
- ChangeMessageVisibilityResult to acknowledge a message visibility change
- For the Ignore action, nothing is performed on the SQS queue, so no SqsAckResult is forwarded.
- Scala
  SqsSource(queueUrl, sqsSourceSettings)
    .take(1)
    .map(MessageAction.Delete(_))
    .via(SqsAckFlow(queueUrl))
    .runWith(Sink.head)
- Java
  source
      .map(m -> MessageAction.delete(m))
      .via(SqsAckFlow.create(queueUrl, SqsAckSettings.create(), sqsClient))
      .runWith(Sink.seq(), system);
SqsAck configuration
- Scala
  val sinkSettings = SqsAckSettings()
    .withMaxInFlight(10)
- Java
  SqsAckSettings sinkSettings = SqsAckSettings.create().withMaxInFlight(10);
Options:
- maxInFlight: maximum number of messages being processed by the SqsAsyncClient at the same time. Default: 10
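What "at most maxInFlight at the same time" means can be shown with plain java.util.concurrent tools. This sketch is an illustration only. It swaps in a Semaphore, with a short sleep standing in for the SQS round trip, and is not how the connector implements the limit. InFlightLimit is a hypothetical name:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration of an in-flight cap using a Semaphore (not the connector's implementation).
final class InFlightLimit {
  /** Runs simulated requests with at most maxInFlight outstanding; returns the observed peak. */
  static int runBounded(int requests, int maxInFlight) {
    Semaphore permits = new Semaphore(maxInFlight);
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(requests);
    ExecutorService pool = Executors.newCachedThreadPool();
    try {
      for (int i = 0; i < requests; i++) {
        // the submitter waits while maxInFlight requests are outstanding
        permits.acquireUninterruptibly();
        pool.execute(() -> {
          int now = inFlight.incrementAndGet();
          peak.accumulateAndGet(now, Math::max);
          try {
            Thread.sleep(5); // stands in for an SQS round trip
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            inFlight.decrementAndGet();
            permits.release();
            done.countDown();
          }
        });
      }
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
    return peak.get();
  }
}
```

However many requests are submitted, the observed peak never exceeds maxInFlight, because each running request holds a permit until it finishes.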
Updating message statuses in batches with grouping
SqsAckFlow.grouped batches actions by their type and forwards an SqsAckResultEntry subclass for each item processed:
- DeleteResultEntry to acknowledge message deletion
- ChangeMessageVisibilityResultEntry to acknowledge a message visibility change
- For the Ignore action, nothing is performed on the SQS queue, so no SqsAckResultEntry is forwarded.
Acknowledge (delete) messages:
- Scala
  SqsSource(queueUrl, sqsSourceSettings)
    .take(10)
    .map(MessageAction.Delete(_))
    .via(SqsAckFlow.grouped(queueUrl, SqsAckGroupedSettings.Defaults))
    .runWith(Sink.seq)
- Java
  source
      .map(m -> MessageAction.delete(m))
      .via(SqsAckFlow.grouped(queueUrl, SqsAckGroupedSettings.create(), sqsClient))
      .runWith(Sink.seq(), system);
Ignore messages:
- Scala
  Source(messages)
    .take(10)
    .map(MessageAction.Ignore(_))
    .via(SqsAckFlow.grouped(queueUrl, SqsAckGroupedSettings.Defaults))
    .runWith(Sink.seq)
- Java
  source
      .map(m -> MessageAction.ignore(m))
      .via(SqsAckFlow.grouped(queueUrl, SqsAckGroupedSettings.create(), sqsClient))
      .runWith(Sink.seq(), system);
Change Visibility Timeout of messages:
- Scala
  SqsSource(queueUrl, sqsSourceSettings)
    .take(10)
    .map(MessageAction.ChangeMessageVisibility(_, 5.minutes))
    .via(SqsAckFlow.grouped(queueUrl, SqsAckGroupedSettings.Defaults))
    .runWith(Sink.seq)
- Java
  source
      // new visibility timeout in seconds
      .map(m -> MessageAction.changeMessageVisibility(m, 5))
      .via(SqsAckFlow.grouped(queueUrl, SqsAckGroupedSettings.create(), sqsClient))
      .runWith(Sink.seq(), system);
Acknowledge grouping configuration
- Scala
  val batchSettings = SqsAckGroupedSettings()
    .withMaxBatchSize(10)
    .withMaxBatchWait(500.millis)
    .withConcurrentRequests(1)
- Java
  SqsAckGroupedSettings flowSettings =
      SqsAckGroupedSettings.create()
          .withMaxBatchSize(10)
          .withMaxBatchWait(Duration.ofMillis(500))
          .withConcurrentRequests(1);
Options:
- maxBatchSize: the maximum number of messages in a batch sent to SQS. Default: 10
- maxBatchWait: the maximum duration for which the stage waits for maxBatchSize messages to arrive. At the end of this period it sends what it has collected, even if maxBatchSize has not been reached. Default: 500 milliseconds
- concurrentRequests: the number of batches sent to SQS concurrently.
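The effect of grouping acknowledgements by action type can be sketched without SQS. This hypothetical GroupedAcks model ignores maxBatchWait and concurrency. It drops Ignore actions, then splits Delete and ChangeMessageVisibility actions separately into requests of at most maxBatchSize entries. That matches the 10-entry limit of the SQS DeleteMessageBatch and ChangeMessageVisibilityBatch APIs:

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of grouping acknowledgement actions by type (not the connector's code).
final class GroupedAcks {
  enum Action { DELETE, IGNORE, CHANGE_VISIBILITY }

  /** Returns, per action type, the sizes of the batch requests that would be sent. */
  static Map<Action, List<Integer>> plan(List<Action> actions, int maxBatchSize) {
    Map<Action, Integer> counts = new EnumMap<>(Action.class);
    for (Action a : actions) {
      if (a != Action.IGNORE) { // nothing is sent to SQS for Ignore
        counts.merge(a, 1, Integer::sum);
      }
    }
    Map<Action, List<Integer>> plan = new EnumMap<>(Action.class);
    for (Map.Entry<Action, Integer> e : counts.entrySet()) {
      List<Integer> sizes = new ArrayList<>();
      for (int left = e.getValue(); left > 0; left -= maxBatchSize) {
        sizes.add(Math.min(left, maxBatchSize));
      }
      plan.put(e.getKey(), sizes);
    }
    return plan;
  }
}
```

For 23 Delete, 5 Ignore and 3 ChangeMessageVisibility actions with maxBatchSize 10, the model sends Delete batches of 10, 10 and 3, one visibility batch of 3, and nothing for the ignored messages.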
Integration testing
For integration testing without touching Amazon SQS, Apache Pekko Connectors uses ElasticMQ, a queuing service which serves an AWS SQS compatible API.