AWS Lambda
The AWS Lambda connector provides Apache Pekko Flow for AWS Lambda integration.
For more information about AWS Lambda please visit the AWS lambda documentation.
Project Info: Apache Pekko Connectors AWS Lambda | |
---|---|
Artifact | org.apache.pekko
pekko-connectors-awslambda
1.1.0
|
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 OpenJDK 21 |
Scala versions | 2.13.15, 2.12.20, 3.3.4 |
JPMS module name | pekko.stream.connectors.aws.lambda |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts¶
val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-connectors-awslambda" % "1.1.0",
"org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
<properties>
<pekko.version>1.1.3</pekko.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-connectors-awslambda_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
</dependencies>
def versions = [
PekkoVersion: "1.1.3",
ScalaBinary: "2.13"
]
dependencies {
implementation "org.apache.pekko:pekko-connectors-awslambda_${versions.ScalaBinary}:1.1.0"
implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Setup¶
The flow provided by this connector needs a prepared LambdaAsyncClient
to be able to invoke lambda functions.
sourceimport pekko.stream.connectors.awsspi.PekkoHttpClient
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.services.lambda.LambdaAsyncClient
// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))
implicit val lambdaClient: LambdaAsyncClient = LambdaAsyncClient
.builder()
.credentialsProvider(credentialsProvider)
.httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
// Possibility to configure the retry policy
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
// .overrideConfiguration(...)
.build()
system.registerOnTermination(lambdaClient.close())
sourceimport org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
// Don't encode credentials in your source code!
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
StaticCredentialsProvider credentialsProvider =
StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"));
LambdaAsyncClient awsLambdaClient =
LambdaAsyncClient.builder()
.credentialsProvider(credentialsProvider)
.httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
// Possibility to configure the retry policy
// see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
// .overrideConfiguration(...)
.build();
system.registerOnTermination(awsLambdaClient::close);
The example above uses Apache Pekko HTTP as the default HTTP client implementation. For more details about the HTTP client, configuring request retrying and best practices for credentials, see AWS client configuration for more details.
We will need an ActorSystem
.
sourceimport org.apache.pekko.actor.ActorSystem
implicit val system: ActorSystem = ActorSystem()
sourceimport org.apache.pekko.actor.ActorSystem;
ActorSystem system = ActorSystem.create();
This is all preparation that we are going to need.
Sending messages¶
Now we can stream AWS Java SDK Lambda InvokeRequest
to AWS Lambda functions AwsLambdaFlow
factory.
sourceimport software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.lambda.model.InvokeRequest
val request = InvokeRequest
.builder()
.functionName("lambda-function-name")
.payload(SdkBytes.fromUtf8String("test-payload"))
.build()
Source.single(request).via(AwsLambdaFlow(1)).runWith(Sink.seq)
sourceimport software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
import software.amazon.awssdk.core.SdkBytes;
InvokeRequest request =
InvokeRequest.builder()
.functionName("lambda-function-name")
.payload(SdkBytes.fromUtf8String("test-payload"))
.build();
Flow<InvokeRequest, InvokeResponse, NotUsed> flow = AwsLambdaFlow.create(awsLambdaClient, 1);
final CompletionStage<List<InvokeResponse>> stage =
Source.single(request).via(flow).runWith(Sink.seq(), system);
AwsLambdaFlow configuration¶
Options:
parallelism
- Number of parallel executions. Should be less or equal to number of threads in ExecutorService for LambdaAsyncClient