AWS SNS
The AWS SNS connector provides an Apache Pekko Stream Flow and Sink for push notifications through AWS SNS.
For more information about AWS SNS please visit the official documentation.
Project Info: Apache Pekko Connectors AWS SNS | |
---|---|
Artifact | org.apache.pekko
pekko-connectors-sns
1.2.0+3-e195cec2-SNAPSHOT
|
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 OpenJDK 21 |
Scala versions | 2.13.17, 2.12.20, 3.3.6 |
JPMS module name | pekko.stream.connectors.aws.sns |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts
- sbt
val PekkoVersion = "1.1.5" val PekkoHttpVersion = "1.1.0" libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-connectors-sns" % "1.2.0+3-e195cec2-SNAPSHOT", "org.apache.pekko" %% "pekko-stream" % PekkoVersion, "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion )
- Maven
<properties> <pekko.version>1.1.5</pekko.version> <pekko.http.version>1.1.0</pekko.http.version> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-connectors-sns_${scala.binary.version}</artifactId> <version>1.2.0+3-e195cec2-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-stream_${scala.binary.version}</artifactId> <version>${pekko.version}</version> </dependency> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-http_${scala.binary.version}</artifactId> <version>${pekko.http.version}</version> </dependency> </dependencies>
- Gradle
def versions = [ PekkoVersion: "1.1.5", PekkoHttpVersion: "1.1.0", ScalaBinary: "2.13" ] dependencies { implementation "org.apache.pekko:pekko-connectors-sns_${versions.ScalaBinary}:1.2.0+3-e195cec2-SNAPSHOT" implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}" implementation "org.apache.pekko:pekko-http_${versions.ScalaBinary}:${versions.PekkoHttpVersion}" }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Setup
This connector requires an implicit SnsAsyncClient
instance to communicate with AWS SNS.
It is your code’s responsibility to call close
to free any resources held by the client. In this example it will be called when the actor system is terminated.
- Scala
-
source
import java.net.URI import pekko.stream.connectors.awsspi.PekkoHttpClient import software.amazon.awssdk.services.sns.SnsAsyncClient import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider import software.amazon.awssdk.auth.credentials.AwsBasicCredentials import software.amazon.awssdk.regions.Region // Don't encode credentials in your source code! // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")) implicit val awsSnsClient: SnsAsyncClient = SnsAsyncClient .builder() .credentialsProvider(credentialsProvider) .region(Region.EU_CENTRAL_1) .httpClient(PekkoHttpClient.builder().withActorSystem(system).build()) // Possibility to configure the retry policy // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html // .overrideConfiguration(...) .build() system.registerOnTermination(awsSnsClient.close())
- Java
-
source
import java.net.URI; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sns.SnsAsyncClient; // Don't encode credentials in your source code! // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")); final SnsAsyncClient awsSnsClient = SnsAsyncClient.builder() .credentialsProvider(credentialsProvider) .region(Region.EU_CENTRAL_1) .httpClient(PekkoHttpClient.builder().withActorSystem(system).build()) // Possibility to configure the retry policy // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html // .overrideConfiguration(...) .build(); system.registerOnTermination(() -> awsSnsClient.close());
The example above uses Apache Pekko HTTP as the default HTTP client implementation. For more details about the HTTP client, configuring request retrying and best practices for credentials, see AWS client configuration for more details.
We will also need an ActorSystem
ActorSystem
.
- Scala
-
source
implicit val system: ActorSystem = ActorSystem()
- Java
-
source
ActorSystem system = ActorSystem.create();
This is all preparation that we are going to need.
Publish messages to an SNS topic
Now we can publish a message to any SNS topic where we have access to by providing the topic ARN to the SnsPublisher
SnsPublisher
Flow or Sink factory method.
Using a Flow
- Scala
-
source
Source .single("message") .via(SnsPublisher.flow(topicArn)) .runWith(Sink.foreach(res => println(res.messageId()))) Source .single(PublishRequest.builder().message("message").build()) .via(SnsPublisher.publishFlow(topicArn)) .runWith(Sink.foreach(res => println(res.messageId()))) Source .single(PublishRequest.builder().message("message").topicArn(topicArn).build()) .via(SnsPublisher.publishFlow()) .runWith(Sink.foreach(res => println(res.messageId())))
- Java
-
source
Source.single("message") .via(SnsPublisher.createFlow(topicArn, snsClient)) .runWith(Sink.foreach(res -> System.out.println(res.messageId())), system); Source.single(PublishRequest.builder().message("message").build()) .via(SnsPublisher.createPublishFlow(topicArn, snsClient)) .runWith(Sink.foreach(res -> System.out.println(res.messageId())), system); Source.single(PublishRequest.builder().message("message").topicArn(topicArn).build()) .via(SnsPublisher.createPublishFlow(snsClient)) .runWith(Sink.foreach(res -> System.out.println(res.messageId())), system);
As you can see, this would publish the messages from the source to the specified AWS SNS topic. After a message has been successfully published, a PublishResult
will be pushed downstream.
Using a Sink
- Scala
-
source
Source .single("message") .runWith(SnsPublisher.sink(topicArn)) Source .single(PublishRequest.builder().message("message").build()) .runWith(SnsPublisher.publishSink(topicArn)) Source .single(PublishRequest.builder().message("message").topicArn(topicArn).build()) .runWith(SnsPublisher.publishSink())
- Java
-
source
Source.single("message").runWith(SnsPublisher.createSink(topicArn, snsClient), system); Source.single(PublishRequest.builder().message("message").build()) .runWith(SnsPublisher.createPublishSink(topicArn, snsClient), system); Source.single(PublishRequest.builder().message("message").topicArn(topicArn).build()) .runWith(SnsPublisher.createPublishSink(snsClient), system);
As you can see, this would publish the messages from the source to the specified AWS SNS topic.