HBase
The connector provides sources, flows, and sinks to interact with an HBase database.
HBase is a column-family NoSQL database backed by HDFS. For more information about HBase, please visit the HBase documentation.
| Project Info: Apache Pekko Connectors HBase | |
|---|---|
| Artifact | org.apache.pekko / pekko-connectors-hbase / 1.2.0+3-e195cec2-SNAPSHOT |
| JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21 |
| Scala versions | 2.13.17, 2.12.20, 3.3.6 |
| JPMS module name | pekko.stream.connectors.hbase |
| Release notes | GitHub releases |
| Issues | GitHub issues |
| Sources | https://github.com/apache/pekko-connectors |
Artifacts
- sbt
    val PekkoVersion = "1.1.5"
    libraryDependencies ++= Seq(
      "org.apache.pekko" %% "pekko-connectors-hbase" % "1.2.0+3-e195cec2-SNAPSHOT",
      "org.apache.pekko" %% "pekko-stream" % PekkoVersion
    )
- Maven
    <properties>
      <pekko.version>1.1.5</pekko.version>
      <scala.binary.version>2.13</scala.binary.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-connectors-hbase_${scala.binary.version}</artifactId>
        <version>1.2.0+3-e195cec2-SNAPSHOT</version>
      </dependency>
      <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-stream_${scala.binary.version}</artifactId>
        <version>${pekko.version}</version>
      </dependency>
    </dependencies>
- Gradle
    def versions = [
      PekkoVersion: "1.1.5",
      ScalaBinary: "2.13"
    ]
    dependencies {
      implementation "org.apache.pekko:pekko-connectors-hbase_${versions.ScalaBinary}:1.2.0+3-e195cec2-SNAPSHOT"
      implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
    }
Converters
Converters map the domain object to a list of HBase mutations (Append, Delete, Increment, Put).
Put
- Scala
    implicit def toBytes(string: String): Array[Byte] = Bytes.toBytes(string)
    case class Person(id: Int, name: String)

    val hBaseConverter: Person => immutable.Seq[Mutation] = { person =>
      val put = new Put(s"id_${person.id}")
      put.addColumn("info", "name", person.name)
      List(put)
    }
- Java
    Function<Person, List<Mutation>> hBaseConverter =
        person -> {
          try {
            Put put = new Put(String.format("id_%d", person.id).getBytes("UTF-8"));
            put.addColumn(
                "info".getBytes("UTF-8"), "name".getBytes("UTF-8"), person.name.getBytes("UTF-8"));
            return Collections.singletonList(put);
          } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return Collections.emptyList();
          }
        };
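The Java converter wraps every `getBytes("UTF-8")` call in a try/catch because the overload that takes a charset name throws the checked `UnsupportedEncodingException`. As a minimal sketch using only the JDK, the overload that takes `StandardCharsets.UTF_8` declares no checked exception, so the row key and column names could be built in a small helper. The `RowKeys` class and its method names are hypothetical and not part of the connector.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the connector API.
final class RowKeys {
    private RowKeys() {}

    // Encodes a string as UTF-8 bytes without a checked exception.
    static byte[] utf8(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Builds the "id_<n>" row key format used by the converters in this section.
    static byte[] rowKey(int id) {
        return utf8("id_" + id);
    }
}
```

With a helper like this, a converter body could presumably shrink to `new Put(RowKeys.rowKey(person.id))` followed by `addColumn` calls on `RowKeys.utf8(...)` values, with no try/catch block.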
Append
- Scala
    val appendHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
      // Append to a cell
      val append = new Append(s"id_${person.id}")
      append.add("info", "aliases", person.name)
      List(append)
    }
- Java
    Function<Person, List<Mutation>> appendHBaseConverter =
        person -> {
          try {
            Append append = new Append(String.format("id_%d", person.id).getBytes("UTF-8"));
            append.add(
                "info".getBytes("UTF-8"), "aliases".getBytes("UTF-8"), person.name.getBytes("UTF-8"));
            return Collections.singletonList(append);
          } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return Collections.emptyList();
          }
        };
Delete
- Scala
    val deleteHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
      // Delete the specified row
      val delete = new Delete(s"id_${person.id}")
      List(delete)
    }
- Java
    Function<Person, List<Mutation>> deleteHBaseConverter =
        person -> {
          try {
            Delete delete = new Delete(String.format("id_%d", person.id).getBytes("UTF-8"));
            return Collections.singletonList(delete);
          } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return Collections.emptyList();
          }
        };
Increment
- Scala
    val incrementHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
      // Increment a cell value
      val increment = new Increment(s"id_${person.id}")
      increment.addColumn("info", "numberOfChanges", 1)
      List(increment)
    }
- Java
    Function<Person, List<Mutation>> incrementHBaseConverter =
        person -> {
          try {
            Increment increment = new Increment(String.format("id_%d", person.id).getBytes("UTF-8"));
            increment.addColumn("info".getBytes("UTF-8"), "numberOfChanges".getBytes("UTF-8"), 1);
            return Collections.singletonList(increment);
          } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return Collections.emptyList();
          }
        };
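A cell targeted by an `Increment` holds its counter as an 8-byte big-endian long, the same layout that HBase's `Bytes.toBytes(long)` produces. The sketch below shows that layout using only the JDK, which may help when reading `numberOfChanges` back from raw cell bytes. The `Counters` class is hypothetical; in code that already depends on HBase, `Bytes.toLong` would be the natural choice.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration; not part of the connector API.
final class Counters {
    private Counters() {}

    // Encodes a counter as 8 bytes; ByteBuffer defaults to big-endian order.
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decodes an 8-byte big-endian counter value.
    static long decode(byte[] cell) {
        if (cell.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + cell.length);
        }
        return ByteBuffer.wrap(cell).getLong();
    }
}
```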
Complex and noop mutations
To ignore an object, return an empty list; this has no effect on HBase. You can also combine mutations to implement more complex business logic:
- Scala
    val mutationsHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
      if (person.id != 0) {
        if (person.name.isEmpty) {
          // Delete the specified row
          val delete = new Delete(s"id_${person.id}")
          List(delete)
        } else {
          // Insert or update a row
          val put = new Put(s"id_${person.id}")
          put.addColumn("info", "name", person.name)

          val increment = new Increment(s"id_${person.id}")
          increment.addColumn("info", "numberOfChanges", 1)

          List(put, increment)
        }
      } else {
        List.empty
      }
    }
- Java
    Function<Person, List<Mutation>> complexHBaseConverter =
        person -> {
          try {
            byte[] id = String.format("id_%d", person.id).getBytes("UTF-8");
            byte[] infoFamily = "info".getBytes("UTF-8");

            if (person.id != 0 && person.name.isEmpty()) {
              Delete delete = new Delete(id);
              return Collections.singletonList(delete);
            } else if (person.id != 0) {
              Put put = new Put(id);
              put.addColumn(infoFamily, "name".getBytes("UTF-8"), person.name.getBytes("UTF-8"));

              Increment increment = new Increment(id);
              increment.addColumn(infoFamily, "numberOfChanges".getBytes("UTF-8"), 1);

              return Arrays.asList(put, increment);
            } else {
              return Collections.emptyList();
            }
          } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return Collections.emptyList();
          }
        };
If you return a list of mutations, they are applied in list order. The mutations are not applied in a transaction; each mutation is independent.
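The branching in the complex converter can be checked without an HBase cluster by separating the decision from the construction of the mutations. The sketch below assumes a hypothetical `Kind` enum standing in for the HBase `Mutation` subclasses. It only models which mutations would be emitted, and in which order, for a given `id` and `name`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the converter's decisions; not part of the connector API.
final class MutationPlan {
    private MutationPlan() {}

    // Stand-ins for the HBase Put, Delete and Increment mutation classes.
    enum Kind { PUT, DELETE, INCREMENT }

    // Mirrors the branching of the complex converter shown in this section.
    static List<Kind> plan(int id, String name) {
        if (id == 0) {
            return Collections.emptyList(); // no-op: the element is ignored
        }
        if (name.isEmpty()) {
            return Collections.singletonList(Kind.DELETE); // remove the row
        }
        // Upsert the name first, then bump the change counter.
        return Arrays.asList(Kind.PUT, Kind.INCREMENT);
    }
}
```

Because the mutations in a list are independent, a failure between the `PUT` and the `INCREMENT` could leave the name updated while the counter is unchanged. Logic that needs both to succeed together has to handle that case itself.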
Settings
HBase combinators require HTableSettings. If the table referenced in the settings does not exist, it will be created on demand.
- Scala
    val tableSettings =
      HTableSettings(HBaseConfiguration.create(), TableName.valueOf("person"), immutable.Seq("info"), hBaseConverter)
- Java
    HTableSettings<Person> tableSettings =
        HTableSettings.create(
            HBaseConfiguration.create(),
            TableName.valueOf("person1"),
            Collections.singletonList("info"),
            hBaseConverter);
Source
- Scala
    val scan = new Scan(new Get(Bytes.toBytes("id_100")))

    val f = HTableStage
      .source(scan, tableSettings)
      .runWith(Sink.seq)
- Java
    Scan scan = new Scan(new Get("id_300".getBytes("UTF-8")));

    CompletionStage<List<Result>> f =
        HTableStage.source(scan, tableSettings).runWith(Sink.seq(), system);
Flow
- Scala
    val flow = HTableStage.flow[Person](tableSettings)

    val f = Source(11 to 20).map(i => Person(i, s"zozo_$i")).via(flow).runWith(Sink.fold(0)((a, d) => a + d.id))
- Java
    Flow<Person, Person, NotUsed> flow = HTableStage.flow(tableSettings);

    Pair<NotUsed, CompletionStage<List<Person>>> run =
        Source.from(Arrays.asList(200, 201, 202, 203, 204))
            .map((i) -> new Person(i, String.format("name_%d", i)))
            .via(flow)
            .toMat(Sink.seq(), Keep.both())
            .run(system);
Sink
- Scala
    val sink = HTableStage.sink[Person](tableSettings)

    val f = Source(1 to 10).map(i => Person(i, s"zozo_$i")).runWith(sink)
- Java
    final Sink<Person, CompletionStage<Done>> sink = HTableStage.sink(tableSettings);

    CompletionStage<Done> o =
        Source.from(Arrays.asList(100, 101, 102, 103, 104))
            .map((i) -> new Person(i, String.format("name %d", i)))
            .runWith(sink, system);
HBase administration commands
To manage the HBase database, start the HBase shell ($HBASE_HOME/bin/hbase shell) and run the following commands:
list              # list tables
scan "person"     # select * from person
disable "person"  # disable table "person" before dropping it
drop "person"     # drop table "person"