AWS Lambda
The AWS Lambda connector provides Apache Pekko Flow for AWS Lambda integration.
For more information about AWS Lambda please visit the AWS lambda documentation.
Project Info: Apache Pekko Connectors AWS Lambda | |
---|---|
Artifact | org.apache.pekko
pekko-connectors-awslambda
1.0.2
|
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 |
Scala versions | 2.13.14, 2.12.20, 3.3.3 |
JPMS module name | pekko.stream.connectors.aws.lambda |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts
- sbt
val PekkoVersion = "1.0.3" libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-connectors-awslambda" % "1.0.2", "org.apache.pekko" %% "pekko-stream" % PekkoVersion )
- Maven
<properties> <pekko.version>1.0.3</pekko.version> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-connectors-awslambda_${scala.binary.version}</artifactId> <version>1.0.2</version> </dependency> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-stream_${scala.binary.version}</artifactId> <version>${pekko.version}</version> </dependency> </dependencies>
- Gradle
def versions = [ PekkoVersion: "1.0.3", ScalaBinary: "2.13" ] dependencies { implementation "org.apache.pekko:pekko-connectors-awslambda_${versions.ScalaBinary}:1.0.2" implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}" }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Setup
The flow provided by this connector needs a prepared LambdaAsyncClient
to be able to invoke lambda functions.
- Scala
-
source
import com.github.pjfanning.pekkohttpspi.PekkoHttpClient import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider } import software.amazon.awssdk.services.lambda.LambdaAsyncClient // Don't encode credentials in your source code! // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")) implicit val lambdaClient: LambdaAsyncClient = LambdaAsyncClient .builder() .credentialsProvider(credentialsProvider) .httpClient(PekkoHttpClient.builder().withActorSystem(system).build()) // Possibility to configure the retry policy // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html // .overrideConfiguration(...) .build() system.registerOnTermination(lambdaClient.close())
- Java
-
source
import com.github.pjfanning.pekkohttpspi.PekkoHttpClient; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.services.lambda.LambdaAsyncClient; // Don't encode credentials in your source code! // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")); LambdaAsyncClient awsLambdaClient = LambdaAsyncClient.builder() .credentialsProvider(credentialsProvider) .httpClient(PekkoHttpClient.builder().withActorSystem(system).build()) // Possibility to configure the retry policy // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html // .overrideConfiguration(...) .build(); system.registerOnTermination(awsLambdaClient::close);
The example above uses Apache Pekko HTTP as the default HTTP client implementation. For more details about the HTTP client, configuring request retrying and best practices for credentials, see AWS client configuration for more details.
We will need an ActorSystem
ActorSystem
.
- Scala
-
source
import org.apache.pekko.actor.ActorSystem implicit val system: ActorSystem = ActorSystem()
- Java
-
source
import org.apache.pekko.actor.ActorSystem; ActorSystem system = ActorSystem.create();
This is all preparation that we are going to need.
Sending messages
Now we can stream AWS Java SDK Lambda InvokeRequest
to AWS Lambda functions AwsLambdaFlow
AwsLambdaFlow
factory.
- Scala
-
source
import software.amazon.awssdk.core.SdkBytes import software.amazon.awssdk.services.lambda.model.InvokeRequest val request = InvokeRequest .builder() .functionName("lambda-function-name") .payload(SdkBytes.fromUtf8String("test-payload")) .build() Source.single(request).via(AwsLambdaFlow(1)).runWith(Sink.seq)
- Java
-
source
import software.amazon.awssdk.services.lambda.model.InvokeRequest; import software.amazon.awssdk.services.lambda.model.InvokeResponse; import software.amazon.awssdk.core.SdkBytes; InvokeRequest request = InvokeRequest.builder() .functionName("lambda-function-name") .payload(SdkBytes.fromUtf8String("test-payload")) .build(); Flow<InvokeRequest, InvokeResponse, NotUsed> flow = AwsLambdaFlow.create(awsLambdaClient, 1); final CompletionStage<List<InvokeResponse>> stage = Source.single(request).via(flow).runWith(Sink.seq(), system);
AwsLambdaFlow configuration
Options:
parallelism
- Number of parallel executions. Should be less or equal to number of threads in ExecutorService for LambdaAsyncClient