AWS Lambda

The AWS Lambda connector provides an Apache Pekko Streams Flow for AWS Lambda integration.

For more information about AWS Lambda, please visit the AWS Lambda documentation.

Project Info: Apache Pekko Connectors AWS Lambda
Artifact: org.apache.pekko : pekko-connectors-awslambda : 1.0.2
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17
Scala versions: 2.13.13, 2.12.19, 3.3.3
JPMS module name: pekko.stream.connectors.aws.lambda
License
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.0.2"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-awslambda" % "1.0.2",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven
<properties>
  <pekko.version>1.0.2</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-awslambda_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.0.2",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-awslambda_${versions.ScalaBinary}:1.0.2"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Setup

The flow provided by this connector needs a prepared LambdaAsyncClient to invoke Lambda functions.

Scala
import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.services.lambda.LambdaAsyncClient

// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))
implicit val lambdaClient: LambdaAsyncClient = LambdaAsyncClient
  .builder()
  .credentialsProvider(credentialsProvider)
  .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
  // Possibility to configure the retry policy
  // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
  // .overrideConfiguration(...)
  .build()

system.registerOnTermination(lambdaClient.close())
Java
import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;

// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
StaticCredentialsProvider credentialsProvider =
    StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"));
LambdaAsyncClient awsLambdaClient =
    LambdaAsyncClient.builder()
        .credentialsProvider(credentialsProvider)
        .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
        // Possibility to configure the retry policy
        // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
        // .overrideConfiguration(...)
        .build();

system.registerOnTermination(awsLambdaClient::close);

The example above uses Apache Pekko HTTP as the HTTP client implementation. For more details about the HTTP client, configuring request retries, and best practices for credentials, see AWS client configuration.

We will also need an ActorSystem.

Scala
import org.apache.pekko.actor.ActorSystem

implicit val system: ActorSystem = ActorSystem()
Java
import org.apache.pekko.actor.ActorSystem;

ActorSystem system = ActorSystem.create();

This is all the preparation we need.

Sending messages

Now we can stream AWS Java SDK Lambda InvokeRequest elements to AWS Lambda functions using the AwsLambdaFlow factory.

Scala
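import org.apache.pekko.stream.connectors.awslambda.scaladsl.AwsLambdaFlow
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.lambda.model.InvokeRequest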

val request = InvokeRequest
  .builder()
  .functionName("lambda-function-name")
  .payload(SdkBytes.fromUtf8String("test-payload"))
  .build()
Source.single(request).via(AwsLambdaFlow(1)).runWith(Sink.seq)
Java
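import java.util.List;
import java.util.concurrent.CompletionStage;
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.connectors.awslambda.javadsl.AwsLambdaFlow;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;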

InvokeRequest request =
    InvokeRequest.builder()
        .functionName("lambda-function-name")
        .payload(SdkBytes.fromUtf8String("test-payload"))
        .build();
Flow<InvokeRequest, InvokeResponse, NotUsed> flow = AwsLambdaFlow.create(awsLambdaClient, 1);
final CompletionStage<List<InvokeResponse>> stage =
    Source.single(request).via(flow).runWith(Sink.seq(), system);
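
The Java example ends with a CompletionStage that completes once the stream has finished. As a minimal sketch of how a test or a main method might wait for that result, the helper below uses only the JDK. The class name StageResults and the method awaitResponses are hypothetical and not part of the connector; the element type is kept generic, and it would be InvokeResponse when used with the stage above.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper class, not part of the connector.
final class StageResults {
  private StageResults() {}

  // Blocks until the stage completes or the timeout elapses.
  // T would be InvokeResponse for the stage produced by the flow.
  static <T> List<T> awaitResponses(CompletionStage<List<T>> stage, Duration timeout) {
    try {
      return stage.toCompletableFuture().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag
      throw new IllegalStateException("Interrupted while waiting for responses", e);
    } catch (ExecutionException e) {
      // The stream itself failed; surface the underlying cause.
      throw new IllegalStateException("Stream failed", e.getCause());
    } catch (TimeoutException e) {
      throw new IllegalStateException("No result within " + timeout, e);
    }
  }
}
```

Blocking like this is mainly suited to tests and short-lived programs. In application code it is usually preferable to stay asynchronous, for example by chaining thenAccept or whenComplete on the stage.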

AwsLambdaFlow configuration

Options:

  • parallelism - Number of parallel executions. Should be less than or equal to the number of threads in the ExecutorService used by the LambdaAsyncClient.
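
To illustrate the reasoning behind this recommendation, the following plain-JDK sketch simulates invocations that each occupy a thread while in flight. It is not the connector's implementation: the class ParallelismSketch, the method maxConcurrent, and the 20 ms sleep standing in for a remote call are all assumptions made for illustration. It shows that work submitted beyond the pool size only queues up, so requesting more parallelism than there are threads does not increase the number of concurrent executions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration, not part of the connector.
final class ParallelismSketch {
  private ParallelismSketch() {}

  // Runs `tasks` simulated invocations on a pool of `threads` threads and
  // returns the highest number of invocations observed running at once.
  static int maxConcurrent(int threads, int tasks) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    try {
      List<CompletableFuture<Void>> futures = new ArrayList<>();
      for (int i = 0; i < tasks; i++) {
        futures.add(CompletableFuture.runAsync(() -> {
          int now = running.incrementAndGet();
          peak.accumulateAndGet(now, Math::max); // record the highest count seen
          try {
            Thread.sleep(20); // stands in for a blocking remote call
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            running.decrementAndGet();
          }
        }, pool));
      }
      // Wait for every simulated invocation to finish.
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
      return peak.get();
    } finally {
      pool.shutdown();
    }
  }
}
```

For example, ParallelismSketch.maxConcurrent(2, 8) never reports more than 2 concurrent executions, regardless of how many tasks are submitted.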