Hadoop Distributed File System - HDFS
The connector offers Flows and Sources that interact with HDFS file systems.
For more information about Hadoop, please visit the Hadoop documentation.
| Project Info | Apache Pekko Connectors HDFS |
|---|---|
| Artifact | org.apache.pekko : pekko-connectors-hdfs : 1.0.2 |
| JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17 |
| Scala versions | 2.13.14, 2.12.20, 3.3.3 |
| JPMS module name | pekko.stream.connectors.hdfs |
| Release notes | GitHub releases |
| Issues | GitHub issues |
| Sources | https://github.com/apache/pekko-connectors |
Artifacts
- sbt
val PekkoVersion = "1.0.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-hdfs" % "1.0.2",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
- Maven
<properties>
  <pekko.version>1.0.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-hdfs_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  PekkoVersion: "1.0.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-hdfs_${versions.ScalaBinary}:1.0.2"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}
Specifying a Hadoop Version
By default, the HDFS connector uses Hadoop 3.2.1. If you are using a different version of Hadoop, exclude the Hadoop libraries from the connector dependency and add the dependency for your preferred version, as sketched below.
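With sbt, this could look like the following sketch. It assumes the connector's transitive Hadoop dependency is hadoop-client and uses an example Hadoop version; check the transitive dependencies of the connector version you use and pick the Hadoop version you actually run against.

libraryDependencies ++= Seq(
  // exclude the Hadoop libraries pulled in by the connector (assumed artifact: hadoop-client)
  ("org.apache.pekko" %% "pekko-connectors-hdfs" % "1.0.2")
    .exclude("org.apache.hadoop", "hadoop-client"),
  // add the Hadoop version used in your cluster (example version, adjust as needed)
  "org.apache.hadoop" % "hadoop-client" % "3.3.6"
)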
Set up client
Flows provided by this connector need a prepared org.apache.hadoop.fs.FileSystem to interact with HDFS.
- Scala
-
source
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val conf = new Configuration()
conf.set("fs.default.name", "hdfs://localhost:54310")

val fs: FileSystem = FileSystem.get(conf)
- Java
-
source
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:54310");

FileSystem fs = FileSystem.get(conf);
Writing
The connector provides three Flows: HdfsFlow.data, HdfsFlow.compressed and HdfsFlow.sequence. Each flow requires a RotationStrategy and a SyncStrategy to run. The flows push OutgoingMessage elements downstream.
Data Writer
Use HdfsFlow.data to stream with FSDataOutputStream without any compression.
- Scala
-
source
val flow = HdfsFlow.data(
  fs,
  SyncStrategy.count(500),
  RotationStrategy.size(1, FileUnit.GB),
  HdfsWritingSettings())
- Java
-
source
Flow<HdfsWriteMessage<ByteString, NotUsed>, RotationMessage, NotUsed> flow =
    HdfsFlow.data(
        fs,
        SyncStrategy.count(500),
        RotationStrategy.size(1, FileUnit.GB()),
        settings);
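For reference, a minimal Scala sketch of running data through the flow defined above could look like this. It assumes an implicit ActorSystem is in scope, that HdfsWriteMessage offers a single-argument constructor for messages without a pass-through value (the two-argument variant appears in the Kafka example further down), and the input strings are illustrative only.

import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import org.apache.pekko.util.ByteString

// Wrap each payload in an HdfsWriteMessage and push it through the flow defined above;
// the flow emits a RotationMessage whenever a new file is rolled
val rotationsF =
  Source(List("first line", "second line"))
    .map(line => HdfsWriteMessage(ByteString(line + "\n")))
    .via(flow)
    .runWith(Sink.seq)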
Compressed Data Writer
First, create a CompressionCodec.
- Scala
-
source
val codec = new DefaultCodec()
codec.setConf(fs.getConf)
- Java
-
source
DefaultCodec codec = new DefaultCodec();
codec.setConf(fs.getConf());
Then, use HdfsFlow.compressed to stream with CompressionOutputStream and CompressionCodec.
- Scala
-
source
val flow = HdfsFlow.compressed(
  fs,
  SyncStrategy.count(1),
  RotationStrategy.size(0.1, FileUnit.MB),
  codec,
  settings)
- Java
-
source
Flow<HdfsWriteMessage<ByteString, NotUsed>, RotationMessage, NotUsed> flow =
    HdfsFlow.compressed(
        fs,
        SyncStrategy.count(50),
        RotationStrategy.size(0.1, FileUnit.MB()),
        codec,
        settings);
Sequence Writer
Use HdfsFlow.sequence to stream a flat file consisting of binary key/value pairs.
Without Compression
- Scala
-
source
val flow = HdfsFlow.sequence(
  fs,
  SyncStrategy.none,
  RotationStrategy.size(1, FileUnit.MB),
  settings,
  classOf[Text],
  classOf[Text])
- Java
-
source
Flow<HdfsWriteMessage<Pair<Text, Text>, NotUsed>, RotationMessage, NotUsed> flow =
    HdfsFlow.sequence(
        fs,
        SyncStrategy.none(),
        RotationStrategy.size(1, FileUnit.MB()),
        settings,
        Text.class,
        Text.class);
With Compression
First, define a codec.
- Scala
-
source
val codec = new DefaultCodec() codec.setConf(fs.getConf)
- Java
-
source
DefaultCodec codec = new DefaultCodec(); codec.setConf(fs.getConf());
Then, create a flow.
- Scala
-
source
val flow = HdfsFlow.sequence(
  fs,
  SyncStrategy.none,
  RotationStrategy.size(1, FileUnit.MB),
  CompressionType.BLOCK,
  codec,
  settings,
  classOf[Text],
  classOf[Text])
- Java
-
source
Flow<HdfsWriteMessage<Pair<Text, Text>, NotUsed>, RotationMessage, NotUsed> flow =
    HdfsFlow.sequence(
        fs,
        SyncStrategy.none(),
        RotationStrategy.size(1, FileUnit.MB()),
        SequenceFile.CompressionType.BLOCK,
        codec,
        settings,
        Text.class,
        Text.class);
Passing data through HdfsFlow
Use HdfsFlow.dataWithPassThrough, HdfsFlow.compressedWithPassThrough or HdfsFlow.sequenceWithPassThrough.
When streaming documents from Kafka, you might want to commit the offsets back to Kafka after writing. The flow emits two kinds of messages: for every input it produces a WrittenMessage, and whenever it rotates, a RotationMessage.
Let’s say that we have these classes.
- Scala
-
source
case class Book(title: String)

case class KafkaOffset(offset: Int)

case class KafkaMessage(book: Book, offset: KafkaOffset)
- Java
-
source
public static class Book {
  final String title;

  Book(String title) {
    this.title = title;
  }
}

static class KafkaCommitter {
  List<Integer> committedOffsets = new ArrayList<>();

  void commit(KafkaOffset offset) {
    committedOffsets.add(offset.offset);
  }
}

static class KafkaOffset {
  final int offset;

  KafkaOffset(int offset) {
    this.offset = offset;
  }
}

static class KafkaMessage {
  final Book book;
  final KafkaOffset offset;

  KafkaMessage(Book book, KafkaOffset offset) {
    this.book = book;
    this.offset = offset;
  }
}
Then, we can stream with passThrough.
- Scala
-
source
// We're going to pretend we got messages from kafka.
// After we've written them to HDFS, we want
// to commit the offset to Kafka
val messagesFromKafka = List(
  KafkaMessage(Book("Pekko Concurrency"), KafkaOffset(0)),
  KafkaMessage(Book("Pekko in Action"), KafkaOffset(1)),
  KafkaMessage(Book("Effective Pekko"), KafkaOffset(2)),
  KafkaMessage(Book("Learning Scala"), KafkaOffset(3)),
  KafkaMessage(Book("Scala Puzzlers"), KafkaOffset(4)),
  KafkaMessage(Book("Scala for Spark in Production"), KafkaOffset(5)))

var committedOffsets = List[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val resF = Source(messagesFromKafka)
  .map { (kafkaMessage: KafkaMessage) =>
    val book = kafkaMessage.book
    // Transform message so that we can write to hdfs
    HdfsWriteMessage(ByteString(book.title), kafkaMessage.offset)
  }
  .via(
    HdfsFlow.dataWithPassThrough[KafkaOffset](
      fs,
      SyncStrategy.count(50),
      RotationStrategy.count(4),
      HdfsWritingSettings().withNewLine(true)))
  .map { message =>
    message match {
      case WrittenMessage(passThrough, _) =>
        commitToKafka(passThrough)
      case _ => ()
    }
    message
  }
  .collect {
    case rm: RotationMessage => rm
  }
  .runWith(Sink.seq)
- Java
-
source
// We're going to pretend we got messages from kafka.
// After we've written them to HDFS, we want
// to commit the offset to Kafka
List<KafkaMessage> messagesFromKafka =
    Arrays.asList(
        new KafkaMessage(new Book("Pekko Concurrency"), new KafkaOffset(0)),
        new KafkaMessage(new Book("Pekko in Action"), new KafkaOffset(1)),
        new KafkaMessage(new Book("Effective Pekko"), new KafkaOffset(2)),
        new KafkaMessage(new Book("Learning Scala"), new KafkaOffset(3)),
        new KafkaMessage(new Book("Scala Puzzlers"), new KafkaOffset(4)),
        new KafkaMessage(new Book("Scala for Spark in Production"), new KafkaOffset(5)));

final KafkaCommitter kafkaCommitter = new KafkaCommitter();

Flow<HdfsWriteMessage<ByteString, KafkaOffset>, OutgoingMessage<KafkaOffset>, NotUsed> flow =
    HdfsFlow.dataWithPassThrough(
        fs,
        SyncStrategy.count(50),
        RotationStrategy.count(4),
        HdfsWritingSettings.create().withNewLine(true));

CompletionStage<List<RotationMessage>> resF =
    Source.from(messagesFromKafka)
        .map(
            kafkaMessage -> {
              Book book = kafkaMessage.book;
              // Transform message so that we can write to hdfs
              return HdfsWriteMessage.create(
                  ByteString.fromString(book.title), kafkaMessage.offset);
            })
        .via(flow)
        .map(
            message -> {
              if (message instanceof WrittenMessage) {
                kafkaCommitter.commit(((WrittenMessage<KafkaOffset>) message).passThrough());
                return message;
              } else {
                return message;
              }
            })
        .collectType(RotationMessage.class) // Collect only rotation messages
        .runWith(Sink.seq(), system);
Configuration
We can configure the writing flows with HdfsWritingSettings.
- Scala
-
source
val settings = HdfsWritingSettings()
  .withOverwrite(true)
  .withNewLine(false)
  .withLineSeparator(System.getProperty("line.separator"))
  .withPathGenerator(pathGenerator)
- Java
-
source
HdfsWritingSettings settings =
    HdfsWritingSettings.create()
        .withOverwrite(true)
        .withNewLine(false)
        .withLineSeparator(System.getProperty("line.separator"))
        .withPathGenerator(pathGenerator);
File path generator
FilePathGenerator provides the functionality to generate rotation paths in HDFS.
- Scala
-
source
val pathGenerator =
  FilePathGenerator((rotationCount: Long, timestamp: Long) => s"$destination$rotationCount-$timestamp")
- Java
-
source
BiFunction<Long, Long, String> func =
    (rotationCount, timestamp) -> "/tmp/pekko-connectors/" + rotationCount + "-" + timestamp;
FilePathGenerator pathGenerator = FilePathGenerator.create(func);
Rotation Strategy
RotationStrategy provides the functionality to decide when to rotate files. The size- and count-based strategies are used in the writer examples above; a sketch of creating them follows.
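A short Scala sketch; the package name and the time-based variant are assumptions based on the connector's API, while the size- and count-based factories appear in the examples above.

import scala.concurrent.duration._
import org.apache.pekko.stream.connectors.hdfs.{ FileUnit, RotationStrategy }

// rotate when the current file reaches a size threshold (as in the writer examples)
val bySize = RotationStrategy.size(1, FileUnit.GB)

// rotate after a fixed number of written messages (as in the pass-through example)
val byCount = RotationStrategy.count(4)

// rotate on a fixed time interval (assumed variant: RotationStrategy.time)
val byTime = RotationStrategy.time(30.seconds)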
Sync Strategy
SyncStrategy provides the functionality to decide when to synchronize the output; both variants used in the examples above are shown in the sketch below.
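A short Scala sketch; the package name is an assumption, while both factory methods appear in the writer examples above.

import org.apache.pekko.stream.connectors.hdfs.SyncStrategy

// synchronize the output after every 500 written messages (as in the data writer example)
val syncEvery500 = SyncStrategy.count(500)

// never synchronize explicitly (as in the sequence writer examples)
val noSync = SyncStrategy.none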
Reading
Use HdfsSource to read from HDFS.
Data Reader
- Scala
-
source
val source = HdfsSource.data(fs, path)
- Java
-
source
Source<ByteString, CompletionStage<IOResult>> source = HdfsSource.data(fs, path);
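As a usage sketch, the emitted ByteString chunks can be folded into a single String. This assumes an implicit ActorSystem and ExecutionContext are in scope to run the stream and map the resulting Future.

import scala.concurrent.Future
import org.apache.pekko.util.ByteString

// concatenate all chunks read from the file and decode them as UTF-8
val contentF: Future[String] =
  HdfsSource.data(fs, path).runFold(ByteString.empty)(_ ++ _).map(_.utf8String)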
Compressed Data Reader
- Scala
-
source
val source = HdfsSource.compressed(fs, path, codec)
- Java
-
source
Source<ByteString, CompletionStage<IOResult>> source = HdfsSource.compressed(fs, path, codec);
Sequence Reader
- Scala
-
source
val source = HdfsSource.sequence(fs, path, classOf[Text], classOf[Text])
- Java
-
source
Source<Pair<Text, Text>, NotUsed> source = HdfsSource.sequence(fs, path, Text.class, Text.class);
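A usage sketch for collecting all key/value pairs, assuming the Scala variant emits tuples (as suggested by the Java Pair signature above) and an implicit ActorSystem is in scope.

import scala.concurrent.Future
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.hadoop.io.Text

// materialize every (key, value) pair read from the sequence file into a Seq
val pairsF: Future[Seq[(Text, Text)]] =
  HdfsSource.sequence(fs, path, classOf[Text], classOf[Text]).runWith(Sink.seq)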
Running the example code
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
- Scala
-
sbt
> hdfs/testOnly *.HdfsWriterSpec
> hdfs/testOnly *.HdfsReaderSpec
- Java
-
sbt
> hdfs/testOnly *.HdfsWriterTest
> hdfs/testOnly *.HdfsReaderTest