HBase
The connector provides sources, flows and sinks to interact with an HBase database.
HBase is a column-family NoSQL database backed by HDFS. For more information about HBase, please visit the HBase documentation.
Project Info: Apache Pekko Connectors HBase | |
---|---|
Artifact | org.apache.pekko pekko-connectors-hbase 1.1.0 |
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 OpenJDK 21 |
Scala versions | 2.13.15, 2.12.20, 3.3.4 |
JPMS module name | pekko.stream.connectors.hbase |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts¶
val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-connectors-hbase" % "1.1.0",
"org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
<properties>
<pekko.version>1.1.3</pekko.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-connectors-hbase_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
</dependencies>
def versions = [
PekkoVersion: "1.1.3",
ScalaBinary: "2.13"
]
dependencies {
implementation "org.apache.pekko:pekko-connectors-hbase_${versions.ScalaBinary}:1.1.0"
implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Converters¶
Converters map a domain object to a list of HBase mutations (Append, Delete, Increment, Put).
Put¶
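import org.apache.hadoop.hbase.client.{Mutation, Put}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.immutable
import scala.language.implicitConversions
// Lets the String literals below be passed where HBase expects Array[Byte]
implicit def toBytes(string: String): Array[Byte] = Bytes.toBytes(string)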
case class Person(id: Int, name: String)
val hBaseConverter: Person => immutable.Seq[Mutation] = { person =>
val put = new Put(s"id_${person.id}")
put.addColumn("info", "name", person.name)
List(put)
}
Function<Person, List<Mutation>> hBaseConverter =
person -> {
try {
Put put = new Put(String.format("id_%d", person.id).getBytes("UTF-8"));
put.addColumn(
"info".getBytes("UTF-8"), "name".getBytes("UTF-8"), person.name.getBytes("UTF-8"));
return Collections.singletonList(put);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return Collections.emptyList();
}
};
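The Java converters on this page wrap each getBytes("UTF-8") call in a try/catch for UnsupportedEncodingException. As a minimal sketch, assuming only the JDK, the same row-key bytes can be produced with StandardCharsets.UTF_8, which does not throw a checked exception. RowKeys, utf8 and rowKey are hypothetical names introduced for illustration; they are not part of the connector or of the HBase client.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, not part of the connector: builds the "id_<n>" row keys
// used by the converters on this page.
final class RowKeys {
  private RowKeys() {}

  // Passing a Charset constant instead of a charset name means getBytes
  // cannot throw UnsupportedEncodingException, so no try/catch is needed.
  static byte[] utf8(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  // Row key in the "id_<n>" shape used by the Person converters.
  static byte[] rowKey(int id) {
    return utf8("id_" + id);
  }

  public static void main(String[] args) {
    byte[] key = rowKey(42);
    // Decode the bytes again to show the key round-trips through UTF-8.
    System.out.println(new String(key, StandardCharsets.UTF_8));
    System.out.println(Arrays.equals(key, utf8("id_42")));
  }
}
```

With a helper like this, a converter body could call new Put(RowKeys.rowKey(person.id)) directly, and the catch branch that silently returns an empty list would no longer be needed.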
Append¶
val appendHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
// Append to a cell
val append = new Append(s"id_${person.id}")
append.add("info", "aliases", person.name)
List(append)
}
Function<Person, List<Mutation>> appendHBaseConverter =
person -> {
try {
Append append = new Append(String.format("id_%d", person.id).getBytes("UTF-8"));
append.add(
"info".getBytes("UTF-8"), "aliases".getBytes("UTF-8"), person.name.getBytes("UTF-8"));
return Collections.singletonList(append);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return Collections.emptyList();
}
};
Delete¶
val deleteHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
// Delete the specified row
val delete = new Delete(s"id_${person.id}")
List(delete)
}
Function<Person, List<Mutation>> deleteHBaseConverter =
person -> {
try {
Delete delete = new Delete(String.format("id_%d", person.id).getBytes("UTF-8"));
return Collections.singletonList(delete);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return Collections.emptyList();
}
};
Increment¶
val incrementHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
// Increment a cell value
val increment = new Increment(s"id_${person.id}")
increment.addColumn("info", "numberOfChanges", 1)
List(increment)
}
Function<Person, List<Mutation>> incrementHBaseConverter =
person -> {
try {
Increment increment = new Increment(String.format("id_%d", person.id).getBytes("UTF-8"));
increment.addColumn("info".getBytes("UTF-8"), "numberOfChanges".getBytes("UTF-8"), 1);
return Collections.singletonList(increment);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return Collections.emptyList();
}
};
Complex and noop mutations¶
To ignore an object, return an empty List; this has no effect on HBase. You can also combine mutations to perform complex business logic:
val mutationsHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
if (person.id != 0) {
if (person.name.isEmpty) {
// Delete the specified row
val delete = new Delete(s"id_${person.id}")
List(delete)
} else {
// Insert or update a row
val put = new Put(s"id_${person.id}")
put.addColumn("info", "name", person.name)
val increment = new Increment(s"id_${person.id}")
increment.addColumn("info", "numberOfChanges", 1)
List(put, increment)
}
} else {
List.empty
}
}
Function<Person, List<Mutation>> complexHBaseConverter =
person -> {
try {
byte[] id = String.format("id_%d", person.id).getBytes("UTF-8");
byte[] infoFamily = "info".getBytes("UTF-8");
if (person.id != 0 && person.name.isEmpty()) {
Delete delete = new Delete(id);
return Collections.singletonList(delete);
} else if (person.id != 0) {
Put put = new Put(id);
put.addColumn(infoFamily, "name".getBytes("UTF-8"), person.name.getBytes("UTF-8"));
Increment increment = new Increment(id);
increment.addColumn(infoFamily, "numberOfChanges".getBytes("UTF-8"), 1);
return Arrays.asList(put, increment);
} else {
return Collections.emptyList();
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return Collections.emptyList();
}
};
If you return a list of mutations, they are applied in the order of the list. The mutations are not applied in a transaction; each mutation is independent.
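The ordering and independence described above can be modelled with a small in-memory sketch. Everything in it is a hypothetical stand-in: MutationOrderSketch, FakeMutation and applyInOrder are illustrative names, the "table" is a plain map, and the HBase client is not used. It also assumes that processing stops at the first failing mutation, which is a simplification and not a statement about the connector's error handling.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of "applied in order, no transaction".
// It does not use the HBase client or the connector.
final class MutationOrderSketch {
  // Stand-in for a mutation: it either changes the cell map or throws.
  interface FakeMutation {
    void applyTo(Map<String, String> cells);
  }

  // Applies the mutations one by one, in list order, without any rollback.
  // Returns how many mutations were applied before the first failure.
  static int applyInOrder(List<FakeMutation> mutations, Map<String, String> cells) {
    int applied = 0;
    for (FakeMutation mutation : mutations) {
      try {
        mutation.applyTo(cells);
      } catch (RuntimeException e) {
        // Assumption: stop here. Earlier mutations stay applied.
        return applied;
      }
      applied++;
    }
    return applied;
  }

  public static void main(String[] args) {
    Map<String, String> cells = new LinkedHashMap<>();
    List<FakeMutation> mutations = Arrays.asList(
        c -> c.put("info:name", "alice"),
        c -> { throw new IllegalStateException("simulated failure"); },
        c -> c.put("info:numberOfChanges", "1"));
    int applied = applyInOrder(mutations, cells);
    System.out.println(applied + " applied, cells = " + cells);
  }
}
```

In this model the first write survives the simulated failure because nothing rolls it back, and an empty list leaves the map untouched, which mirrors the no-op case. A converter that needs all-or-nothing behaviour across several cells would have to arrange that itself, for example by putting the related columns into a single Put.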
Settings¶
HBase combinators require HTableSettings. If the table referenced in the settings does not exist, it will be created on demand.
val tableSettings =
HTableSettings(HBaseConfiguration.create(), TableName.valueOf("person"), immutable.Seq("info"), hBaseConverter)
HTableSettings<Person> tableSettings =
HTableSettings.create(
HBaseConfiguration.create(),
TableName.valueOf("person1"),
Collections.singletonList("info"),
hBaseConverter);
Source¶
val scan = new Scan(new Get(Bytes.toBytes("id_100")))
val f = HTableStage
.source(scan, tableSettings)
.runWith(Sink.seq)
Scan scan = new Scan(new Get("id_300".getBytes("UTF-8")));
CompletionStage<List<Result>> f =
HTableStage.source(scan, tableSettings).runWith(Sink.seq(), system);
Flow¶
val flow = HTableStage.flow[Person](tableSettings)
val f = Source(11 to 20).map(i => Person(i, s"zozo_$i")).via(flow).runWith(Sink.fold(0)((a, d) => a + d.id))
Flow<Person, Person, NotUsed> flow = HTableStage.flow(tableSettings);
Pair<NotUsed, CompletionStage<List<Person>>> run =
Source.from(Arrays.asList(200, 201, 202, 203, 204))
.map((i) -> new Person(i, String.format("name_%d", i)))
.via(flow)
.toMat(Sink.seq(), Keep.both())
.run(system);
Sink¶
val sink = HTableStage.sink[Person](tableSettings)
val f = Source(1 to 10).map(i => Person(i, s"zozo_$i")).runWith(sink)
final Sink<Person, CompletionStage<Done>> sink = HTableStage.sink(tableSettings);
CompletionStage<Done> o =
Source.from(Arrays.asList(100, 101, 102, 103, 104))
.map((i) -> new Person(i, String.format("name %d", i)))
.runWith(sink, system);
HBase administration commands¶
To manage the HBase database, start the HBase shell ($HBASE_HOME/bin/hbase shell) and run the following commands:
list              # list tables
scan "person"     # select * from person
disable "person"  # disable table "person" before dropping it
drop "person"     # drop table "person"