AWS Lambda

The AWS Lambda connector provides Apache Pekko Flow for AWS Lambda integration.

For more information about AWS Lambda please visit the AWS lambda documentation.

Project Info: Apache Pekko Connectors AWS Lambda
Artifact
org.apache.pekko
pekko-connectors-awslambda
1.1.0
JDK versions
OpenJDK 8
OpenJDK 11
OpenJDK 17
OpenJDK 21
Scala versions2.13.15, 2.12.20, 3.3.4
JPMS module namepekko.stream.connectors.aws.lambda
License
API documentation
Forums
Release notesGitHub releases
IssuesGithub issues
Sourceshttps://github.com/apache/pekko-connectors

Artifacts

sbt
Maven
Gradle
val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-awslambda" % "1.1.0",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
<properties>
  <pekko.version>1.1.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-awslambda_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
def versions = [
  PekkoVersion: "1.1.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-awslambda_${versions.ScalaBinary}:1.1.0"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Setup

The flow provided by this connector needs a prepared LambdaAsyncClient to be able to invoke lambda functions.

Scala
Java
sourceimport pekko.stream.connectors.awsspi.PekkoHttpClient
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.services.lambda.LambdaAsyncClient

// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))
implicit val lambdaClient: LambdaAsyncClient = LambdaAsyncClient
  .builder()
  .credentialsProvider(credentialsProvider)
  .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
  // Possibility to configure the retry policy
  // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
  // .overrideConfiguration(...)
  .build()

system.registerOnTermination(lambdaClient.close())
sourceimport org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;

// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
StaticCredentialsProvider credentialsProvider =
    StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"));
LambdaAsyncClient awsLambdaClient =
    LambdaAsyncClient.builder()
        .credentialsProvider(credentialsProvider)
        .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
        // Possibility to configure the retry policy
        // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
        // .overrideConfiguration(...)
        .build();

system.registerOnTermination(awsLambdaClient::close);

The example above uses Apache Pekko HTTP as the default HTTP client implementation. For more details about the HTTP client, configuring request retrying and best practices for credentials, see AWS client configuration for more details.

We will need an ActorSystem.

Scala
Java
sourceimport org.apache.pekko.actor.ActorSystem

implicit val system: ActorSystem = ActorSystem()
sourceimport org.apache.pekko.actor.ActorSystem;

ActorSystem system = ActorSystem.create();

This is all preparation that we are going to need.

Sending messages

Now we can stream AWS Java SDK Lambda InvokeRequest to AWS Lambda functions AwsLambdaFlow factory.

Scala
Java
sourceimport software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.lambda.model.InvokeRequest

val request = InvokeRequest
  .builder()
  .functionName("lambda-function-name")
  .payload(SdkBytes.fromUtf8String("test-payload"))
  .build()
Source.single(request).via(AwsLambdaFlow(1)).runWith(Sink.seq)
sourceimport software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
import software.amazon.awssdk.core.SdkBytes;

InvokeRequest request =
    InvokeRequest.builder()
        .functionName("lambda-function-name")
        .payload(SdkBytes.fromUtf8String("test-payload"))
        .build();
Flow<InvokeRequest, InvokeResponse, NotUsed> flow = AwsLambdaFlow.create(awsLambdaClient, 1);
final CompletionStage<List<InvokeResponse>> stage =
    Source.single(request).via(flow).runWith(Sink.seq(), system);

AwsLambdaFlow configuration

Options:

  • parallelism - Number of parallel executions. Should be less or equal to number of threads in ExecutorService for LambdaAsyncClient