AWS EventBridge
Amazon EventBridge is a serverless event bus that allows your applications to asynchronously consume events from third-party SaaS offerings, AWS services, and other applications in your own infrastructure. It evolved from Amazon CloudWatch Events (official documentation). EventBridge acts as a broker that you can configure with your own rules to route events to the correct service.
For more information about AWS EventBridge please visit the official documentation.
Events are published using the AWS PutEvents API.
When publishing events, any entry in a PutEvents request can fail. The response reports which entries were not published successfully. Currently, the connector does not retry failed entries; the only retries are those configured on the EventBridge client.
Configurable retry behaviour in the connector itself may be added in a future release.
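As a rough illustration of how failed entries could be picked out for a manual retry, the sketch below pairs each request entry with the result at the same index, relying on the PutEvents API returning results in request order. It uses only plain Java collections: the class FailedEntries, the method failedEntries and the list of error codes are hypothetical stand-ins, not part of the connector. With the AWS SDK, the error codes would typically be obtained by calling errorCode() on each element of PutEventsResponse.entries(), where null indicates that the entry was accepted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the connector or the AWS SDK.
class FailedEntries {

    /**
     * Returns the request entries whose result at the same index carries an error code.
     * errorCodes.get(i) is expected to be null when entry i was accepted.
     */
    static <T> List<T> failedEntries(List<T> requestEntries, List<String> errorCodes) {
        if (requestEntries.size() != errorCodes.size()) {
            throw new IllegalArgumentException("request and response sizes differ");
        }
        List<T> failed = new ArrayList<>();
        for (int i = 0; i < requestEntries.size(); i++) {
            if (errorCodes.get(i) != null) {
                failed.add(requestEntries.get(i));
            }
        }
        return failed;
    }
}
```

The returned list could then be fed back into a publishing flow, ideally with a bounded number of attempts and some backoff.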
By default, the client publishes to the default event bus, but normally you should publish to a specific event bus that you create.
The event bus name is defined per event in a PutEventsRequestEntry object. It would be possible to define helper flows/sinks with default values such as source and eventBusName. The detail field is a JSON document encoded as a string, and detailType is the name of the event used for rule matching.
The Apache Pekko Connectors AWS EventBridge connector provides Apache Pekko Stream flows and sinks to publish to AWS EventBridge event buses.
Project Info: Apache Pekko Connectors AWS EventBridge | |
---|---|
Artifact | org.apache.pekko : pekko-connectors-aws-event-bridge : 1.2.0+3-e195cec2-SNAPSHOT |
JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21 |
Scala versions | 2.13.17, 2.12.20, 3.3.6 |
JPMS module name | pekko.stream.connectors.aws.eventbrigde |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts
- sbt
libraryDependencies += "org.apache.pekko" %% "pekko-connectors-aws-event-bridge" % "1.2.0+3-e195cec2-SNAPSHOT"
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-aws-event-bridge_${scala.binary.version}</artifactId>
    <version>1.2.0+3-e195cec2-SNAPSHOT</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-aws-event-bridge_${versions.ScalaBinary}:1.2.0+3-e195cec2-SNAPSHOT"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Setup
Prepare an ActorSystem.
- Scala
import org.apache.pekko.actor.ActorSystem

implicit val system: ActorSystem = ActorSystem()
- Java
import org.apache.pekko.actor.ActorSystem;

ActorSystem system = ActorSystem.create();
This connector requires an EventBridgeAsyncClient instance to communicate with AWS EventBridge.
It is your code’s responsibility to call close to free any resources held by the client. In this example it will be called when the actor system is terminated.
- Scala
import java.net.URI
import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.eventbridge.EventBridgeAsyncClient

// endEndpoint is assumed to be a String holding the endpoint URL, defined elsewhere
implicit val awsEventBridgeClient: EventBridgeAsyncClient =
  EventBridgeAsyncClient
    .builder()
    .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
    .endpointOverride(URI.create(endEndpoint))
    .region(Region.EU_CENTRAL_1)
    .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
    .build()

// close the client when the actor system terminates
system.registerOnTermination(awsEventBridgeClient.close())
- Java
import java.net.URI;
import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.eventbridge.EventBridgeAsyncClient;

// endpoint is assumed to be a String holding the endpoint URL, defined elsewhere
final EventBridgeAsyncClient eventBridgeClient =
    EventBridgeAsyncClient.builder()
        .credentialsProvider(
            StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
        .endpointOverride(URI.create(endpoint))
        .region(Region.EU_CENTRAL_1)
        .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
        .build();

// close the client when the actor system terminates
system.registerOnTermination(() -> eventBridgeClient.close());
The example above uses Apache Pekko HTTP as the default HTTP client implementation. For more details about the HTTP client, configuring request retrying and best practices for credentials, see AWS client configuration.
Publish messages to AWS EventBridge Event Bus
Create a sink that accepts PutEventsRequestEntry elements and publishes them to an event bus.
- Scala
Source
  .single(PutEventsRequestEntry.builder().detail("string").build())
  .runWith(EventBridgePublisher.sink())
- Java
Source.single(PutEventsRequestEntry.builder().detail("message").build())
    .runWith(EventBridgePublisher.sink(eventBridgeClient), system);
Create a sink that accepts PutEventsRequest elements to be published to an event bus.
- Scala
Source
  .single(PutEventsRequest.builder().entries(PutEventsRequestEntry.builder().detail("string").build()).build())
  .runWith(EventBridgePublisher.publishSink())
- Java
Source.single(
        PutEventsRequest.builder()
            .entries(PutEventsRequestEntry.builder().detail("message").build())
            .build())
    .runWith(EventBridgePublisher.publishSink(eventBridgeClient), system);
You can also build flow stages which publish messages to an event bus and then forward the PutEventsResponse further down the stream.
Flow for PutEventsRequestEntry.
- Scala
Source
  .single(PutEventsRequestEntry.builder().detail("string").build())
  .via(EventBridgePublisher.flow())
  .runWith(Sink.foreach(res => println(res)))
- Java
Source.single(PutEventsRequestEntry.builder().detail("message").build())
    .via(EventBridgePublisher.flow(eventBridgeClient))
    .runWith(Sink.foreach(res -> System.out.println(res)), system);
Flow for PutEventsRequest.
- Scala
Source
  .single(PutEventsRequest.builder().entries(PutEventsRequestEntry.builder().detail("string").build()).build())
  .via(EventBridgePublisher.publishFlow())
  .runWith(Sink.foreach(res => println(res)))
- Java
Source.single(
        PutEventsRequest.builder()
            .entries(PutEventsRequestEntry.builder().detail("message").build())
            .build())
    .via(EventBridgePublisher.publishFlow(eventBridgeClient))
    .runWith(Sink.foreach(res -> System.out.println(res)), system);
Flow supporting a list of PutEventsRequestEntry objects.
Messages published in a batch using EventBridgePublisher.flowSeq are not published in an “all or nothing” manner. EventBridge processes each event independently. Retries of the failed messages in the PutEventsResponse are not yet implemented.
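The PutEvents API accepts at most 10 entries per request. Assuming that flowSeq sends each list as a single request (an assumption, not something this page confirms), larger lists would need to be split before they reach the flow. The following is a minimal sketch using plain Java collections; PutEventsBatches and chunk are hypothetical names, not part of the connector.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the connector or the AWS SDK.
class PutEventsBatches {

    // PutEvents accepts between 1 and 10 entries per request.
    static final int MAX_ENTRIES_PER_REQUEST = 10;

    /** Splits entries into consecutive batches of at most maxSize elements, preserving order. */
    static <T> List<List<T>> chunk(List<T> entries, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < entries.size(); start += maxSize) {
            int end = Math.min(start + maxSize, entries.size());
            // copy the view so each batch is independent of the input list
            batches.add(new ArrayList<>(entries.subList(start, end)));
        }
        return batches;
    }
}
```

Inside a stream, the grouped operator can produce such bounded batches upstream of the publishing flow.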
Integration testing
For integration testing without connecting directly to Amazon EventBridge, Apache Pekko Connectors uses LocalStack, which comes as a Docker image and has a corresponding service amazoneventbridge in the docker-compose.yml file. Start this service before running the integration tests: docker compose up amazoneventbridge