HBase

The HBase connector provides sources, flows, and sinks for interacting with an HBase database.

HBase is a column-family NoSQL database backed by HDFS. For more information about HBase, see the HBase documentation.

Project Info: Apache Pekko Connectors HBase
Artifact: group org.apache.pekko, artifact pekko-connectors-hbase, version 1.1.0
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21
Scala versions: 2.13.15, 2.12.20, 3.3.4
JPMS module name: pekko.stream.connectors.hbase
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-hbase" % "1.1.0",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)

Maven
<properties>
  <pekko.version>1.1.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-hbase_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>

Gradle
def versions = [
  PekkoVersion: "1.1.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-hbase_${versions.ScalaBinary}:1.1.0"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Converters

A converter maps a domain object to a list of HBase mutations (Append, Delete, Increment, Put).

Put

Scala
implicit def toBytes(string: String): Array[Byte] = Bytes.toBytes(string)
case class Person(id: Int, name: String)

val hBaseConverter: Person => immutable.Seq[Mutation] = { person =>
  val put = new Put(s"id_${person.id}")
  put.addColumn("info", "name", person.name)
  List(put)
}

Java
Function<Person, List<Mutation>> hBaseConverter =
    person -> {
      try {
        Put put = new Put(String.format("id_%d", person.id).getBytes("UTF-8"));
        put.addColumn(
            "info".getBytes("UTF-8"), "name".getBytes("UTF-8"), person.name.getBytes("UTF-8"));

        return Collections.singletonList(put);
      } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
        return Collections.emptyList();
      }
    };
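
The Java converter above calls getBytes("UTF-8"), which declares the checked UnsupportedEncodingException and is the reason for the try/catch around the mutation. As a minimal sketch, assuming a hypothetical helper class named RowKeys that is not part of the connector or of HBase, the same bytes can be produced with StandardCharsets.UTF_8 without any checked exception:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of Pekko Connectors or HBase.
final class RowKeys {
  private RowKeys() {}

  // Encodes text as UTF-8; this overload of getBytes throws no checked exception.
  static byte[] utf8(String text) {
    return text.getBytes(StandardCharsets.UTF_8);
  }

  // Builds the "id_<n>" row key used by the converters on this page.
  static byte[] rowKey(int id) {
    return utf8("id_" + id);
  }

  // Decodes UTF-8 bytes back to text, for example a row key or cell value.
  static String text(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }
}
```

With such a helper, new Put(RowKeys.rowKey(person.id)) could stand in for the String.format(...).getBytes("UTF-8") call, and the surrounding try/catch would no longer be needed.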

Append

Scala
val appendHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
  // Append to a cell
  val append = new Append(s"id_${person.id}")
  append.add("info", "aliases", person.name)
  List(append)
}

Java
Function<Person, List<Mutation>> appendHBaseConverter =
    person -> {
      try {
        Append append = new Append(String.format("id_%d", person.id).getBytes("UTF-8"));
        append.add(
            "info".getBytes("UTF-8"), "aliases".getBytes("UTF-8"), person.name.getBytes("UTF-8"));

        return Collections.singletonList(append);
      } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
        return Collections.emptyList();
      }
    };

Delete

Scala
val deleteHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
  // Delete the specified row
  val delete = new Delete(s"id_${person.id}")
  List(delete)
}

Java
Function<Person, List<Mutation>> deleteHBaseConverter =
    person -> {
      try {
        Delete delete = new Delete(String.format("id_%d", person.id).getBytes("UTF-8"));

        return Collections.singletonList(delete);
      } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
        return Collections.emptyList();
      }
    };

Increment

Scala
val incrementHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
  // Increment a cell value
  val increment = new Increment(s"id_${person.id}")
  increment.addColumn("info", "numberOfChanges", 1)
  List(increment)
}

Java
Function<Person, List<Mutation>> incrementHBaseConverter =
    person -> {
      try {
        Increment increment = new Increment(String.format("id_%d", person.id).getBytes("UTF-8"));
        increment.addColumn("info".getBytes("UTF-8"), "numberOfChanges".getBytes("UTF-8"), 1);

        return Collections.singletonList(increment);
      } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
        return Collections.emptyList();
      }
    };

Complex and noop mutations

To ignore an object, return an empty list; this has no effect on HBase. You can also combine several mutations to implement more complex business logic:

Scala
val mutationsHBaseConverter: Person => immutable.Seq[Mutation] = { person =>
  if (person.id != 0) {
    if (person.name.isEmpty) {
      // Delete the specified row
      val delete = new Delete(s"id_${person.id}")
      List(delete)
    } else {
      // Insert or update a row
      val put = new Put(s"id_${person.id}")
      put.addColumn("info", "name", person.name)

      val increment = new Increment(s"id_${person.id}")
      increment.addColumn("info", "numberOfChanges", 1)

      List(put, increment)
    }
  } else {
    List.empty
  }
}

Java
Function<Person, List<Mutation>> complexHBaseConverter =
    person -> {
      try {
        byte[] id = String.format("id_%d", person.id).getBytes("UTF-8");
        byte[] infoFamily = "info".getBytes("UTF-8");

        if (person.id != 0 && person.name.isEmpty()) {
          Delete delete = new Delete(id);
          return Collections.singletonList(delete);
        } else if (person.id != 0) {
          Put put = new Put(id);
          put.addColumn(infoFamily, "name".getBytes("UTF-8"), person.name.getBytes("UTF-8"));

          Increment increment = new Increment(id);
          increment.addColumn(infoFamily, "numberOfChanges".getBytes("UTF-8"), 1);

          return Arrays.asList(put, increment);
        } else {
          return Collections.emptyList();
        }
      } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
        return Collections.emptyList();
      }
    };

If a converter returns a list of mutations, they are applied in list order. The mutations are not applied in a transaction; each mutation is independent.
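
To make the lack of a transaction concrete, here is a plain-Java simulation that calls neither HBase nor the connector. The class IndependentStepsDemo, the applyInOrder method, and the "info:name" style keys are all hypothetical. The sketch also assumes that processing stops at the first failing step, which this page does not state about the connector. It only shows that steps which already succeeded are not rolled back:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java simulation for illustration; it does not use HBase or the connector.
final class IndependentStepsDemo {
  private IndependentStepsDemo() {}

  // Applies steps in list order and returns how many succeeded.
  // Assumption of this sketch: processing stops at the first failure.
  static int applyInOrder(List<Consumer<Map<String, String>>> steps, Map<String, String> row) {
    int applied = 0;
    for (Consumer<Map<String, String>> step : steps) {
      try {
        step.accept(row);
        applied++;
      } catch (RuntimeException e) {
        break; // earlier steps stay applied: there is no rollback
      }
    }
    return applied;
  }

  // Runs a put-like step, a failing step, and a step that is never reached.
  static Map<String, String> demo() {
    Map<String, String> row = new LinkedHashMap<>();
    List<Consumer<Map<String, String>>> steps = new ArrayList<>();
    steps.add(r -> r.put("info:name", "alice"));
    steps.add(r -> { throw new IllegalStateException("simulated failure"); });
    steps.add(r -> r.put("info:numberOfChanges", "1"));
    applyInOrder(steps, row);
    return row;
  }
}
```

After demo() returns, the simulated row holds only the "info:name" entry: the first step took effect even though the list as a whole did not complete. A converter that needs all-or-nothing behavior therefore cannot rely on returning several mutations in one list.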

Settings

HBase combinators require HTableSettings. If the table referenced in the settings does not exist, it will be created on demand.

Scala
val tableSettings =
  HTableSettings(HBaseConfiguration.create(), TableName.valueOf("person"), immutable.Seq("info"), hBaseConverter)

Java
HTableSettings<Person> tableSettings =
    HTableSettings.create(
        HBaseConfiguration.create(),
        TableName.valueOf("person1"),
        Collections.singletonList("info"),
        hBaseConverter);

Source

Scala
val scan = new Scan(new Get(Bytes.toBytes("id_100")))

val f = HTableStage
  .source(scan, tableSettings)
  .runWith(Sink.seq)

Java
Scan scan = new Scan(new Get("id_300".getBytes("UTF-8")));

CompletionStage<List<Result>> f =
    HTableStage.source(scan, tableSettings).runWith(Sink.seq(), system);

Flow

Scala
val flow = HTableStage.flow[Person](tableSettings)

val f = Source(11 to 20).map(i => Person(i, s"zozo_$i")).via(flow).runWith(Sink.fold(0)((a, d) => a + d.id))

Java
Flow<Person, Person, NotUsed> flow = HTableStage.flow(tableSettings);
Pair<NotUsed, CompletionStage<List<Person>>> run =
    Source.from(Arrays.asList(200, 201, 202, 203, 204))
        .map((i) -> new Person(i, String.format("name_%d", i)))
        .via(flow)
        .toMat(Sink.seq(), Keep.both())
        .run(system);

Sink

Scala
val sink = HTableStage.sink[Person](tableSettings)

val f = Source(1 to 10).map(i => Person(i, s"zozo_$i")).runWith(sink)

Java
final Sink<Person, CompletionStage<Done>> sink = HTableStage.sink(tableSettings);
CompletionStage<Done> o =
    Source.from(Arrays.asList(100, 101, 102, 103, 104))
        .map((i) -> new Person(i, String.format("name %d", i)))
        .runWith(sink, system);

HBase administration commands

To manage the HBase database, start the HBase shell ($HBASE_HOME/bin/hbase shell) and run the following commands:

list                # list tables
scan "person"       # roughly: select * from person
disable "person"    # a table must be disabled before it can be dropped
drop "person"       # drop the table