File

The File connectors provide additional connectors for filesystems, complementing the sources and sinks for files already included in core Apache Pekko Streams (see the Apache Pekko Streaming File IO documentation).

Project Info: Apache Pekko Connectors File
Artifact: org.apache.pekko : pekko-connectors-file : 1.1.0-M1+129-4a175fef-SNAPSHOT
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21
Scala versions: 2.13.15, 2.12.20, 3.3.4
JPMS module name: pekko.stream.connectors.file
License
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.1.2"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-file" % "1.1.0-M1+129-4a175fef-SNAPSHOT",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven
<properties>
  <pekko.version>1.1.2</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-file_${scala.binary.version}</artifactId>
    <version>1.1.0-M1+129-4a175fef-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.1.2",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-file_${versions.ScalaBinary}:1.1.0-M1+129-4a175fef-SNAPSHOT"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Writing to and reading from files

Use the FileIO class to create streams reading from or writing to files. It is part of Apache Pekko Streams.

Apache Pekko Streaming File IO documentation

Tailing a file into a stream

The FileTailSource starts at a given offset in a file and emits chunks of bytes until it reaches the end of the file. It then polls the file for changes and emits new data as it is written to the file (unless there is backpressure).

A very common use case is combining reading bytes with parsing the bytes into lines, so FileTailSource contains a few factory methods to create a source that parses the bytes into lines and emits those.

In this sample we simply tail the lines of a file and print them to standard out:

Scala
import org.apache.pekko.stream.connectors.file.scaladsl.FileTailSource

val fs = FileSystems.getDefault
val lines: Source[String, NotUsed] = FileTailSource.lines(
  path = fs.getPath(path),
  maxLineSize = 8192,
  pollingInterval = 250.millis)

lines.runForeach(line => System.out.println(line))
Java
final FileSystem fs = FileSystems.getDefault();
final Duration pollingInterval = Duration.ofMillis(250);
final int maxLineSize = 8192;

final Source<String, NotUsed> lines =
    org.apache.pekko.stream.connectors.file.javadsl.FileTailSource.createLines(
        fs.getPath(path), maxLineSize, pollingInterval);

lines.runForeach(System.out::println, system);
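
The "start at an offset, read to the end, then poll for appended data" behaviour described above can be pictured with a JDK-only sketch. This is not the connector's implementation: `TailPollSketch`, `Chunk` and `poll` are hypothetical names introduced here, and the sketch covers a single polling step only, without line parsing or backpressure.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper (not part of Pekko Connectors): one polling step of a file tail. */
final class TailPollSketch {

  /** Bytes read in one poll plus the offset to resume from on the next poll. */
  static final class Chunk {
    final byte[] bytes;
    final long nextOffset;

    Chunk(byte[] bytes, long nextOffset) {
      this.bytes = bytes;
      this.nextOffset = nextOffset;
    }
  }

  /** Reads at most maxChunkSize bytes starting at offset; the chunk is empty at end of file. */
  static Chunk poll(Path file, long offset, int maxChunkSize) {
    try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
      long available = Math.max(0L, raf.length() - offset);
      int toRead = (int) Math.min(available, (long) maxChunkSize);
      byte[] buffer = new byte[toRead];
      raf.seek(offset);
      raf.readFully(buffer);
      return new Chunk(buffer, offset + toRead);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    Path file = Files.createTempFile("tail-", ".log");
    Files.write(file, "first\n".getBytes(StandardCharsets.UTF_8));
    Chunk initial = poll(file, 0L, 8192);

    // Data appended after the first poll is picked up by resuming at nextOffset.
    Files.write(file, "second\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
    Chunk appended = poll(file, initial.nextOffset, 8192);
    System.out.print(new String(appended.bytes, StandardCharsets.UTF_8));
    Files.delete(file);
  }
}
```

Calling `poll` repeatedly on a timer, each time with the previous `nextOffset`, corresponds roughly to the role of the polling interval in the samples above. A poll that returns an empty chunk means nothing new has been written yet.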

Shutdown stream when file is deleted

The FileTailSource stream will not shut down or throw an error when the file it is tailing is deleted from the filesystem. If you would like to shut down the stream, or throw an error, you can do so by merging in a DirectoryChangesSource that listens to filesystem events in the directory that contains the file.

In the following example, a DirectoryChangesSource is used to watch for events in a directory. If a file delete event is observed for the file we are tailing, we shut down the stream gracefully by using Flow.recoverWithRetries to switch to a Source.empty, which will immediately send an OnComplete signal and shut down the stream.

Scala
val checkInterval = 1.second
val fileCheckSource = DirectoryChangesSource(path.getParent, checkInterval, 8192)
  .collect {
    case (p, DirectoryChange.Deletion) if path == p =>
      throw new FileNotFoundException(path.toString)
  }
  .recoverWithRetries(1,
    {
      case _: FileNotFoundException => Source.empty
    })

val stream = FileTailSource
  .lines(path = path, maxLineSize = 8192, pollingInterval = 250.millis)
  .merge(fileCheckSource, eagerComplete = true)
Java
final Duration checkInterval = Duration.ofSeconds(1);
final Source<String, NotUsed> fileCheckSource =
    org.apache.pekko.stream.connectors.file.javadsl.DirectoryChangesSource.create(
            path.getParent(), checkInterval, 8192)
        .mapConcat(
            pair -> {
              if (pair.first().equals(path) && pair.second() == DirectoryChange.Deletion) {
                throw new FileNotFoundException();
              }
              return Collections.<String>emptyList();
            })
        .recoverWithRetries(
            -1,
            new PFBuilder<Throwable, Source<String, NotUsed>>()
                .match(FileNotFoundException.class, t -> Source.empty())
                .build());

final Source<String, NotUsed> source =
    org.apache.pekko.stream.connectors.file.javadsl.FileTailSource.createLines(
            path,
            8192, // chunk size
            Duration.ofMillis(250))
        .merge(fileCheckSource, true);
Stream Shutdown Race Condition

Since the DirectoryChangesSource and the FileTailSource operate asynchronously as separate sources, the stream may be shut down prematurely. If the file is detected as deleted and the stream is shut down before the last element is emitted from FileTailSource, that data will never be available to downstream user stages.

Shutdown stream after an idle timeout

It may be useful to shut down the stream when no new data has been added for a while to a file being tailed by FileTailSource. In the following example, a Flow.idleTimeout operator is used to trigger a TimeoutException that can be recovered with Flow.recoverWithRetries and a Source.empty to successfully shut down the stream.

Scala
val stream = FileTailSource
  .lines(path = path, maxLineSize = 8192, pollingInterval = 250.millis)
  .idleTimeout(5.seconds)
  .recoverWithRetries(1,
    {
      case _: TimeoutException => Source.empty
    })
Java
Source<String, NotUsed> stream =
    org.apache.pekko.stream.connectors.file.javadsl.FileTailSource.createLines(
            path,
            8192, // chunk size
            Duration.ofMillis(250))
        .idleTimeout(Duration.ofSeconds(5))
        .recoverWithRetries(
            -1,
            new PFBuilder<Throwable, Source<String, NotUsed>>()
                .match(TimeoutException.class, t -> Source.empty())
                .build());

Creating directories

Directory.mkdirs() and Directory.mkdirsWithContext() create directories for Path elements in the stream. The withContext-variant allows the user to pass through additional information with every path.

Scala
import org.apache.pekko.stream.connectors.file.scaladsl.Directory

val flow: Flow[Path, Path, NotUsed] = Directory.mkdirs()

val created: Future[immutable.Seq[Path]] =
  Source(immutable.Seq(dir.resolve("dirA"), dir.resolve("dirB")))
    .via(flow)
    .runWith(Sink.seq)

val flowWithContext: FlowWithContext[Path, SomeContext, Path, SomeContext, NotUsed] =
  Directory.mkdirsWithContext[SomeContext]()
Java
Flow<Path, Path, NotUsed> flow = Directory.mkdirs();

CompletionStage<List<Path>> created =
    Source.from(Arrays.asList(dir.resolve("dirA"), dir.resolve("dirB")))
        .via(flow)
        .runWith(Sink.seq(), system);

FlowWithContext<Path, SomeContext, Path, SomeContext, NotUsed> flowWithContext =
    Directory.mkdirsWithContext();

Listing directory contents

Directory.ls(path) lists all files and directories directly in a given directory:

Scala
import org.apache.pekko.stream.connectors.file.scaladsl.Directory

val source: Source[Path, NotUsed] = Directory.ls(dir)
Java
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.connectors.file.javadsl.Directory;

final Source<Path, NotUsed> source = Directory.ls(dir);

Directory.walk(path) traverses all subdirectories and lists files and directories depth first:

Scala
import org.apache.pekko.stream.connectors.file.scaladsl.Directory
import java.nio.file.FileVisitOption

val files: Source[Path, NotUsed] = Directory.walk(root)

val files2: Source[Path, NotUsed] = Directory.walk(root, maxDepth = Some(1), List(FileVisitOption.FOLLOW_LINKS))
Java
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.connectors.file.javadsl.Directory;
import java.nio.file.FileVisitOption;

final Source<Path, NotUsed> source = Directory.walk(root);

final Source<Path, NotUsed> source = Directory.walk(root, 1, FileVisitOption.FOLLOW_LINKS);
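
To see what a depth limit such as `maxDepth = 1` selects, the following JDK-only sketch may help. It assumes that Directory.walk follows the semantics of `java.nio.file.Files.walk` (depth-first, root included, depth counted from the root), which is an assumption made here rather than something stated above. `WalkDepthSketch` and its `walk` method are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper (not part of Pekko Connectors) showing the effect of a depth limit. */
final class WalkDepthSketch {

  /** Paths visited below root up to maxDepth levels, relative to root and sorted by name. */
  static List<String> walk(Path root, int maxDepth) {
    try (Stream<Path> paths = Files.walk(root, maxDepth)) {
      return paths
          .map(p -> root.relativize(p).toString())
          .sorted()
          .collect(Collectors.toList());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    Path root = Files.createTempDirectory("walk-");
    Path dirA = Files.createDirectory(root.resolve("dirA"));
    Files.createFile(dirA.resolve("file.txt"));

    // Depth 1: the root itself (empty relative path) and its direct children only.
    System.out.println(walk(root, 1));
    // Unlimited depth: the nested file is visited as well.
    System.out.println(walk(root, Integer.MAX_VALUE));
  }
}
```

With a depth of 1 the nested `file.txt` is not visited, so the result matches a plain directory listing plus the root itself.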

Listening to changes in a directory

The DirectoryChangesSource emits an element every time there is a change to a watched directory in the local filesystem. Each emitted change consists of the path that was changed and an enumeration describing what kind of change it was.

In this sample we simply print each change to the directory to standard output:

Scala
import org.apache.pekko.stream.connectors.file.scaladsl.DirectoryChangesSource

val fs = FileSystems.getDefault
val changes = DirectoryChangesSource(fs.getPath(path), pollInterval = 1.second, maxBufferSize = 1000)
changes.runForeach {
  case (path, change) => println("Path: " + path + ", Change: " + change)
}
Java
import org.apache.pekko.stream.connectors.file.javadsl.DirectoryChangesSource;

final FileSystem fs = FileSystems.getDefault();
final Duration pollingInterval = Duration.ofSeconds(1);
final int maxBufferSize = 1000;
final Source<Pair<Path, DirectoryChange>, NotUsed> changes =
    DirectoryChangesSource.create(fs.getPath(path), pollingInterval, maxBufferSize);

changes.runForeach(
    (Pair<Path, DirectoryChange> pair) -> {
      final Path changedPath = pair.first();
      final DirectoryChange change = pair.second();
      System.out.println("Path: " + changedPath + ", Change: " + change);
    },
    system);

Rotating the file to stream into

The LogRotatorSink creates and writes to multiple files.
This sink takes a creator as a parameter, which returns a ByteString => Option[Path] function (Scala) or a Function<ByteString, Optional<Path>> (Java). If the generated function returns a path, the sink rotates the file output to this new path, and the ByteString that triggered the rotation is written to the new file as well. With this approach the user can define a custom stateful file generation implementation.

This example usage shows the built-in target file creation and a custom sink factory, which is required to use Compression for the target files.

Scala
val triggerFunctionCreator: () => ByteString => Option[Path] = ???

val completion = Source(immutable.Seq("test1", "test2", "test3", "test4", "test5", "test6"))
  .map(ByteString(_))
  .runWith(LogRotatorSink(triggerFunctionCreator))

// GZip compressing the data written
val completion =
  source
    .runWith(
      LogRotatorSink.withSinkFactory(
        triggerFunctionCreator,
        (path: Path) =>
          Flow[ByteString]
            .via(Compression.gzip)
            .toMat(FileIO.toPath(path))(Keep.right)))
Java
import org.apache.pekko.stream.connectors.file.javadsl.LogRotatorSink;

Creator<Function<ByteString, Optional<Path>>> triggerFunctionCreator = ...;

CompletionStage<Done> completion =
    Source.from(Arrays.asList("test1", "test2", "test3", "test4", "test5", "test6"))
        .map(ByteString::fromString)
        .runWith(LogRotatorSink.createFromFunction(triggerFunctionCreator), system);

// GZip compressing the data written
CompletionStage<Done> compressedCompletion =
    source.runWith(
        LogRotatorSink.withSinkFactory(
            triggerFunctionCreator,
            path ->
                Flow.of(ByteString.class)
                    .via(Compression.gzip())
                    .toMat(FileIO.toPath(path), Keep.right())),
        system);

Example: size-based rotation

Scala
import org.apache.pekko.stream.connectors.file.scaladsl.LogRotatorSink

val fileSizeTriggerCreator: () => ByteString => Option[Path] = () => {
  val max = 10 * 1024 * 1024
  var size: Long = max
  (element: ByteString) =>
    if (size + element.size > max) {
      val path = Files.createTempFile("out-", ".log")
      size = element.size
      Some(path)
    } else {
      size += element.size
      None
    }
}

val sizeRotatorSink: Sink[ByteString, Future[Done]] =
  LogRotatorSink(fileSizeTriggerCreator)
Java
Creator<Function<ByteString, Optional<Path>>> sizeBasedTriggerCreator =
    () -> {
      long max = 10 * 1024 * 1024;
      final long[] size = new long[] {max};
      return (element) -> {
        if (size[0] + element.size() > max) {
          Path path = Files.createTempFile("out-", ".log");
          size[0] = element.size();
          return Optional.of(path);
        } else {
          size[0] += element.size();
          return Optional.empty();
        }
      };
    };

Sink<ByteString, CompletionStage<Done>> sizeRotatorSink =
    LogRotatorSink.createFromFunction(sizeBasedTriggerCreator);
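
The stateful trigger logic of the size-based example can be traced without running a stream. The sketch below is JDK-only and makes several simplifying assumptions: elements are represented by their size only (an `Integer` stands in for a ByteString), file names are generated as `out-N.log` instead of temporary files, and `RotationTriggerSketch`, `sizeTrigger` and `assign` are hypothetical names, not part of the connector.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical helper (not part of Pekko Connectors) tracing a size-based rotation trigger. */
final class RotationTriggerSketch {

  /** Same decision logic as the size-based example, applied to element sizes only. */
  static Function<Integer, Optional<Path>> sizeTrigger(long maxBytes) {
    final long[] size = {maxBytes}; // starts "full" so that the first element opens a file
    final int[] fileIndex = {0};
    return elementSize -> {
      if (size[0] + elementSize > maxBytes) {
        size[0] = elementSize; // the triggering element is counted against the new file
        return Optional.of(Paths.get("out-" + (fileIndex[0]++) + ".log"));
      } else {
        size[0] += elementSize;
        return Optional.empty();
      }
    };
  }

  /** Applies the trigger to each element size and records the file each element lands in. */
  static List<String> assign(Function<Integer, Optional<Path>> trigger, int... elementSizes) {
    List<String> targets = new ArrayList<>();
    String current = null;
    for (int elementSize : elementSizes) {
      Optional<Path> rotated = trigger.apply(elementSize);
      if (rotated.isPresent()) {
        current = rotated.get().toString();
      }
      targets.add(current);
    }
    return targets;
  }

  public static void main(String[] args) {
    // With a 10-byte limit, the third element no longer fits into out-0.log.
    System.out.println(assign(sizeTrigger(10), 4, 4, 4, 10, 1));
    // prints [out-0.log, out-0.log, out-1.log, out-2.log, out-3.log]
  }
}
```

Because the state lives inside the function returned by the creator, every call to the creator starts with fresh counters.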

Example: time-based rotation

Scala
val destinationDir = FileSystems.getDefault.getPath("/tmp")
val formatter = DateTimeFormatter.ofPattern("'stream-'yyyy-MM-dd_HH'.log'")

val timeBasedTriggerCreator: () => ByteString => Option[Path] = () => {
  var currentFilename: Option[String] = None
  (_: ByteString) => {
    val newName = LocalDateTime.now().format(formatter)
    if (currentFilename.contains(newName)) {
      None
    } else {
      currentFilename = Some(newName)
      Some(destinationDir.resolve(newName))
    }
  }
}

val timeBasedSink: Sink[ByteString, Future[Done]] =
  LogRotatorSink(timeBasedTriggerCreator)
Java
final Path destinationDir = FileSystems.getDefault().getPath("/tmp");
final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("'stream-'yyyy-MM-dd_HH'.log'");

Creator<Function<ByteString, Optional<Path>>> timeBasedTriggerCreator =
    () -> {
      final String[] currentFileName = new String[] {null};
      return (element) -> {
        String newName = LocalDateTime.now().format(formatter);
        if (newName.equals(currentFileName[0])) {
          return Optional.empty();
        } else {
          currentFileName[0] = newName;
          return Optional.of(destinationDir.resolve(newName));
        }
      };
    };

Sink<ByteString, CompletionStage<Done>> timeBasedSink =
    LogRotatorSink.createFromFunction(timeBasedTriggerCreator);

Example: content-based rotation with compression to SFTP file

This example can be found in the self-contained example documentation section.

ZIP Archive

Writing ZIP Archives

Archive contains a flow for compressing multiple files into one ZIP file.

The result of the flow can be sent to a sink even before the whole ZIP file is created, so the size of the resulting ZIP archive is not limited by memory size.

This example usage shows compressing files from disk.

Scala
val fileStream1: Source[ByteString, Any] = ...
val fileStream2: Source[ByteString, Any] = ...

val filesStream = Source(
  List(
    (ArchiveMetadata("akka_full_color.svg"), fileStream1),
    (ArchiveMetadata("akka_icon_reverse.svg"), fileStream2)))

val result = filesStream
  .via(Archive.zip())
  .runWith(FileIO.toPath(Paths.get("result.zip")))
Java
Source<ByteString, NotUsed> source1 = ...
Source<ByteString, NotUsed> source2 = ...

Pair<ArchiveMetadata, Source<ByteString, NotUsed>> pair1 =
    Pair.create(ArchiveMetadata.create("akka_full_color.svg"), source1);
Pair<ArchiveMetadata, Source<ByteString, NotUsed>> pair2 =
    Pair.create(ArchiveMetadata.create("akka_icon_reverse.svg"), source2);

Source<Pair<ArchiveMetadata, Source<ByteString, NotUsed>>, NotUsed> source =
    Source.from(Arrays.asList(pair1, pair2));

Sink<ByteString, CompletionStage<IOResult>> fileSink = FileIO.toPath(Paths.get("logo.zip"));
CompletionStage<IOResult> ioResult = source.via(Archive.zip()).runWith(fileSink, system);

Reading ZIP archives

Archive.zipReader() reads a file in ZIP format and emits the metadata entry and a Source for every file in the archive. Not every emitted source needs to be consumed, and multiple files can be read in parallel, because every sub-source seeks into the archive.

The example below reads the incoming file and unzips all entries to the local file system.

Scala
val zipFile = // ???
val target: Path = // ???
Archive
  .zipReader(zipFile)
  .mapAsyncUnordered(4) {
    case (metadata, source) =>
      val targetFile = target.resolve(metadata.name)
      targetFile.toFile.getParentFile.mkdirs() // missing error handler
      source.runWith(FileIO.toPath(targetFile))
  }
Java
Archive.zipReader(Paths.get("logo.zip").toFile())
    .mapAsync(
        4,
        pair -> {
          ZipArchiveMetadata metadata = pair.first();
          Path targetFile = target.resolve(metadata.getName());
          targetFile.toFile().getParentFile().mkdirs(); // missing error handler
          Source<ByteString, NotUsed> fSource = pair.second();
          // create the target directory
          return fSource
              .runWith(FileIO.toPath(targetFile), system)
              .thenApply(io -> Done.done());
        });

TAR Archive

Writing TAR archives

Archive.tar() creates a flow for packaging multiple files into one TAR archive.

The result of the flow can be sent to a sink even before the whole TAR file is created, so the size of the resulting TAR archive is not limited by memory size.

This example usage shows packaging directories and files from disk.

Scala
val fileStream1: Source[ByteString, Any] = ...
val fileStream2: Source[ByteString, Any] = ...
val fileSize1: Long = ...
val fileSize2: Long = ...

val filesStream = Source(
  List(
    (TarArchiveMetadata.directory("subdir", lastModification), Source.empty),
    (TarArchiveMetadata("subdir", "akka_full_color.svg", fileSize1, lastModification), fileStream1),
    (TarArchiveMetadata("akka_icon_reverse.svg", fileSize2, lastModification), fileStream2)))

val result = filesStream
  .via(Archive.tar())
  .runWith(FileIO.toPath(Paths.get("result.tar")))
Java
Source<ByteString, NotUsed> source1 = ...
Source<ByteString, NotUsed> source2 = ...
Long size1 = ...
Long size2 = ...

Pair<TarArchiveMetadata, Source<ByteString, NotUsed>> dir =
    Pair.create(TarArchiveMetadata.directory("subdir", lastModification), Source.empty());

Pair<TarArchiveMetadata, Source<ByteString, NotUsed>> pair1 =
    Pair.create(
        TarArchiveMetadata.create("subdir", "akka_full_color.svg", size1, lastModification),
        source1);
Pair<TarArchiveMetadata, Source<ByteString, NotUsed>> pair2 =
    Pair.create(
        TarArchiveMetadata.create("akka_icon_reverse.svg", size2, lastModification), source2);

Source<Pair<TarArchiveMetadata, Source<ByteString, NotUsed>>, NotUsed> source =
    Source.from(Arrays.asList(dir, pair1, pair2));

Sink<ByteString, CompletionStage<IOResult>> fileSink = FileIO.toPath(Paths.get("logo.tar"));
CompletionStage<IOResult> ioResult = source.via(Archive.tar()).runWith(fileSink, system);

To produce a gzipped TAR file see the following example.

Scala
val resultGz = filesStream
  .via(Archive.tar().via(org.apache.pekko.stream.scaladsl.Compression.gzip))
  .runWith(FileIO.toPath(Paths.get("result.tar.gz")))
Java
Sink<ByteString, CompletionStage<IOResult>> fileSinkGz =
    FileIO.toPath(Paths.get("logo.tar.gz"));
CompletionStage<IOResult> ioResultGz =
    source
        .via(Archive.tar().via(org.apache.pekko.stream.javadsl.Compression.gzip()))
        .runWith(fileSinkGz, system);

Reading TAR archives

Archive.tarReader() reads a stream of ByteStrings in TAR format, emitting the metadata entry and a Source for every file in the stream. It is essential to request all of each emitted source's data; otherwise the stream will not reach the next file entry.

The example below reads the incoming stream, creates directories and stores all files in the local file system.

Scala
val bytesSource: Source[ByteString, NotUsed] = // ???
val tar =
  bytesSource
    .via(Archive.tarReader())
    .mapAsync(1) {
      case (metadata, source) =>
        val targetFile = target.resolve(metadata.filePath)
        if (metadata.isDirectory) {
          Source
            .single(targetFile)
            .via(Directory.mkdirs())
            .runWith(Sink.ignore)
        } else {
          // create the target directory
          Source
            .single(targetFile.getParent)
            .via(Directory.mkdirs())
            .runWith(Sink.ignore)
            .flatMap { _ =>
              // stream the file contents to a local file
              source.runWith(FileIO.toPath(targetFile))
            }
        }
    }
    .runWith(Sink.ignore)
Java
Source<ByteString, NotUsed> bytesSource = // ???
CompletionStage<Done> tar =
    bytesSource
        .via(Archive.tarReader())
        .mapAsync(
            1,
            pair -> {
              TarArchiveMetadata metadata = pair.first();
              Path targetFile = target.resolve(metadata.filePath());
              if (metadata.isDirectory()) {
                return Source.single(targetFile)
                    .via(Directory.mkdirs())
                    .runWith(Sink.ignore(), system);
              } else {
                Source<ByteString, NotUsed> source = pair.second();
                // create the target directory
                return Source.single(targetFile.getParent())
                    .via(Directory.mkdirs())
                    .runWith(Sink.ignore(), system)
                    .thenCompose(
                        done ->
                            // stream the file contents to a local file
                            source
                                .runWith(FileIO.toPath(targetFile), system)
                                .thenApply(io -> Done.done()));
              }
            })
        .runWith(Sink.ignore(), system);
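
The requirement to consume every emitted source follows from the sequential layout of a TAR stream: each entry is a 512-byte header block followed by its content, padded to a multiple of 512 bytes, so the next header can only be found after the previous content has been passed. The JDK-only sketch below computes header offsets under the assumption of the basic format without extended (pax or GNU long-name) headers. `TarLayoutSketch` and its methods are hypothetical names, not part of the connector.

```java
import java.util.Arrays;

/** Hypothetical helper (not part of Pekko Connectors) computing offsets in a basic TAR stream. */
final class TarLayoutSketch {

  /** TAR data is organised in blocks of 512 bytes. */
  static final int BLOCK = 512;

  /** Bytes occupied by one entry: one header block plus the content rounded up to whole blocks. */
  static long entryLength(long contentSize) {
    long contentBlocks = (contentSize + BLOCK - 1) / BLOCK;
    return BLOCK + contentBlocks * BLOCK;
  }

  /** Offset of each entry's header, given the content sizes of the entries in archive order. */
  static long[] headerOffsets(long... contentSizes) {
    long[] offsets = new long[contentSizes.length];
    long position = 0L;
    for (int i = 0; i < contentSizes.length; i++) {
      offsets[i] = position;
      position += entryLength(contentSizes[i]);
    }
    return offsets;
  }

  public static void main(String[] args) {
    // A directory entry (no content), a 700-byte file and a 512-byte file.
    System.out.println(Arrays.toString(headerOffsets(0L, 700L, 512L)));
    // prints [0, 512, 2048]
  }
}
```

The same layout suggests why the TAR writing examples pass the file size in the metadata: the size is stored in the header, which is written before the content.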

The test in NestedTarReaderTest illustrates how the tar reader may be used to extract tar archives from within a tar archive.