Hadoop Distributed File System - HDFS
The connector offers Flows and Sources that interact with HDFS file systems.
For more information about Hadoop, please visit the Hadoop documentation.
| Project Info: Apache Pekko Connectors HDFS | |
|---|---|
| Artifact | org.apache.pekko pekko-connectors-hdfs 1.1.0 |
| JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21 |
| Scala versions | 2.13.15, 2.12.20, 3.3.4 |
| JPMS module name | pekko.stream.connectors.hdfs |
| Release notes | GitHub releases |
| Issues | GitHub issues |
| Sources | https://github.com/apache/pekko-connectors |
Artifacts¶
sbt
val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-connectors-hdfs" % "1.1.0",
"org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven
<properties>
<pekko.version>1.1.3</pekko.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-connectors-hdfs_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
</dependencies>
Gradle
def versions = [
PekkoVersion: "1.1.3",
ScalaBinary: "2.13"
]
dependencies {
implementation "org.apache.pekko:pekko-connectors-hdfs_${versions.ScalaBinary}:1.1.0"
implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}
Specifying a Hadoop Version¶
By default, the HDFS connector uses Hadoop 3.3.6. If you are using a different version of Hadoop, exclude the Hadoop libraries from the connector dependency and add a dependency on your preferred version.
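For example, with sbt the exclusion might look like the following sketch; it assumes the transitive Hadoop dependency is org.apache.hadoop:hadoop-client, and the Hadoop version shown is purely illustrative:
libraryDependencies ++= Seq(
  // Exclude the bundled Hadoop client and bring in your own version instead.
  ("org.apache.pekko" %% "pekko-connectors-hdfs" % "1.1.0")
    .exclude("org.apache.hadoop", "hadoop-client"),
  "org.apache.hadoop" % "hadoop-client" % "3.4.0" // your preferred Hadoop version
)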
Set up client¶
Flows provided by this connector need a prepared org.apache.hadoop.fs.FileSystem
to interact with HDFS.
Scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
val conf = new Configuration()
conf.set("fs.default.name", "hdfs://localhost:54310")
val fs: FileSystem = FileSystem.get(conf)
Java
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:54310");
FileSystem fs = FileSystem.get(conf);
Writing¶
The connector provides three flows, all created through the HdfsFlow factory. Each flow requires a RotationStrategy and a SyncStrategy to run. The flows push OutgoingMessage elements downstream.
Data Writer¶
Use HdfsFlow.data
to stream with FSDataOutputStream
without any compression.
Scala
val flow = HdfsFlow.data(
fs,
SyncStrategy.count(500),
RotationStrategy.size(1, FileUnit.GB),
HdfsWritingSettings())
Java
Flow<HdfsWriteMessage<ByteString, NotUsed>, RotationMessage, NotUsed> flow =
HdfsFlow.data(
fs, SyncStrategy.count(500), RotationStrategy.size(1, FileUnit.GB()), settings);
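As a rough usage sketch (Scala), assuming an implicit ActorSystem named system is in scope, books is a sequence of ByteString payloads, and a HdfsWriteMessage can be built from the payload alone, the flow above can be run and its RotationMessage output collected:
val rotations: Future[Seq[RotationMessage]] =
  Source(books)                            // the ByteString payloads to write
    .map(bytes => HdfsWriteMessage(bytes)) // wrap each payload in a write message
    .via(flow)                             // the Scala flow defined above
    .runWith(Sink.seq)                     // collect the emitted rotation messages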
Compressed Data Writer¶
First, create a CompressionCodec.
Scala
val codec = new DefaultCodec()
codec.setConf(fs.getConf)
Java
DefaultCodec codec = new DefaultCodec();
codec.setConf(fs.getConf());
Then, use HdfsFlow.compressed to stream with CompressionOutputStream and CompressionCodec.
Scala
val flow = HdfsFlow.compressed(
fs,
SyncStrategy.count(1),
RotationStrategy.size(0.1, FileUnit.MB),
codec,
settings)
Java
Flow<HdfsWriteMessage<ByteString, NotUsed>, RotationMessage, NotUsed> flow =
HdfsFlow.compressed(
fs, SyncStrategy.count(50), RotationStrategy.size(0.1, FileUnit.MB()), codec, settings);
Sequence Writer¶
Use HdfsFlow.sequence
to stream a flat file consisting of binary key/value pairs.
Without Compression¶
Scala
val flow = HdfsFlow.sequence(
fs,
SyncStrategy.none,
RotationStrategy.size(1, FileUnit.MB),
settings,
classOf[Text],
classOf[Text])
Java
Flow<HdfsWriteMessage<Pair<Text, Text>, NotUsed>, RotationMessage, NotUsed> flow =
HdfsFlow.sequence(
fs,
SyncStrategy.none(),
RotationStrategy.size(1, FileUnit.MB()),
settings,
Text.class,
Text.class);
With Compression¶
First, define a codec.
Scala
val codec = new DefaultCodec()
codec.setConf(fs.getConf)
Java
DefaultCodec codec = new DefaultCodec();
codec.setConf(fs.getConf());
Then, create a flow.
Scala
val flow = HdfsFlow.sequence(
fs,
SyncStrategy.none,
RotationStrategy.size(1, FileUnit.MB),
CompressionType.BLOCK,
codec,
settings,
classOf[Text],
classOf[Text])
Java
Flow<HdfsWriteMessage<Pair<Text, Text>, NotUsed>, RotationMessage, NotUsed> flow =
HdfsFlow.sequence(
fs,
SyncStrategy.none(),
RotationStrategy.size(1, FileUnit.MB()),
SequenceFile.CompressionType.BLOCK,
codec,
settings,
Text.class,
Text.class);
Passing data through HdfsFlow¶
Use HdfsFlow.dataWithPassThrough, HdfsFlow.compressedWithPassThrough or HdfsFlow.sequenceWithPassThrough.
When streaming documents from Kafka, you might want to commit offsets back to Kafka. The flow emits two kinds of messages: a WrittenMessage for every input, and a RotationMessage whenever it rotates.
Let’s say that we have these classes.
Scala
case class Book(title: String)
case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)
Java
public static class Book {
final String title;
Book(String title) {
this.title = title;
}
}
static class KafkaCommitter {
List<Integer> committedOffsets = new ArrayList<>();
void commit(KafkaOffset offset) {
committedOffsets.add(offset.offset);
}
}
static class KafkaOffset {
final int offset;
KafkaOffset(int offset) {
this.offset = offset;
}
}
static class KafkaMessage {
final Book book;
final KafkaOffset offset;
KafkaMessage(Book book, KafkaOffset offset) {
this.book = book;
this.offset = offset;
}
}
Then, we can stream with passThrough.
Scala
// We're going to pretend we got messages from kafka.
// After we've written them to HDFS, we want
// to commit the offset to Kafka
val messagesFromKafka = List(
KafkaMessage(Book("Pekko Concurrency"), KafkaOffset(0)),
KafkaMessage(Book("Pekko in Action"), KafkaOffset(1)),
KafkaMessage(Book("Effective Pekko"), KafkaOffset(2)),
KafkaMessage(Book("Learning Scala"), KafkaOffset(3)),
KafkaMessage(Book("Scala Puzzlers"), KafkaOffset(4)),
KafkaMessage(Book("Scala for Spark in Production"), KafkaOffset(5)))
var committedOffsets = List[KafkaOffset]()
def commitToKafka(offset: KafkaOffset): Unit =
committedOffsets = committedOffsets :+ offset
val resF = Source(messagesFromKafka)
.map { (kafkaMessage: KafkaMessage) =>
val book = kafkaMessage.book
// Transform message so that we can write to hdfs
HdfsWriteMessage(ByteString(book.title), kafkaMessage.offset)
}
.via(
HdfsFlow.dataWithPassThrough[KafkaOffset](
fs,
SyncStrategy.count(50),
RotationStrategy.count(4),
HdfsWritingSettings().withNewLine(true)))
.map { message =>
message match {
case WrittenMessage(passThrough, _) =>
commitToKafka(passThrough)
case _ => ()
}
message
}
.collect {
case rm: RotationMessage => rm
}
.runWith(Sink.seq)
Java
// We're going to pretend we got messages from kafka.
// After we've written them to HDFS, we want
// to commit the offset to Kafka
List<KafkaMessage> messagesFromKafka =
Arrays.asList(
new KafkaMessage(new Book("Pekko Concurrency"), new KafkaOffset(0)),
new KafkaMessage(new Book("Pekko in Action"), new KafkaOffset(1)),
new KafkaMessage(new Book("Effective Pekko"), new KafkaOffset(2)),
new KafkaMessage(new Book("Learning Scala"), new KafkaOffset(3)),
new KafkaMessage(new Book("Scala Puzzlers"), new KafkaOffset(4)),
new KafkaMessage(new Book("Scala for Spark in Production"), new KafkaOffset(5)));
final KafkaCommitter kafkaCommitter = new KafkaCommitter();
Flow<HdfsWriteMessage<ByteString, KafkaOffset>, OutgoingMessage<KafkaOffset>, NotUsed> flow =
HdfsFlow.dataWithPassThrough(
fs,
SyncStrategy.count(50),
RotationStrategy.count(4),
HdfsWritingSettings.create().withNewLine(true));
CompletionStage<List<RotationMessage>> resF =
Source.from(messagesFromKafka)
.map(
kafkaMessage -> {
Book book = kafkaMessage.book;
// Transform message so that we can write to hdfs
return HdfsWriteMessage.create(
ByteString.fromString(book.title), kafkaMessage.offset);
})
.via(flow)
.map(
message -> {
if (message instanceof WrittenMessage) {
kafkaCommitter.commit(((WrittenMessage<KafkaOffset>) message).passThrough());
return message;
} else {
return message;
}
})
.collectType(RotationMessage.class) // Collect only rotation messages
.runWith(Sink.seq(), system);
Configuration¶
We can configure the sink with HdfsWritingSettings.
Scala
val settings =
HdfsWritingSettings()
.withOverwrite(true)
.withNewLine(false)
.withLineSeparator(System.getProperty("line.separator"))
.withPathGenerator(pathGenerator)
Java
HdfsWritingSettings settings =
HdfsWritingSettings.create()
.withOverwrite(true)
.withNewLine(false)
.withLineSeparator(System.getProperty("line.separator"))
.withPathGenerator(pathGenerator);
File path generator¶
FilePathGenerator provides functionality to generate the rotation path in HDFS.
Scala
val pathGenerator =
FilePathGenerator((rotationCount: Long, timestamp: Long) => s"$destination$rotationCount-$timestamp")
Java
BiFunction<Long, Long, String> func =
(rotationCount, timestamp) -> "/tmp/pekko-connectors/" + rotationCount + "-" + timestamp;
FilePathGenerator pathGenerator = FilePathGenerator.create(func);
Rotation Strategy¶
RotationStrategy provides functionality to decide when to rotate files.
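As a quick Scala sketch of the available strategies (factory names as used elsewhere in this guide; check the API docs for exact signatures), rotation can be triggered by accumulated size, element count, or time, or disabled:
import scala.concurrent.duration._

RotationStrategy.size(1, FileUnit.GB) // rotate once roughly 1 GB has been written
RotationStrategy.count(10000)         // rotate after 10000 elements
RotationStrategy.time(5.minutes)      // rotate on a fixed time interval
RotationStrategy.none                 // do not rotate on a trigger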
Sync Strategy¶
SyncStrategy provides functionality to decide when to synchronize the output.
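A small sketch (again using the factory names shown in the examples above): synchronization can happen after a fixed number of written elements, or be skipped entirely:
SyncStrategy.count(500) // sync the output after every 500 written elements
SyncStrategy.none       // never force a sync; leave flushing to the file system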
Reading¶
Use the HdfsSource factory to create sources that read from HDFS.
Data Reader¶
Scala
val source = HdfsSource.data(fs, path)
Java
Source<ByteString, CompletionStage<IOResult>> source = HdfsSource.data(fs, path);
Compressed Data Reader¶
Scala
val source = HdfsSource.compressed(fs, path, codec)
Java
Source<ByteString, CompletionStage<IOResult>> source = HdfsSource.compressed(fs, path, codec);
Sequence Reader¶
Scala
val source = HdfsSource.sequence(fs, path, classOf[Text], classOf[Text])
Java
Source<Pair<Text, Text>, NotUsed> source =
HdfsSource.sequence(fs, path, Text.class, Text.class);
Running the example code¶
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
Scala
sbt
> hdfs/testOnly *.HdfsWriterSpec
> hdfs/testOnly *.HdfsReaderSpec
Java
sbt
> hdfs/testOnly *.HdfsWriterTest
> hdfs/testOnly *.HdfsReaderTest