HBase
The connector provides sources, flows, and sinks to interact with an HBase database.
HBase is a column-family NoSQL database backed by HDFS. For more information about HBase, please visit the HBase documentation.
Project Info: Apache Pekko Connectors HBase | |
---|---|
Artifact | org.apache.pekko pekko-connectors-hbase 1.1.0-M1+116-9ab254b5-SNAPSHOT |
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 OpenJDK 21 |
Scala versions | 2.13.15, 2.12.20, 3.3.4 |
JPMS module name | pekko.stream.connectors.hbase |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts
- sbt

      val PekkoVersion = "1.1.2"
      libraryDependencies ++= Seq(
        "org.apache.pekko" %% "pekko-connectors-hbase" % "1.1.0-M1+116-9ab254b5-SNAPSHOT",
        "org.apache.pekko" %% "pekko-stream" % PekkoVersion
      )

- Maven

      <properties>
        <pekko.version>1.1.2</pekko.version>
        <scala.binary.version>2.13</scala.binary.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>org.apache.pekko</groupId>
          <artifactId>pekko-connectors-hbase_${scala.binary.version}</artifactId>
          <version>1.1.0-M1+116-9ab254b5-SNAPSHOT</version>
        </dependency>
        <dependency>
          <groupId>org.apache.pekko</groupId>
          <artifactId>pekko-stream_${scala.binary.version}</artifactId>
          <version>${pekko.version}</version>
        </dependency>
      </dependencies>

- Gradle

      def versions = [
        PekkoVersion: "1.1.2",
        ScalaBinary: "2.13"
      ]
      dependencies {
        implementation "org.apache.pekko:pekko-connectors-hbase_${versions.ScalaBinary}:1.1.0-M1+116-9ab254b5-SNAPSHOT"
        implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
      }
Converters
Converters map the domain object to a list of HBase mutations (Append, Delete, Increment, Put).
Put
- Scala

      implicit def toBytes(string: String): Array[Byte] = Bytes.toBytes(string)

      case class Person(id: Int, name: String)

      val hBaseConverter: Person => immutable.Seq[Mutation] = { person =>
        val put = new Put(s"id_${person.id}")
        put.addColumn("info", "name", person.name)
        List(put)
      }

- Java

      Function<Person, List<Mutation>> hBaseConverter =
          person -> {
            try {
              Put put = new Put(String.format("id_%d", person.id).getBytes("UTF-8"));
              put.addColumn(
                  "info".getBytes("UTF-8"), "name".getBytes("UTF-8"), person.name.getBytes("UTF-8"));
              return Collections.singletonList(put);
            } catch (UnsupportedEncodingException e) {
              e.printStackTrace();
              return Collections.emptyList();
            }
          };
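The Java converters in this section need a try/catch because `getBytes("UTF-8")` takes a charset name and declares `UnsupportedEncodingException`. As a minimal sketch of one alternative, assuming the same `id_<n>` row-key scheme, `getBytes(StandardCharsets.UTF_8)` throws no checked exception. `RowKeys`, `utf8`, and `rowKey` are hypothetical helper names and not part of the connector.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of pekko-connectors-hbase.
final class RowKeys {
    private RowKeys() {}

    // Encodes a string as UTF-8 bytes without a checked exception.
    static byte[] utf8(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Builds the "id_<n>" row key used by the converters in this section.
    static byte[] rowKey(int id) {
        return utf8("id_" + id);
    }
}
```

With helpers like these, a converter body could start with `new Put(RowKeys.rowKey(person.id))` and pass `RowKeys.utf8("info")` as the column family, with no catch block that silently returns an empty list.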
Append
- Scala

      val appendHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
        // Append to a cell
        val append = new Append(s"id_${person.id}")
        append.add("info", "aliases", person.name)
        List(append)
      }

- Java

      Function<Person, List<Mutation>> appendHBaseConverter =
          person -> {
            try {
              Append append = new Append(String.format("id_%d", person.id).getBytes("UTF-8"));
              append.add(
                  "info".getBytes("UTF-8"), "aliases".getBytes("UTF-8"), person.name.getBytes("UTF-8"));
              return Collections.singletonList(append);
            } catch (UnsupportedEncodingException e) {
              e.printStackTrace();
              return Collections.emptyList();
            }
          };
Delete
- Scala

      val deleteHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
        // Delete the specified row
        val delete = new Delete(s"id_${person.id}")
        List(delete)
      }

- Java

      Function<Person, List<Mutation>> deleteHBaseConverter =
          person -> {
            try {
              Delete delete = new Delete(String.format("id_%d", person.id).getBytes("UTF-8"));
              return Collections.singletonList(delete);
            } catch (UnsupportedEncodingException e) {
              e.printStackTrace();
              return Collections.emptyList();
            }
          };
Increment
- Scala

      val incrementHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
        // Increment a cell value
        val increment = new Increment(s"id_${person.id}")
        increment.addColumn("info", "numberOfChanges", 1)
        List(increment)
      }

- Java

      Function<Person, List<Mutation>> incrementHBaseConverter =
          person -> {
            try {
              Increment increment = new Increment(String.format("id_%d", person.id).getBytes("UTF-8"));
              increment.addColumn("info".getBytes("UTF-8"), "numberOfChanges".getBytes("UTF-8"), 1);
              return Collections.singletonList(increment);
            } catch (UnsupportedEncodingException e) {
              e.printStackTrace();
              return Collections.emptyList();
            }
          };
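HBase keeps counter cells written by `Increment` as 8-byte big-endian longs, so a value read back from `info:numberOfChanges` is raw bytes rather than text. The following is a minimal sketch of decoding such a cell with the JDK alone. `CounterCodec` is a hypothetical name used for illustration; in code that already depends on HBase, `Bytes.toLong` serves the same purpose.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration; not part of the connector.
final class CounterCodec {
    private CounterCodec() {}

    // Decodes an 8-byte big-endian counter cell into a long.
    static long decode(byte[] cell) {
        if (cell == null || cell.length != Long.BYTES) {
            throw new IllegalArgumentException("counter cell must be exactly 8 bytes");
        }
        // ByteBuffer reads in big-endian order by default.
        return ByteBuffer.wrap(cell).getLong();
    }

    // Encodes a long the same way, which is handy for tests.
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }
}
```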
Complex and noop mutations
To ignore an object, return an empty List; this has no effect on HBase. You can also combine mutations to perform complex business logic:
- Scala

      val mutationsHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
        if (person.id != 0) {
          if (person.name.isEmpty) {
            // Delete the specified row
            val delete = new Delete(s"id_${person.id}")
            List(delete)
          } else {
            // Insert or update a row
            val put = new Put(s"id_${person.id}")
            put.addColumn("info", "name", person.name)

            val increment = new Increment(s"id_${person.id}")
            increment.addColumn("info", "numberOfChanges", 1)

            List(put, increment)
          }
        } else {
          List.empty
        }
      }

- Java

      Function<Person, List<Mutation>> complexHBaseConverter =
          person -> {
            try {
              byte[] id = String.format("id_%d", person.id).getBytes("UTF-8");
              byte[] infoFamily = "info".getBytes("UTF-8");

              if (person.id != 0 && person.name.isEmpty()) {
                Delete delete = new Delete(id);
                return Collections.singletonList(delete);
              } else if (person.id != 0) {
                Put put = new Put(id);
                put.addColumn(infoFamily, "name".getBytes("UTF-8"), person.name.getBytes("UTF-8"));

                Increment increment = new Increment(id);
                increment.addColumn(infoFamily, "numberOfChanges".getBytes("UTF-8"), 1);

                return Arrays.asList(put, increment);
              } else {
                return Collections.emptyList();
              }
            } catch (UnsupportedEncodingException e) {
              e.printStackTrace();
              return Collections.emptyList();
            }
          };
If you return a list of mutations, they are applied in list order. The mutations are not applied in a transaction; each mutation is independent.
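To make "in order, but not in a transaction" concrete, here is a toy model that makes no HBase calls. The steps are plain functions over an in-memory log rather than real `Mutation` objects, and `OrderedMutationsSketch` is a hypothetical name. The model assumes processing stops at the first failure. How the connector itself reacts to a failing mutation is not described here, so treat that part as an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: steps are functions over an in-memory log,
// not HBase Mutation objects.
final class OrderedMutationsSketch {
    private OrderedMutationsSketch() {}

    // Runs each step in list order. A failure stops the loop, but steps that
    // already ran are not undone (no transaction, no rollback).
    static List<String> applyInOrder(List<Consumer<List<String>>> steps) {
        List<String> applied = new ArrayList<>();
        try {
            for (Consumer<List<String>> step : steps) {
                step.accept(applied);
            }
        } catch (RuntimeException e) {
            // Assumption of this model: stop here and keep what was applied.
        }
        return applied;
    }

    // A put succeeds, an increment fails, and a delete is never reached.
    static List<String> demo() {
        return applyInOrder(Arrays.asList(
            log -> log.add("put"),
            log -> { throw new IllegalStateException("increment failed"); },
            log -> log.add("delete")));
    }
}
```

In this model the first step stays applied even though the second one failed. That partial outcome is what a converter returning several mutations should be designed to tolerate.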
Settings
HBase combinators require HTableSettings. If the table referenced in the settings does not exist, it will be created on demand.
- Scala

      val tableSettings =
        HTableSettings(HBaseConfiguration.create(), TableName.valueOf("person"), immutable.Seq("info"), hBaseConverter)

- Java

      HTableSettings<Person> tableSettings =
          HTableSettings.create(
              HBaseConfiguration.create(),
              TableName.valueOf("person1"),
              Collections.singletonList("info"),
              hBaseConverter);
Source
- Scala

      val scan = new Scan(new Get(Bytes.toBytes("id_100")))

      val f = HTableStage
        .source(scan, tableSettings)
        .runWith(Sink.seq)

- Java

      Scan scan = new Scan(new Get("id_300".getBytes("UTF-8")));

      CompletionStage<List<Result>> f =
          HTableStage.source(scan, tableSettings).runWith(Sink.seq(), system);
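The source emits HBase `Result` objects, and `Result.getRow()` returns the row key as raw bytes. A minimal sketch, assuming the `id_<n>` key scheme used by the converters above, of turning such a key back into the numeric id. `RowKeyParser` is a hypothetical helper that uses only the JDK.

```java
import java.nio.charset.StandardCharsets;
import java.util.OptionalInt;

// Hypothetical helper for illustration; not part of the connector.
final class RowKeyParser {
    private static final String PREFIX = "id_";

    private RowKeyParser() {}

    // Returns the numeric id for keys shaped like "id_<n>", or empty otherwise.
    static OptionalInt parseId(byte[] rowKey) {
        String key = new String(rowKey, StandardCharsets.UTF_8);
        if (!key.startsWith(PREFIX)) {
            return OptionalInt.empty();
        }
        try {
            return OptionalInt.of(Integer.parseInt(key.substring(PREFIX.length())));
        } catch (NumberFormatException e) {
            // The suffix was missing or not a valid int.
            return OptionalInt.empty();
        }
    }
}
```

On a scanned row this would be called as `RowKeyParser.parseId(result.getRow())`. Returning `OptionalInt` lets rows with unexpected keys be skipped rather than failing the stream.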
Flow
- Scala

      val flow = HTableStage.flow[Person](tableSettings)

      val f = Source(11 to 20)
        .map(i => Person(i, s"zozo_$i"))
        .via(flow)
        .runWith(Sink.fold(0)((a, d) => a + d.id))

- Java

      Flow<Person, Person, NotUsed> flow = HTableStage.flow(tableSettings);

      Pair<NotUsed, CompletionStage<List<Person>>> run =
          Source.from(Arrays.asList(200, 201, 202, 203, 204))
              .map((i) -> new Person(i, String.format("name_%d", i)))
              .via(flow)
              .toMat(Sink.seq(), Keep.both())
              .run(system);
Sink
- Scala

      val sink = HTableStage.sink[Person](tableSettings)

      val f = Source(1 to 10)
        .map(i => Person(i, s"zozo_$i"))
        .runWith(sink)

- Java

      final Sink<Person, CompletionStage<Done>> sink = HTableStage.sink(tableSettings);

      CompletionStage<Done> o =
          Source.from(Arrays.asList(100, 101, 102, 103, 104))
              .map((i) -> new Person(i, String.format("name %d", i)))
              .runWith(sink, system);
HBase administration commands
To manage the HBase database, start the HBase shell ($HBASE_HOME/bin/hbase shell) and run the following commands:

    list              # list tables
    scan "person"     # select * from person
    disable "person"  # disable table "person" before dropping it
    drop "person"     # drop table "person"