Hadoop Distributed File System - HDFS

The connector offers Flows and Sources that interact with HDFS file systems.

For more information about Hadoop, please visit the Hadoop documentation.

Project Info: Apache Pekko Connectors HDFS
Artifact
org.apache.pekko
pekko-connectors-hdfs
1.0.2
JDK versions
OpenJDK 8
OpenJDK 11
OpenJDK 17
Scala versions
2.13.14, 2.12.20, 3.3.3
JPMS module name
pekko.stream.connectors.hdfs
License
API documentation
Forums
Release notes
GitHub releases
Issues
GitHub issues
Sources
https://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.0.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-hdfs" % "1.0.2",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven
<properties>
  <pekko.version>1.0.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-hdfs_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.0.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-hdfs_${versions.ScalaBinary}:1.0.2"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Specifying a Hadoop Version

By default, the HDFS connector uses Hadoop 3.2.1. If you use a different version of Hadoop, exclude the Hadoop libraries from the connector dependency and add a dependency on your preferred version.

Set up client

Flows provided by this connector need a prepared org.apache.hadoop.fs.FileSystem to interact with HDFS.

Scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val conf = new Configuration()
conf.set("fs.default.name", "hdfs://localhost:54310")

val fs: FileSystem = FileSystem.get(conf)
Java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:54310");

FileSystem fs = FileSystem.get(conf);

Writing

The connector provides three Flows in HdfsFlow. Each flow requires a RotationStrategy and a SyncStrategy to run.

The flows emit OutgoingMessage elements downstream.

Data Writer

Use HdfsFlow.data to stream with FSDataOutputStream without any compression.

Scala
val flow = HdfsFlow.data(
  fs,
  SyncStrategy.count(500),
  RotationStrategy.size(1, FileUnit.GB),
  HdfsWritingSettings())
Java
Flow<HdfsWriteMessage<ByteString, NotUsed>, RotationMessage, NotUsed> flow =
    HdfsFlow.data(
        fs, SyncStrategy.count(500), RotationStrategy.size(1, FileUnit.GB()), settings);

Compressed Data Writer

First, create a CompressionCodec.

Scala
val codec = new DefaultCodec()
codec.setConf(fs.getConf)
Java
DefaultCodec codec = new DefaultCodec();
codec.setConf(fs.getConf());

Then, use HdfsFlow.compressed to stream with CompressionOutputStream and CompressionCodec.

Scala
val flow = HdfsFlow.compressed(
  fs,
  SyncStrategy.count(1),
  RotationStrategy.size(0.1, FileUnit.MB),
  codec,
  settings)
Java
Flow<HdfsWriteMessage<ByteString, NotUsed>, RotationMessage, NotUsed> flow =
    HdfsFlow.compressed(
        fs, SyncStrategy.count(50), RotationStrategy.size(0.1, FileUnit.MB()), codec, settings);

Sequence Writer

Use HdfsFlow.sequence to stream a flat file consisting of binary key/value pairs.

Without Compression

Scala
val flow = HdfsFlow.sequence(
  fs,
  SyncStrategy.none,
  RotationStrategy.size(1, FileUnit.MB),
  settings,
  classOf[Text],
  classOf[Text])
Java
Flow<HdfsWriteMessage<Pair<Text, Text>, NotUsed>, RotationMessage, NotUsed> flow =
    HdfsFlow.sequence(
        fs,
        SyncStrategy.none(),
        RotationStrategy.size(1, FileUnit.MB()),
        settings,
        Text.class,
        Text.class);

With Compression

First, define a codec.

Scala
val codec = new DefaultCodec()
codec.setConf(fs.getConf)
Java
DefaultCodec codec = new DefaultCodec();
codec.setConf(fs.getConf());

Then, create a flow.

Scala
val flow = HdfsFlow.sequence(
  fs,
  SyncStrategy.none,
  RotationStrategy.size(1, FileUnit.MB),
  CompressionType.BLOCK,
  codec,
  settings,
  classOf[Text],
  classOf[Text])
Java
Flow<HdfsWriteMessage<Pair<Text, Text>, NotUsed>, RotationMessage, NotUsed> flow =
    HdfsFlow.sequence(
        fs,
        SyncStrategy.none(),
        RotationStrategy.size(1, FileUnit.MB()),
        SequenceFile.CompressionType.BLOCK,
        codec,
        settings,
        Text.class,
        Text.class);

Passing data through HdfsFlow

Use HdfsFlow.dataWithPassThrough, HdfsFlow.compressedWithPassThrough or HdfsFlow.sequenceWithPassThrough.

When streaming documents from Kafka, you might want to commit the offset to Kafka once a message has been written to HDFS. The pass-through flows emit two kinds of message: a WrittenMessage for every input and a RotationMessage each time a file is rotated.

Let’s say that we have these classes.

Scala
case class Book(title: String)
case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)
Java
public static class Book {
  final String title;

  Book(String title) {
    this.title = title;
  }
}

static class KafkaCommitter {
  List<Integer> committedOffsets = new ArrayList<>();

  void commit(KafkaOffset offset) {
    committedOffsets.add(offset.offset);
  }
}

static class KafkaOffset {
  final int offset;

  KafkaOffset(int offset) {
    this.offset = offset;
  }
}

static class KafkaMessage {
  final Book book;
  final KafkaOffset offset;

  KafkaMessage(Book book, KafkaOffset offset) {
    this.book = book;
    this.offset = offset;
  }
}

Then, we can stream with passThrough.

Scala
// We're going to pretend we got messages from Kafka.
// After we've written them to HDFS, we want
// to commit the offset to Kafka
val messagesFromKafka = List(
  KafkaMessage(Book("Pekko Concurrency"), KafkaOffset(0)),
  KafkaMessage(Book("Pekko in Action"), KafkaOffset(1)),
  KafkaMessage(Book("Effective Pekko"), KafkaOffset(2)),
  KafkaMessage(Book("Learning Scala"), KafkaOffset(3)),
  KafkaMessage(Book("Scala Puzzlers"), KafkaOffset(4)),
  KafkaMessage(Book("Scala for Spark in Production"), KafkaOffset(5)))

var committedOffsets = List[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val resF = Source(messagesFromKafka)
  .map { (kafkaMessage: KafkaMessage) =>
    val book = kafkaMessage.book
    // Transform message so that we can write to hdfs
    HdfsWriteMessage(ByteString(book.title), kafkaMessage.offset)
  }
  .via(
    HdfsFlow.dataWithPassThrough[KafkaOffset](
      fs,
      SyncStrategy.count(50),
      RotationStrategy.count(4),
      HdfsWritingSettings().withNewLine(true)))
  .map { message =>
    message match {
      case WrittenMessage(passThrough, _) =>
        commitToKafka(passThrough)
      case _ => ()
    }
    message
  }
  .collect {
    case rm: RotationMessage => rm
  }
  .runWith(Sink.seq)
Java
// We're going to pretend we got messages from Kafka.
// After we've written them to HDFS, we want
// to commit the offset to Kafka
List<KafkaMessage> messagesFromKafka =
    Arrays.asList(
        new KafkaMessage(new Book("Pekko Concurrency"), new KafkaOffset(0)),
        new KafkaMessage(new Book("Pekko in Action"), new KafkaOffset(1)),
        new KafkaMessage(new Book("Effective Pekko"), new KafkaOffset(2)),
        new KafkaMessage(new Book("Learning Scala"), new KafkaOffset(3)),
        new KafkaMessage(new Book("Scala Puzzlers"), new KafkaOffset(4)),
        new KafkaMessage(new Book("Scala for Spark in Production"), new KafkaOffset(5)));

final KafkaCommitter kafkaCommitter = new KafkaCommitter();

Flow<HdfsWriteMessage<ByteString, KafkaOffset>, OutgoingMessage<KafkaOffset>, NotUsed> flow =
    HdfsFlow.dataWithPassThrough(
        fs,
        SyncStrategy.count(50),
        RotationStrategy.count(4),
        HdfsWritingSettings.create().withNewLine(true));

CompletionStage<List<RotationMessage>> resF =
    Source.from(messagesFromKafka)
        .map(
            kafkaMessage -> {
              Book book = kafkaMessage.book;
              // Transform message so that we can write to hdfs
              return HdfsWriteMessage.create(
                  ByteString.fromString(book.title), kafkaMessage.offset);
            })
        .via(flow)
        .map(
            message -> {
              if (message instanceof WrittenMessage) {
                kafkaCommitter.commit(((WrittenMessage<KafkaOffset>) message).passThrough());
                return message;
              } else {
                return message;
              }
            })
        .collectType(RotationMessage.class) // Collect only rotation messages
        .runWith(Sink.seq(), system);
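
The message pattern in the sample above can be modelled without the connector. The following plain-Java sketch is only an illustration: PassThroughModel, Event, and both methods are hypothetical names, and the ordering of events and the final rotation at the end of the input are assumptions about count-based rotation, not the connector's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the pass-through pattern; it does not use the connector.
class PassThroughModel {

  // Hypothetical stand-in for WrittenMessage (rotation == false)
  // and RotationMessage (rotation == true).
  static final class Event {
    final boolean rotation;
    final int value; // pass-through offset for a write, rotation index otherwise

    Event(boolean rotation, int value) {
      this.rotation = rotation;
      this.value = value;
    }
  }

  // Emits one write event per offset and one rotation event after every
  // rotateEvery writes. A partially filled last file is assumed to produce
  // a final rotation event when the input ends.
  static List<Event> write(List<Integer> offsets, int rotateEvery) {
    List<Event> events = new ArrayList<>();
    int writesInFile = 0;
    int rotationIndex = 0;
    for (int offset : offsets) {
      events.add(new Event(false, offset));
      writesInFile++;
      if (writesInFile == rotateEvery) {
        events.add(new Event(true, rotationIndex++));
        writesInFile = 0;
      }
    }
    if (writesInFile > 0) {
      events.add(new Event(true, rotationIndex));
    }
    return events;
  }

  // Records the offset of every write event in committed (the "commit")
  // and returns the indices of the rotation events.
  static List<Integer> commitAndCollectRotations(List<Event> events, List<Integer> committed) {
    List<Integer> rotations = new ArrayList<>();
    for (Event event : events) {
      if (event.rotation) {
        rotations.add(event.value);
      } else {
        committed.add(event.value);
      }
    }
    return rotations;
  }
}
```

With six offsets and a rotation every four writes, this model commits all six offsets and reports two rotations, with indices 0 and 1.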

Configuration

The flows are configured with HdfsWritingSettings.

Scala
val settings =
  HdfsWritingSettings()
    .withOverwrite(true)
    .withNewLine(false)
    .withLineSeparator(System.getProperty("line.separator"))
    .withPathGenerator(pathGenerator)
Java
HdfsWritingSettings settings =
    HdfsWritingSettings.create()
        .withOverwrite(true)
        .withNewLine(false)
        .withLineSeparator(System.getProperty("line.separator"))
        .withPathGenerator(pathGenerator);

File path generator

FilePathGenerator generates the path of each rotated file in HDFS from the rotation count and a timestamp.

Scala
val pathGenerator =
  FilePathGenerator((rotationCount: Long, timestamp: Long) => s"$destination$rotationCount-$timestamp")
Java
BiFunction<Long, Long, String> func =
    (rotationCount, timestamp) -> "/tmp/pekko-connectors/" + rotationCount + "-" + timestamp;
FilePathGenerator pathGenerator = FilePathGenerator.create(func);
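
As a variation on the function above, the rotation count can be zero-padded so that generated file names sort in rotation order. This is a minimal sketch using only the JDK: PaddedPathExample, DESTINATION, and PADDED are hypothetical names, and the five-digit width is an arbitrary choice.

```java
import java.util.Locale;
import java.util.function.BiFunction;

// Plain-JDK sketch of a path function; names here are illustrative only.
class PaddedPathExample {

  // Hypothetical destination directory, matching the Java sample above.
  static final String DESTINATION = "/tmp/pekko-connectors/";

  // Pads the rotation count to five digits so that names sort lexicographically
  // in rotation order (up to 99999 rotations).
  static final BiFunction<Long, Long, String> PADDED =
      (rotationCount, timestamp) ->
          String.format(Locale.ROOT, "%s%05d-%d", DESTINATION, rotationCount, timestamp);
}
```

A function of this shape could be passed to FilePathGenerator.create in the same way as func in the Java sample above.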

Rotation Strategy

RotationStrategy decides when to rotate files, for example by size (RotationStrategy.size) or by message count (RotationStrategy.count).
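
To make the idea of size-based rotation concrete, here is a plain-Java model of it. It does not use the connector: RotationModel and rotateBySize are hypothetical names, and rotating as soon as the running total reaches the threshold is an assumption, so the connector's exact boundary behaviour may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of size-based rotation; not the connector's code.
class RotationModel {

  // Returns the 0-based indices of the writes after which a rotation would
  // trigger, assuming a rotation happens once the bytes written to the
  // current file reach thresholdBytes, and the counter then restarts at zero.
  static List<Integer> rotateBySize(int[] messageSizes, long thresholdBytes) {
    List<Integer> rotations = new ArrayList<>();
    long bytesInFile = 0;
    for (int i = 0; i < messageSizes.length; i++) {
      bytesInFile += messageSizes[i];
      if (bytesInFile >= thresholdBytes) {
        rotations.add(i);
        bytesInFile = 0;
      }
    }
    return rotations;
  }
}
```

For five messages of 400 bytes each, a threshold of 1000 bytes gives a single rotation after the third write, while a threshold of 800 bytes gives rotations after the second and fourth writes.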

Sync Strategy

SyncStrategy decides when to synchronize the output, for example after a number of messages (SyncStrategy.count) or never (SyncStrategy.none).
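
The trade-off behind a count-based sync can be sketched with simple arithmetic. The model below is plain Java and does not use the connector: SyncModel and its methods are hypothetical names, it assumes a sync covers everything written so far, and it ignores file rotation.

```java
// Plain-Java model of count-based syncing; not the connector's code.
class SyncModel {

  // Number of syncs issued after `written` messages
  // when the output is synchronized every `syncEvery` messages.
  static long syncCalls(long written, long syncEvery) {
    return written / syncEvery;
  }

  // Messages written since the last sync, which are assumed
  // not to be covered by any sync yet.
  static long unsynced(long written, long syncEvery) {
    return written % syncEvery;
  }
}
```

In this model, syncing every 500 messages means that after 1234 writes there have been 2 syncs and 234 messages are still waiting for the next one. A smaller count narrows that window at the cost of more sync calls.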

Reading

Use HdfsSource to read from HDFS.

Data Reader

Scala
val source = HdfsSource.data(fs, path)
Java
Source<ByteString, CompletionStage<IOResult>> source = HdfsSource.data(fs, path);

Compressed Data Reader

Scala
val source = HdfsSource.compressed(fs, path, codec)
Java
Source<ByteString, CompletionStage<IOResult>> source = HdfsSource.compressed(fs, path, codec);

Sequence Reader

Scala
val source = HdfsSource.sequence(fs, path, classOf[Text], classOf[Text])
Java
Source<Pair<Text, Text>, NotUsed> source =
    HdfsSource.sequence(fs, path, Text.class, Text.class);

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> hdfs/testOnly *.HdfsWriterSpec
> hdfs/testOnly *.HdfsReaderSpec
Java
sbt
> hdfs/testOnly *.HdfsWriterTest
> hdfs/testOnly *.HdfsReaderTest