AWS SNS
The AWS SNS connector provides an Apache Pekko Stream Flow and Sink for push notifications through AWS SNS.
For more information about AWS SNS please visit the official documentation.
Project Info: Apache Pekko Connectors AWS SNS | |
---|---|
Artifact | org.apache.pekko
pekko-connectors-sns
1.1.0
|
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 OpenJDK 21 |
Scala versions | 2.13.15, 2.12.20, 3.3.4 |
JPMS module name | pekko.stream.connectors.aws.sns |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts¶
val PekkoVersion = "1.1.3"
val PekkoHttpVersion = "1.1.0"
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-connectors-sns" % "1.1.0",
"org.apache.pekko" %% "pekko-stream" % PekkoVersion,
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion
)
<properties>
<pekko.version>1.1.3</pekko.version>
<pekko.http.version>1.1.0</pekko.http.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-connectors-sns_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-http_${scala.binary.version}</artifactId>
<version>${pekko.http.version}</version>
</dependency>
</dependencies>
def versions = [
PekkoVersion: "1.1.3",
PekkoHttpVersion: "1.1.0",
ScalaBinary: "2.13"
]
dependencies {
implementation "org.apache.pekko:pekko-connectors-sns_${versions.ScalaBinary}:1.1.0"
implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
implementation "org.apache.pekko:pekko-http_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Setup¶
This connector requires an SnsAsyncClient
instance to communicate with AWS SNS.
It is your code’s responsibility to call close
to free any resources held by the client. In this example it will be called when the actor system is terminated.
sourceimport java.net.URI
import pekko.stream.connectors.awsspi.PekkoHttpClient
import software.amazon.awssdk.services.sns.SnsAsyncClient
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.regions.Region
// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))
implicit val awsSnsClient: SnsAsyncClient =
SnsAsyncClient
.builder()
.credentialsProvider(credentialsProvider)
.region(Region.EU_CENTRAL_1)
.httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
// Possibility to configure the retry policy
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
// .overrideConfiguration(...)
.build()
system.registerOnTermination(awsSnsClient.close())
sourceimport java.net.URI;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsAsyncClient;
// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
StaticCredentialsProvider credentialsProvider =
StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"));
final SnsAsyncClient awsSnsClient =
SnsAsyncClient.builder()
.credentialsProvider(credentialsProvider)
.region(Region.EU_CENTRAL_1)
.httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
// Possibility to configure the retry policy
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
// .overrideConfiguration(...)
.build();
system.registerOnTermination(() -> awsSnsClient.close());
The example above uses Apache Pekko HTTP as the default HTTP client implementation. For more details about the HTTP client, configuring request retrying and best practices for credentials, see AWS client configuration for more details.
We will also need an ActorSystem
.
sourceimplicit val system: ActorSystem = ActorSystem()
sourceActorSystem system = ActorSystem.create();
This is all preparation that we are going to need.
Publish messages to an SNS topic¶
Now we can publish a message to any SNS topic where we have access to by providing the topic ARN to the SnsPublisher
Flow or Sink factory method.
Using a Flow¶
sourceSource
.single("message")
.via(SnsPublisher.flow(topicArn))
.runWith(Sink.foreach(res => println(res.messageId())))
Source
.single(PublishRequest.builder().message("message").build())
.via(SnsPublisher.publishFlow(topicArn))
.runWith(Sink.foreach(res => println(res.messageId())))
Source
.single(PublishRequest.builder().message("message").topicArn(topicArn).build())
.via(SnsPublisher.publishFlow())
.runWith(Sink.foreach(res => println(res.messageId())))
sourceSource.single("message")
.via(SnsPublisher.createFlow(topicArn, snsClient))
.runWith(Sink.foreach(res -> System.out.println(res.messageId())), system);
Source.single(PublishRequest.builder().message("message").build())
.via(SnsPublisher.createPublishFlow(topicArn, snsClient))
.runWith(Sink.foreach(res -> System.out.println(res.messageId())), system);
Source.single(PublishRequest.builder().message("message").topicArn(topicArn).build())
.via(SnsPublisher.createPublishFlow(snsClient))
.runWith(Sink.foreach(res -> System.out.println(res.messageId())), system);
As you can see, this would publish the messages from the source to the specified AWS SNS topic. After a message has been successfully published, a PublishResult
will be pushed downstream.
Using a Sink¶
sourceSource
.single("message")
.runWith(SnsPublisher.sink(topicArn))
Source
.single(PublishRequest.builder().message("message").build())
.runWith(SnsPublisher.publishSink(topicArn))
Source
.single(PublishRequest.builder().message("message").topicArn(topicArn).build())
.runWith(SnsPublisher.publishSink())
sourceSource.single("message").runWith(SnsPublisher.createSink(topicArn, snsClient), system);
Source.single(PublishRequest.builder().message("message").build())
.runWith(SnsPublisher.createPublishSink(topicArn, snsClient), system);
Source.single(PublishRequest.builder().message("message").topicArn(topicArn).build())
.runWith(SnsPublisher.createPublishSink(snsClient), system);
As you can see, this would publish the messages from the source to the specified AWS SNS topic.