Apache Kudu
The Apache Pekko Connectors Kudu connector supports writing to Apache Kudu tables.
Apache Kudu is a free and open source column-oriented data store in the Apache Hadoop ecosystem.
| Project Info: Apache Pekko Connectors Kudu | |
|---|---|
| Artifact | org.apache.pekko pekko-connectors-kudu 1.1.0 |
| JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21 |
| Scala versions | 2.13.15, 2.12.20, 3.3.4 |
| JPMS module name | pekko.stream.connectors.kudu |
| Release notes | GitHub releases |
| Issues | GitHub issues |
| Sources | https://github.com/apache/pekko-connectors |
Artifacts¶
sbt
val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-kudu" % "1.1.0",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven
<properties>
  <pekko.version>1.1.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-kudu_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.1.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-kudu_${versions.ScalaBinary}:1.1.0"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}
Configuration¶
To connect to Kudu you need to:
- Describe the Kudu Schema
- Define a converter function to map your data type to a PartialRow
- Specify Kudu CreateTableOptions
- Set up the Apache Pekko Connectors KuduTableSettings
Scala
// Kudu Schema
val cols = List(
  new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build,
  new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build)
val schema = new Schema(cols.asJava)
// Converter function
case class Person(id: Int, name: String)
val kuduConverter: Person => PartialRow = { person =>
  val partialRow = schema.newPartialRow()
  partialRow.addInt(0, person.id)
  partialRow.addString(1, person.name)
  partialRow
}
// Kudu table options
val rangeKeys = List("key")
val createTableOptions = new CreateTableOptions().setNumReplicas(1).setRangePartitionColumns(rangeKeys.asJava)
// Pekko Connectors Settings
val kuduTableSettings = KuduTableSettings("test", schema, createTableOptions, kuduConverter)
Java
// Kudu Schema
List<ColumnSchema> columns = new ArrayList<>(2);
columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build());
schema = new Schema(columns);
// Converter function
Function<Person, PartialRow> kuduConverter =
    person -> {
      PartialRow partialRow = schema.newPartialRow();
      partialRow.addInt(0, person.id);
      partialRow.addString(1, person.name);
      return partialRow;
    };
// Kudu table options
List<String> rangeKeys = Collections.singletonList("key");
CreateTableOptions createTableOptions =
    new CreateTableOptions().setNumReplicas(1).setRangePartitionColumns(rangeKeys);
// Pekko Connectors Settings
KuduTableSettings<Person> tableSettings =
    KuduTableSettings.create("tablenameSink", schema, createTableOptions, kuduConverter);
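For completeness, these are the imports the Scala snippet above relies on. This is a sketch: the Kudu classes come from the kudu-client artifact, and the connector class is assumed to live under org.apache.pekko.stream.connectors.kudu, matching the JPMS module name listed above; verify the packages against your dependency versions.

import org.apache.kudu.{ColumnSchema, Schema, Type}
import org.apache.kudu.client.{CreateTableOptions, PartialRow}
import org.apache.pekko.stream.connectors.kudu.KuduTableSettings
// Provides asJava; on Scala 2.12 use scala.collection.JavaConverters._ instead.
import scala.jdk.CollectionConverters._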
By default the KuduClient is automatically managed by the connector, and its settings are read from the reference.conf file. A manually initialized client can be injected into the stream using KuduAttributes:
Scala
val masterAddress = "localhost:7051"
val client = new KuduClient.KuduClientBuilder(masterAddress).build
system.registerOnTermination(client.shutdown())

val flow: Flow[Person, Person, NotUsed] =
  KuduTable
    .flow(kuduTableSettings.withTableName("Flow"))
    .withAttributes(KuduAttributes.client(client))
Java
final String masterAddress = "localhost:7051";
final KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build();
system.registerOnTermination(
    () -> {
      try {
        client.shutdown();
      } catch (KuduException e) {
        e.printStackTrace();
      }
    });
final Flow<Person, Person, NotUsed> flow =
    KuduTable.flow(tableSettings.withTableName("Flow"))
        .withAttributes(KuduAttributes.client(client));
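The same attribute can also be attached to a sink. A minimal Scala sketch, reusing the kuduTableSettings and client values defined above (the table name "SharedClientSink" is illustrative):

// Write with the manually managed client instead of the connector-managed one.
val sinkWithSharedClient: Sink[Person, Future[Done]] =
  KuduTable
    .sink(kuduTableSettings.withTableName("SharedClientSink"))
    .withAttributes(KuduAttributes.client(client))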
Writing to Kudu in a Flow¶
Scala
val flow: Flow[Person, Person, NotUsed] =
  KuduTable.flow(kuduTableSettings.withTableName("Flow"))

val f = Source(11 to 20)
  .map(i => Person(i, s"zozo_$i"))
  .via(flow)
  .runWith(Sink.fold(0)((a, d) => a + d.id))
Java
Flow<Person, Person, NotUsed> flow = KuduTable.flow(tableSettings.withTableName("Flow"));

CompletionStage<List<Person>> run =
    Source.from(Arrays.asList(200, 201, 202, 203, 204))
        .map((i) -> new Person(i, String.format("name_%d", i)))
        .via(flow)
        .toMat(Sink.seq(), Keep.right())
        .run(system);
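Nothing is written until the stream is materialized. A short sketch of waiting for the Scala flow example above to complete and reading its folded result (Await is used here only to keep the example compact):

import scala.concurrent.Await
import scala.concurrent.duration._

// f is the Future[Int] materialized by the Scala example above.
val sumOfWrittenIds = Await.result(f, 10.seconds)
println(s"sum of written ids: $sumOfWrittenIds")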
Writing to Kudu with a Sink¶
Scala
val sink: Sink[Person, Future[Done]] =
  KuduTable.sink(kuduTableSettings.withTableName("Sink"))

val f = Source(1 to 10)
  .map(i => Person(i, s"zozo_$i"))
  .runWith(sink)
Java
final Sink<Person, CompletionStage<Done>> sink =
    KuduTable.sink(tableSettings.withTableName("Sink"));

CompletionStage<Done> o =
    Source.from(Arrays.asList(100, 101, 102, 103, 104))
        .map((i) -> new Person(i, String.format("name %d", i)))
        .runWith(sink, system);
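Putting it together, the following is a minimal, self-contained Scala sketch of writing a few rows through the sink. Treat it as a sketch rather than a canonical program: the package names are assumed from the JPMS module name above, and the table name "people" and the row values are illustrative.

import org.apache.kudu.{ColumnSchema, Schema, Type}
import org.apache.kudu.client.CreateTableOptions
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.kudu.KuduTableSettings
import org.apache.pekko.stream.connectors.kudu.scaladsl.KuduTable
import org.apache.pekko.stream.scaladsl.Source
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

object KuduQuickstart extends App {
  // One actor system drives the stream; terminate it once the write is done.
  implicit val system: ActorSystem = ActorSystem("kudu-quickstart")

  final case class Person(id: Int, name: String)

  // Two-column schema: an INT32 key and a STRING value.
  val schema = new Schema(
    List(
      new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build,
      new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build).asJava)

  // Table settings: name, schema, create options and the Person => PartialRow converter.
  val settings = KuduTableSettings(
    "people",
    schema,
    new CreateTableOptions().setNumReplicas(1).setRangePartitionColumns(List("key").asJava),
    (person: Person) => {
      val row = schema.newPartialRow()
      row.addInt(0, person.id)
      row.addString(1, person.name)
      row
    })

  // Write five rows and wait for the sink to complete.
  val done = Source(1 to 5)
    .map(i => Person(i, s"name_$i"))
    .runWith(KuduTable.sink(settings))

  Await.result(done, 30.seconds)
  system.terminate()
}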