AWS SNS

The AWS SNS connector provides an Apache Pekko Stream Flow and Sink for push notifications through AWS SNS.

For more information about AWS SNS please visit the official documentation.

Project Info: Apache Pekko Connectors AWS SNS
Artifact: org.apache.pekko : pekko-connectors-sns : 1.1.0-M1+154-6981eaa8-SNAPSHOT
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21
Scala versions: 2.13.15, 2.12.20, 3.3.4
JPMS module name: pekko.stream.connectors.aws.sns
License
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.1.2"
val PekkoHttpVersion = "1.1.0"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-sns" % "1.1.0-M1+154-6981eaa8-SNAPSHOT",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
  "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion
)
Maven
<properties>
  <pekko.version>1.1.2</pekko.version>
  <pekko.http.version>1.1.0</pekko.http.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-sns_${scala.binary.version}</artifactId>
    <version>1.1.0-M1+154-6981eaa8-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-http_${scala.binary.version}</artifactId>
    <version>${pekko.http.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.1.2",
  PekkoHttpVersion: "1.1.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-sns_${versions.ScalaBinary}:1.1.0-M1+154-6981eaa8-SNAPSHOT"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
  implementation "org.apache.pekko:pekko-http_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Setup

This connector requires an implicit SnsAsyncClient instance to communicate with AWS SNS.

It is your code’s responsibility to call close to free any resources held by the client. In this example it will be called when the actor system is terminated.

Scala
import java.net.URI

import pekko.stream.connectors.awsspi.PekkoHttpClient
import software.amazon.awssdk.services.sns.SnsAsyncClient
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.regions.Region

// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))
implicit val awsSnsClient: SnsAsyncClient =
  SnsAsyncClient
    .builder()
    .credentialsProvider(credentialsProvider)
    .region(Region.EU_CENTRAL_1)
    .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
    // Possibility to configure the retry policy
    // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
    // .overrideConfiguration(...)
    .build()

system.registerOnTermination(awsSnsClient.close())
Java
import java.net.URI;
import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsAsyncClient;

// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
StaticCredentialsProvider credentialsProvider =
    StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"));
final SnsAsyncClient awsSnsClient =
    SnsAsyncClient.builder()
        .credentialsProvider(credentialsProvider)
        .region(Region.EU_CENTRAL_1)
        .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
        // Possibility to configure the retry policy
        // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
        // .overrideConfiguration(...)
        .build();

system.registerOnTermination(() -> awsSnsClient.close());

The example above uses Apache Pekko HTTP as the HTTP client implementation. For details about the HTTP client, configuring request retries, and best practices for credentials, see AWS client configuration.

We will also need an ActorSystem.

Scala
implicit val system: ActorSystem = ActorSystem()
Java
ActorSystem system = ActorSystem.create();

This is all the preparation we need.

Publish messages to an SNS topic

Now we can publish a message to any SNS topic we have access to by providing the topic ARN to the SnsPublisher Flow or Sink factory method.
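Because the topic ARN is passed as a plain string, a malformed value only surfaces once AWS rejects the publish call. The following is a minimal sketch, not part of the connector, of a fail-fast check you could run before building the stream. The class name TopicArnCheck, the method looksLikeTopicArn, and the regular expression are hypothetical and only approximate the usual shape arn:<partition>:sns:<region>:<account-id>:<topic-name>.

```java
import java.util.regex.Pattern;

public class TopicArnCheck {
    // Loose approximation of an SNS topic ARN (an assumption, not an official grammar):
    // arn:<partition>:sns:<region>:<12-digit account id>:<topic name>, where FIFO
    // topic names end in ".fifo".
    private static final Pattern TOPIC_ARN =
        Pattern.compile("arn:aws[a-z-]*:sns:[a-z0-9-]+:\\d{12}:[A-Za-z0-9_-]+(\\.fifo)?");

    // Returns true when the candidate has the general shape of a topic ARN.
    // It does not check that the topic exists or that the caller may publish to it.
    public static boolean looksLikeTopicArn(String candidate) {
        return candidate != null && TOPIC_ARN.matcher(candidate).matches();
    }

    public static void main(String[] args) {
        // A well-formed ARN with a made-up account id and topic name.
        System.out.println(looksLikeTopicArn("arn:aws:sns:eu-central-1:123456789012:orders"));
        // A bare topic name is not an ARN.
        System.out.println(looksLikeTopicArn("orders"));
    }
}
```

A check like this catches typos such as passing the topic name instead of the ARN. Whether the topic exists and is accessible is still decided by AWS when the message is published.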

Using a Flow

Scala
Source
  .single("message")
  .via(SnsPublisher.flow(topicArn))
  .runWith(Sink.foreach(res => println(res.messageId())))

Source
  .single(PublishRequest.builder().message("message").build())
  .via(SnsPublisher.publishFlow(topicArn))
  .runWith(Sink.foreach(res => println(res.messageId())))

Source
  .single(PublishRequest.builder().message("message").topicArn(topicArn).build())
  .via(SnsPublisher.publishFlow())
  .runWith(Sink.foreach(res => println(res.messageId())))
Java
Source.single("message")
    .via(SnsPublisher.createFlow(topicArn, snsClient))
    .runWith(Sink.foreach(res -> System.out.println(res.messageId())), system);

Source.single(PublishRequest.builder().message("message").build())
    .via(SnsPublisher.createPublishFlow(topicArn, snsClient))
    .runWith(Sink.foreach(res -> System.out.println(res.messageId())), system);

Source.single(PublishRequest.builder().message("message").topicArn(topicArn).build())
    .via(SnsPublisher.createPublishFlow(snsClient))
    .runWith(Sink.foreach(res -> System.out.println(res.messageId())), system);

This publishes the messages from the source to the specified AWS SNS topic. After a message has been successfully published, a PublishResponse is pushed downstream.

Using a Sink

Scala
Source
  .single("message")
  .runWith(SnsPublisher.sink(topicArn))

Source
  .single(PublishRequest.builder().message("message").build())
  .runWith(SnsPublisher.publishSink(topicArn))

Source
  .single(PublishRequest.builder().message("message").topicArn(topicArn).build())
  .runWith(SnsPublisher.publishSink())
Java
Source.single("message").runWith(SnsPublisher.createSink(topicArn, snsClient), system);

Source.single(PublishRequest.builder().message("message").build())
    .runWith(SnsPublisher.createPublishSink(topicArn, snsClient), system);

Source.single(PublishRequest.builder().message("message").topicArn(topicArn).build())
    .runWith(SnsPublisher.createPublishSink(snsClient), system);

This publishes the messages from the source to the specified AWS SNS topic.