AWS EventBridge

Amazon EventBridge

Amazon EventBridge is a serverless event bus that allows your applications to asynchronously consume events from 3rd party SaaS offerings, AWS services, and other applications in your own infrastructure. It evolved from Amazon CloudWatch Events (official documentation). The EventBridge acts as broker that you can configure with your own rules to route events to the correct service.

For more information about AWS EventBridge please visit the official documentation.

The publishing of the events is implemented using the AWS PUT Events API.

When publishing events any of the entries inside of the Put request can fail. The response contains information about which entries were not successfully published. Currently, there are no retries supported apart from the configuration provided to the EventBridge client.

Adding Support for configurable retry behaviour as part of the connector may be part of a future release.

By default the client will publish to a default event bus, but normally you should publish to a specific event bus that you create.

An event bus name is defined per event in a PutEventsRequestEntry object. It would be possible to define helper flows/sinks with default values such as source and eventBusName. The detail is JSON as a string and detailType is the name of the event for rule matching.

The Apache Pekko Connectors AWS EventBridge connector provides Apache Pekko Stream flows and sinks to publish to AWS EventBridge event buses.

Project Info: Apache Pekko Connectors AWS EventBriged
Artifact
org.apache.pekko
pekko-connectors-aws-event-bridge
1.0.2
JDK versions
OpenJDK 8
OpenJDK 11
OpenJDK 17
Scala versions2.13.13, 2.12.19, 3.3.3
JPMS module namepekko.stream.connectors.aws.eventbrigde
License
API documentation
Forums
Release notesGitHub releases
IssuesGithub issues
Sourceshttps://github.com/apache/pekko-connectors

Artifacts

sbt
libraryDependencies += "org.apache.pekko" %% "pekko-connectors-aws-event-bridge" % "1.0.2"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-aws-event-bridge_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-aws-event-bridge_${versions.ScalaBinary}:1.0.2"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Setup

Prepare an ActorSystem.

Scala
sourceimplicit val system: ActorSystem = ActorSystem()
Java
sourceimport org.apache.pekko.actor.ActorSystem;
ActorSystem system = ActorSystem.create();

This connector requires an EventBridge instance to communicate with AWS EventBridge.

It is your code’s responsibility to call close to free any resources held by the client. In this example it will be called when the actor system is terminated.

Scala
sourceimport java.net.URI

import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.eventbridge.EventBridgeAsyncClient

implicit val awsEventBridgeClient: EventBridgeAsyncClient =
  EventBridgeAsyncClient
    .builder()
    .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
    .endpointOverride(URI.create(endEndpoint))
    .region(Region.EU_CENTRAL_1)
    .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
    .build()

system.registerOnTermination(awsEventBridgeClient.close())
Java
sourceimport java.net.URI;
import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.eventbridge.EventBridgeAsyncClient;

final EventBridgeAsyncClient awsClient =
    EventBridgeAsyncClient.builder()
        .credentialsProvider(
            StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
        .endpointOverride(URI.create(endpoint))
        .region(Region.EU_CENTRAL_1)
        .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
        .build();

system.registerOnTermination(() -> awsClient.close());

The example above uses Apache Pekko HTTP as the default HTTP client implementation. For more details about the HTTP client, configuring request retrying and best practices for credentials, see AWS client configuration.

Publish messages to AWS EventBridge Event Bus

Create a PutEventsEntry-accepting sink, publishing to an event bus.

Scala
sourceSource
  .single(PutEventsRequestEntry.builder().detail("string").build())
  .runWith(EventBridgePublisher.sink())
Java
sourceSource.single(detailEntry("message"))
    .runWith(EventBridgePublisher.sink(eventBridgeClient), system);

Create a sink that accepts PutEventsRequestEntries to be published to an Event Bus.

Scala
sourceSource
  .single(PutEventsRequest.builder().entries(PutEventsRequestEntry.builder().detail("string").build()).build())
  .runWith(EventBridgePublisher.publishSink())
Java
sourceSource.single(detailPutEventsRequest("message"))
    .runWith(EventBridgePublisher.publishSink(eventBridgeClient), system);

You can also build flow stages which publish messages to Event Bus and then forward PutEventsResponse further down the stream.

Flow for PutEventEntry.

Scala
sourceSource
  .single(PutEventsRequestEntry.builder().detail("string").build())
  .via(EventBridgePublisher.flow())
  .runWith(Sink.foreach(res => println(res)))
Java
sourceSource.single(detailEntry("message"))
    .via(EventBridgePublisher.flow(eventBridgeClient))
    .runWith(Sink.foreach(res -> System.out.println(res)), system);

Flow for PutEventsRequest.

Scala
sourceSource
  .single(PutEventsRequest.builder().entries(PutEventsRequestEntry.builder().detail("string").build()).build())
  .via(EventBridgePublisher.publishFlow())
  .runWith(Sink.foreach(res => println(res)))
Java
sourceSource.single(detailPutEventsRequest("message"))
    .via(EventBridgePublisher.publishFlow(eventBridgeClient))
    .runWith(Sink.foreach(res -> System.out.println(res)), system);

Flow supporting a list of PutEventEntry objects.

Messages published in a batch using EventBridgePublisher.flowSeqEventBridgePublisher.flowSeq are not published in an “all or nothing” manner. Event Bridge will process each event independently. Retries of the failed messages in the PutEventsResponse are not yet implemented.

Integration testing

For integration testing without connecting directly to Amazon EventBridge, Apache Pekko Connectors uses Localstack, which comes as a docker image - and has a corresponding service amazoneventbridge in the docker-compose.yml file. Which needs to be started before running the integration tests docker-compose up amazoneventbridge.