AWS S3

The AWS S3 connector provides Apache Pekko Stream sources and sinks to connect to Amazon S3. S3 stands for Simple Storage Service and is an object storage service with a web service interface.

Project Info: Apache Pekko Connectors Amazon S3

  • Artifact: org.apache.pekko : pekko-connectors-s3 : 1.0.2
  • JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17
  • Scala versions: 2.13.14, 2.12.20, 3.3.3
  • JPMS module name: pekko.stream.connectors.aws.s3
  • License: Apache License 2.0
  • Release notes: GitHub releases
  • Issues: GitHub issues
  • Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.0.3"
val PekkoHttpVersion = "1.0.1"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-s3" % "1.0.2",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
  "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
  "org.apache.pekko" %% "pekko-http-xml" % PekkoHttpVersion
)
Maven
<properties>
  <pekko.version>1.0.3</pekko.version>
  <pekko.http.version>1.0.1</pekko.http.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-s3_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-http_${scala.binary.version}</artifactId>
    <version>${pekko.http.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-http-xml_${scala.binary.version}</artifactId>
    <version>${pekko.http.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.0.3",
  PekkoHttpVersion: "1.0.1",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-s3_${versions.ScalaBinary}:1.0.2"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
  implementation "org.apache.pekko:pekko-http_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
  implementation "org.apache.pekko:pekko-http-xml_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
}


Configuration

The settings for the S3 connector are read by default from pekko.connectors.s3 configuration section. Credentials are loaded as described in the DefaultCredentialsProvider documentation. Therefore, if you are using Apache Pekko Connectors S3 connector in a standard environment, no configuration changes should be necessary. However, if you use a non-standard configuration path or need multiple different configurations, please refer to the attributes section below to see how to apply different configuration to different parts of the stream. All of the available configuration settings can be found in the reference.conf.
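
All defaults can be overridden in your application.conf. As a sketch, the snippet below pins a static region and points the connector at an S3-compatible endpoint; the key names follow this connector's reference.conf, while the region, URL and access-style values are placeholders to adapt:

HOCON
pekko.connectors.s3 {
  aws.region {
    # use a fixed region instead of the default provider chain
    provider = static
    default-region = "eu-central-1"
  }
  # point at an S3-compatible endpoint (placeholder URL)
  endpoint-url = "http://localhost:9000"
  # many S3-compatible stores (e.g. MinIO) expect path-style access
  access-style = path
}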

Store a file in S3

A file can be uploaded to S3 by creating a source of ByteString and running that with a sink created from S3.multipartUpload.

Scala
val file: Source[ByteString, NotUsed] =
  Source.single(ByteString(body))

val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
  S3.multipartUpload(bucket, bucketKey)

val result: Future[MultipartUploadResult] =
  file.runWith(s3Sink)
Java
final Source<ByteString, NotUsed> file = Source.single(ByteString.fromString(body()));

final Sink<ByteString, CompletionStage<MultipartUploadResult>> sink =
    S3.multipartUpload(bucket(), bucketKey());

final CompletionStage<MultipartUploadResult> resultCompletionStage = file.runWith(sink, system);
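
For larger payloads it is often more convenient to stream a file from disk instead of building the source by hand. A minimal sketch, assuming an implicit ActorSystem is in scope and using placeholder file, bucket and key names (FileIO.fromPath is the standard Pekko Streams file source):

Scala
import java.nio.file.Paths

import org.apache.pekko.stream.connectors.s3.MultipartUploadResult
import org.apache.pekko.stream.connectors.s3.scaladsl.S3
import org.apache.pekko.stream.scaladsl.FileIO

import scala.concurrent.Future

// Stream the file's bytes to S3 without loading the whole file into memory.
val uploadFromDisk: Future[MultipartUploadResult] =
  FileIO
    .fromPath(Paths.get("invoice.pdf")) // placeholder path
    .runWith(S3.multipartUpload("my-bucket", "invoices/invoice.pdf"))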

Download a file from S3

A source for downloading a file can be created by calling S3.getObject. It emits the object's data as a stream of ByteStrings and materializes a Future[ObjectMetadata] (in Java, a CompletionStage<ObjectMetadata>) holding the object's metadata.

Scala
val s3Source: Source[ByteString, Future[ObjectMetadata]] =
  S3.getObject(bucket, bucketKey)

val (metadataFuture, dataFuture) =
  s3Source.toMat(Sink.head)(Keep.both).run()
Java
final Source<ByteString, CompletionStage<ObjectMetadata>> s3Source =
    S3.getObject(bucket(), bucketKey());
final Pair<CompletionStage<ObjectMetadata>, CompletionStage<String>> dataAndMetadata =
    s3Source.map(ByteString::utf8String).toMat(Sink.head(), Keep.both()).run(system);

final CompletionStage<ObjectMetadata> metadataCompletionStage = dataAndMetadata.first();
final CompletionStage<String> dataCompletionStage = dataAndMetadata.second();
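
If the goal is simply to persist the object, the bytes can be streamed straight to disk instead of into memory. A minimal sketch, reusing bucket and bucketKey from above, assuming an implicit ActorSystem in scope and a placeholder target path:

Scala
import java.nio.file.Paths

import org.apache.pekko.stream.IOResult
import org.apache.pekko.stream.connectors.s3.ObjectMetadata
import org.apache.pekko.stream.scaladsl.{ FileIO, Keep }

import scala.concurrent.Future

// Keep both materialized values: the object's metadata and the file IO result.
val (metadataF, ioResultF): (Future[ObjectMetadata], Future[IOResult]) =
  S3.getObject(bucket, bucketKey)
    .toMat(FileIO.toPath(Paths.get("/tmp/download.bin")))(Keep.both)
    .run()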

In order to download only a range of a file’s data, you can use an overloaded method which additionally takes a ByteRange as an argument.

Scala
val s3Source = S3.getObject(bucket, bucketKey, Some(ByteRange(bytesRangeStart, bytesRangeEnd)))
Java
final Source<ByteString, CompletionStage<ObjectMetadata>> sourceAndMeta =
    S3.getObject(
        bucket(), bucketKey(), ByteRange.createSlice(bytesRangeStart(), bytesRangeEnd()));

File metadata (ObjectMetadata) holds the content type, size and other useful information about the object. Here’s an example of using this metadata to stream an object back to a client in Apache Pekko HTTP.

Scala
HttpResponse(
  entity = HttpEntity(
    metadata.contentType
      .flatMap(ContentType.parse(_).toOption)
      .getOrElse(ContentTypes.`application/octet-stream`),
    metadata.contentLength,
    s3Source))
Java
metadataCompletionStage.thenApply(
    metadata ->
        HttpResponse.create()
            .withEntity(
                HttpEntities.create(
                    metadata
                        .getContentType()
                        .map(ContentTypes::parse)
                        .orElse(ContentTypes.APPLICATION_OCTET_STREAM),
                    metadata.getContentLength(),
                    s3Source)));

Access object metadata without downloading object from S3

If you do not need the object itself, you can query for the object metadata only, using a source from S3.getObjectMetadata.

Scala
val metadata: Source[Option[ObjectMetadata], NotUsed] =
  S3.getObjectMetadata(bucket, bucketKey)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
    S3.getObjectMetadata(bucket(), bucketKey());
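
As a usage sketch, the emitted value can be inspected to, for example, read an object’s size before deciding whether to download it; None means the object does not exist. Assumes an implicit ActorSystem and ExecutionContext in scope, reusing bucket and bucketKey from above:

Scala
import org.apache.pekko.stream.scaladsl.Sink

import scala.concurrent.Future

// None: no such object; Some: its metadata, including the size in bytes.
val contentLength: Future[Option[Long]] =
  S3.getObjectMetadata(bucket, bucketKey)
    .runWith(Sink.head)
    .map(_.map(_.contentLength))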

List bucket contents

To get a list of all objects in a bucket, use S3.listBucket. When run, this will give a stream of ListBucketResultContents.

Scala
val keySource: Source[ListBucketResultContents, NotUsed] =
  S3.listBucket(bucket, Some(listPrefix))
Java
final Source<ListBucketResultContents, NotUsed> keySource =
    S3.listBucket(bucket(), Optional.of(prefix));
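
The source transparently pages through the listing, so aggregations over large buckets stay simple. A small sketch that sums the sizes of all objects under the prefix, assuming an implicit ActorSystem in scope:

Scala
import org.apache.pekko.stream.scaladsl.Sink

import scala.concurrent.Future

// ListBucketResultContents.size is the object size in bytes.
val totalBytes: Future[Long] =
  S3.listBucket(bucket, Some(listPrefix))
    .map(_.size)
    .runWith(Sink.fold(0L)(_ + _))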

List bucket contents and common prefixes

To get a list of the contents and common prefixes for one hierarchy level using a delimiter, use S3.listBucketAndCommonPrefixes. When run, this will give a stream of tuples (Seq[ListBucketResultContents], Seq[ListBucketResultCommonPrefixes]).

Scala
val keyAndCommonPrefixSource
    : Source[(Seq[ListBucketResultContents], Seq[ListBucketResultCommonPrefixes]), NotUsed] =
  S3.listBucketAndCommonPrefixes(bucket, listDelimiter, Some(listPrefix))
Java
final Source<
        Pair<List<ListBucketResultContents>, List<ListBucketResultCommonPrefixes>>, NotUsed>
    keySource =
        S3.listBucketAndCommonPrefixes(
            bucket(), delimiter, Optional.of(prefix), S3Headers.empty());
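
A sketch of extracting only the common prefixes, which behave like subdirectory names at this hierarchy level. It assumes an implicit ActorSystem in scope and that the prefix field of ListBucketResultCommonPrefixes carries the value:

Scala
import org.apache.pekko.stream.scaladsl.Sink

import scala.concurrent.Future

// Flatten each emitted page's common prefixes into one sequence of names.
val prefixes: Future[Seq[String]] =
  S3.listBucketAndCommonPrefixes(bucket, listDelimiter, Some(listPrefix))
    .mapConcat { case (_, commonPrefixes) => commonPrefixes.map(_.prefix) }
    .runWith(Sink.seq)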

Copy upload (multi part)

Copy an S3 object from a source bucket to a target bucket using S3.multipartCopy. When run, this will emit a single MultipartUploadResult with the information about the copied object.

Scala
val result: Future[MultipartUploadResult] =
  S3.multipartCopy(bucket, bucketKey, targetBucket, targetBucketKey).run()
Java
final CompletionStage<MultipartUploadResult> resultCompletionStage =
    S3.multipartCopy(bucket, sourceKey, targetBucket, targetKey).run(system);

If your bucket has versioning enabled, you could have multiple versions of the same object. By default, AWS identifies the current version of the object to copy. You can optionally copy a specific version of the source object by adding the sourceVersionId parameter.

Scala
val result: Future[MultipartUploadResult] =
  S3.multipartCopy(bucket,
    bucketKey,
    targetBucket,
    targetBucketKey,
    sourceVersionId = Some("3/L4kqtJlcpXroDTDmJ+rmSpXd3dIbrHY+MTRCxf3vjVBH40Nr8X8gdRQBpUMLUo"))
    .run()
Java
String sourceVersionId = "3/L4kqtJlcpXroDTDmJ+rmSpXd3dIbrHY+MTRCxf3vjVBH40Nr8X8gdRQBpUMLUo";
final CompletionStage<MultipartUploadResult> resultCompletionStage =
    S3.multipartCopy(
            bucket,
            sourceKey,
            targetBucket,
            targetKey,
            Optional.of(sourceVersionId),
            S3Headers.create())
        .run(system);

Different options are available for server-side encryption in the ServerSideEncryption factory.

Scala
val keys = ServerSideEncryption
  .customerKeys(sseCustomerKey)
  .withMd5(sseCustomerMd5Key)

val result: Future[MultipartUploadResult] =
  S3.multipartCopy(bucket,
    bucketKey,
    targetBucket,
    targetBucketKey,
    s3Headers = S3Headers().withServerSideEncryption(keys))
    .run()
Java
final CustomerKeys keys =
    ServerSideEncryption.customerKeys(sseCustomerKey()).withMd5(sseCustomerMd5Key());

final CompletionStage<MultipartUploadResult> resultCompletionStage =
    S3.multipartCopy(
            bucket(),
            bucketKey(),
            targetBucket(),
            targetBucketKey(),
            S3Headers.create().withServerSideEncryption(keys))
        .run(system);

More S3-specific headers and arbitrary HTTP headers can be specified by adding them to the S3Headers container.
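
A minimal sketch of building such a container, combining a canned ACL, user-defined metadata and an arbitrary raw HTTP header; the metadata and header values are placeholders. The result can be passed to operations accepting S3Headers, such as S3.multipartCopy above:

Scala
import org.apache.pekko.stream.connectors.s3.headers.CannedAcl
import org.apache.pekko.stream.connectors.s3.{ MetaHeaders, S3Headers }

val headers: S3Headers = S3Headers()
  .withCannedAcl(CannedAcl.Private) // S3 canned ACL
  .withMetaHeaders(MetaHeaders(Map("origin" -> "batch-import"))) // x-amz-meta-*
  .withCustomHeaders(Map("x-amz-request-payer" -> "requester")) // raw HTTP header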

Apply S3 settings to a part of the stream

It is possible to make one part of the stream use different S3Settings from the rest of the graph. This can be useful when one stream is used to copy files across regions or even between different S3-compatible endpoints. You can attach a custom S3Settings instance or a custom config path to a graph using attributes from S3Attributes:

Scala
val useVersion1Api = S3Ext(system).settings
  .withListBucketApiVersion(ApiVersion.ListBucketVersion1)

val keySource: Source[ListBucketResultContents, NotUsed] =
  S3.listBucket(bucket, Some(listPrefix))
    .withAttributes(S3Attributes.settings(useVersion1Api))
Java
final S3Settings useVersion1Api =
    S3Ext.get(system()).settings().withListBucketApiVersion(ApiVersion.getListBucketVersion1());

final Source<ListBucketResultContents, NotUsed> keySource =
    S3.listBucket(bucket(), Optional.of(prefix))
        .withAttributes(S3Attributes.settings(useVersion1Api));
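
Alternatively, as mentioned above, a custom config path can be attached instead of a settings instance. A sketch, assuming a hypothetical section my-app.replica-s3 in application.conf with the same layout as pekko.connectors.s3:

Scala
// Read this stream's S3 settings from "my-app.replica-s3" (hypothetical path).
val replicaKeySource: Source[ListBucketResultContents, NotUsed] =
  S3.listBucket(bucket, Some(listPrefix))
    .withAttributes(S3Attributes.settingsPath("my-app.replica-s3"))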

Bucket management

The bucket management API provides functionality both as Sources and as Futures (in Java, CompletionStages). With the Future-based API you can pass attributes for the request in the method itself, while for Sources this is done via .withAttributes. For more information about attributes see: S3Attributes and Attributes.

Make bucket

In order to create a bucket in S3 you need to specify its name, which must be globally unique and conform to the bucket naming rules. The bucket will be created in the region specified in the settings.

Scala
val bucketName = "samplebucket1"

implicit val sampleAttributes: Attributes = S3Attributes.settings(sampleSettings)

val makeBucketRequest: Future[Done] = S3.makeBucket(bucketName)
val makeBucketSourceRequest: Source[Done, NotUsed] = S3.makeBucketSource(bucketName)
Java
final Attributes sampleAttributes = S3Attributes.settings(sampleSettings);

final String bucketName = "samplebucket1";

CompletionStage<Done> makeBucketRequest = S3.makeBucket(bucketName, system);
CompletionStage<Done> makeBucketRequestWithAttributes =
    S3.makeBucket(bucketName, system, sampleAttributes);
Source<Done, NotUsed> makeBucketSourceRequest = S3.makeBucketSource(bucketName);

Delete bucket

To delete a bucket you need to specify its name, and the bucket needs to be empty; a sketch for emptying a bucket first follows the examples below.

Scala
implicit val sampleAttributes: Attributes = S3Attributes.settings(sampleSettings)

val deleteBucketRequest: Future[Done] = S3.deleteBucket(bucketName)
val deleteBucketSourceRequest: Source[Done, NotUsed] = S3.deleteBucketSource(bucketName)
Java
final Attributes sampleAttributes = S3Attributes.settings(sampleSettings);

CompletionStage<Done> deleteBucketRequest = S3.deleteBucket(bucketName, system);
CompletionStage<Done> deleteBucketRequestWithAttributes =
    S3.deleteBucket(bucketName, system, sampleAttributes);

Source<Done, NotUsed> deleteBucketSourceRequest = S3.deleteBucketSource(bucketName);
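
As referenced above, the bucket must be emptied first. A sketch that removes all objects with S3.deleteObjectsByPrefix (part of this connector's API) before deleting the bucket, assuming an implicit ActorSystem and ExecutionContext in scope:

Scala
import org.apache.pekko.Done
import org.apache.pekko.stream.scaladsl.Sink

import scala.concurrent.Future

// Delete every object (no prefix restriction), then the now-empty bucket.
val emptiedAndDeleted: Future[Done] =
  S3.deleteObjectsByPrefix(bucketName, prefix = None)
    .runWith(Sink.ignore)
    .flatMap(_ => S3.deleteBucket(bucketName))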

Check if bucket exists

It is possible to check whether a bucket exists and the user has rights to perform a listBucket operation.

There are three possible outcomes:

  • The bucket exists and the user has access to it: AccessGranted is returned
  • The bucket exists but the user doesn’t have access to it: AccessDenied is returned
  • The bucket doesn’t exist: NotExists is returned

A sketch of handling these outcomes is shown after the examples below.
Scala
implicit val sampleAttributes: Attributes = S3Attributes.settings(sampleSettings)

val doesntExistRequest: Future[BucketAccess] = S3.checkIfBucketExists(bucket)
val doesntExistSourceRequest: Source[BucketAccess, NotUsed] = S3.checkIfBucketExistsSource(bucket)
Java
final Attributes sampleAttributes = S3Attributes.settings(sampleSettings);

final CompletionStage<BucketAccess> doesntExistRequest =
    S3.checkIfBucketExists(bucket(), system);
final CompletionStage<BucketAccess> doesntExistRequestWithAttributes =
    S3.checkIfBucketExists(bucket(), system, sampleAttributes);

final Source<BucketAccess, NotUsed> doesntExistSourceRequest =
    S3.checkIfBucketExistsSource(bucket());
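
As referenced above, a sketch of reacting to the three outcomes, assuming an implicit ActorSystem and ExecutionContext in scope:

Scala
import org.apache.pekko.stream.connectors.s3.BucketAccess

// Pattern match on the three possible BucketAccess values.
S3.checkIfBucketExists(bucket).map {
  case BucketAccess.AccessGranted => println("bucket exists and is accessible")
  case BucketAccess.AccessDenied  => println("bucket exists but access is denied")
  case BucketAccess.NotExists     => println("bucket does not exist")
}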

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

sbt
> s3/test

Some test code requires Docker to be installed and running. Please read the official installation instructions or refer to your Linux distribution’s documentation.