Apache Geode

Apache Geode is a distributed datagrid (formerly called “Gemfire” which used to be Pivotal’s packaging of Geode and now is VMware Tanzu).

Apache Pekko Connectors Geode provides flows and sinks to put elements into Geode, and a source to retrieve elements from it. It stores key-value-pairs. Keys and values must be serialized with Geode’s support for it.

Project Info: Apache Pekko Connectors Geode
Artifact
org.apache.pekko
pekko-connectors-geode
1.1.0
JDK versions
OpenJDK 8
OpenJDK 11
OpenJDK 17
OpenJDK 21
Scala versions2.13.15, 2.12.20, 3.3.4
JPMS module namepekko.stream.connectors.geode
License
API documentation
Forums
Release notesGitHub releases
IssuesGithub issues
Sourceshttps://github.com/apache/pekko-connectors

Artifacts

sbt
Maven
Gradle
val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-geode" % "1.1.0",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
<properties>
  <pekko.version>1.1.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-geode_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
def versions = [
  PekkoVersion: "1.1.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-geode_${versions.ScalaBinary}:1.1.0"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Setup

Connection

The connection to Geode is handled by a ClientCache. A single ClientCache per application is enough. ClientCache also holds a single PDXSerializer.

The Geode client should be closed after use, it is recommended to close it on actor system termination.

Scala
Java
sourceval geodeSettings = GeodeSettings(hostname, port = 10334)
  .withConfiguration(c => c.setPoolIdleTimeout(10))
val geode = new Geode(geodeSettings)
system.registerOnTermination(geode.close())
sourceGeodeSettings settings =
    GeodeSettings.create(hostname, 10334).withConfiguration(c -> c.setPoolIdleTimeout(10));
Geode geode = new Geode(settings);
system.registerOnTermination(() -> geode.close());

Apache Geode supports continuous queries. Continuous query rely on server events, thus Apache Pekko Connectors Geode needs to listen to those events. This behaviour – as it consumes more resources – is isolated in a Scala trait and/or an specialized Java class.

Scala
Java
sourceval geode = new Geode(geodeSettings) with PoolSubscription
system.registerOnTermination(geode.close())
sourceGeodeWithPoolSubscription geode = new GeodeWithPoolSubscription(settings);

Region

Define a region setting to describe how to access region and the key extraction function.

Scala
Java
sourceval personsRegionSettings: RegionSettings[Int, Person] = RegionSettings("persons", (p: Person) => p.id)
val animalsRegionSettings: RegionSettings[Int, Animal] = RegionSettings("animals", (a: Animal) => a.id)
val complexesRegionSettings: RegionSettings[UUID, Complex] = RegionSettings("complexes", (a: Complex) => a.id)
sourceprotected final RegionSettings<Integer, Person> personRegionSettings =
    RegionSettings.create("persons", Person::getId);
protected final RegionSettings<Integer, Animal> animalRegionSettings =
    RegionSettings.create("animals", Animal::getId);

Serialization

Objects must be serialized to be stored in or retrieved from Geode. Only PDX format is available with Apache Pekko Connectors Geode. PDXEncoders support many options as described in Geode PDX Serialization. A PdxSerializer must be provided to Geode when reading from or writing to a region.

Scala
Java
sourceobject PersonPdxSerializer extends PekkoPdxSerializer[Person] {
  override def clazz: Class[Person] = classOf[Person]

  override def toData(o: scala.Any, out: PdxWriter): Boolean =
    if (o.isInstanceOf[Person]) {
      val p = o.asInstanceOf[Person]
      out.writeInt("id", p.id)
      out.writeString("name", p.name)
      out.writeDate("birthDate", p.birthDate)
      true
    } else
      false

  override def fromData(clazz: Class[_], in: PdxReader): AnyRef = {
    val id: Int = in.readInt("id")
    val name: String = in.readString("name")
    val birthDate: Date = in.readDate("birthDate")
    Person(id, name, birthDate)
  }
}
sourcepublic class PersonPdxSerializer implements PekkoPdxSerializer<Person> {

  @Override
  public Class<Person> clazz() {
    return Person.class;
  }

  @Override
  public boolean toData(Object o, PdxWriter out) {
    if (o instanceof Person) {
      Person p = (Person) o;
      out.writeInt("id", p.getId());
      out.writeString("name", p.getName());
      out.writeDate("birthDate", p.getBirthDate());
      return true;
    }
    return false;
  }

  @Override
  public Object fromData(Class<?> clazz, PdxReader in) {
    int id = in.readInt("id");
    String name = in.readString("name");
    Date birthDate = in.readDate("birthDate");
    return new Person(id, name, birthDate);
  }
}

This Apache Pekko Connectors Geode provides a generic solution for Scala users based on Shapeless for Scala 2 and for Scala 3 using the built-in tuple generic metaprogramming which may generate serializers for case classes at compile time.

Java users need to implement custom serializers manually, or use runtime reflection as described in Using Automatic Reflection-Based PDX Serialization.

Writing to Geode

This example stores data in Geode within a flow.

Scala
Java
sourceval flow: Flow[Person, Person, NotUsed] = geode.flow(personsRegionSettings)

val fut = source.via(flow).runWith(Sink.ignore)
sourceFlow<Person, Person, NotUsed> flow =
    geode.flow(personRegionSettings, new PersonPdxSerializer());

CompletionStage<List<Person>> run =
    source.via(flow).toMat(Sink.seq(), Keep.right()).run(system);

This example stores data in Geode by using a sink.

Scala
Java
sourceval animalsRegionSettings: RegionSettings[Int, Animal] =
  RegionSettings("animals", (a: Animal) => a.id)

val sink: Sink[Animal, Future[Done]] =
  geode.sink(animalsRegionSettings)

val fut: Future[Done] = source.runWith(sink)
sourceSink<Animal, CompletionStage<Done>> sink =
    geode.sink(animalRegionSettings, new AnimalPdxSerializer());

RunnableGraph<CompletionStage<Done>> runnableGraph = source.toMat(sink, Keep.right());

Reading from Geode

Simple query

Apache Geode supports simple queries.

Scala
Java
sourceval geode = new Geode(geodeSettings)
system.registerOnTermination(geode.close())

val source =
  geode
    .query[Person](s"select * from /persons order by id")
    .runWith(Sink.foreach(e => log.debug(s"$e")))
sourceCompletionStage<Done> personsDone =
    geode
        .query("select * from /persons", new PersonPdxSerializer())
        .runForeach(
            p -> {
              LOGGER.debug(p.toString());
            },
            system);

Continuous query

Continuous queries need to be explicitly closed, to connect creating and closing a unique identifier needs to be passed to both continuousQuery and closeContinuousQuery.

Scala
Java
sourceval source =
  geode
    .continuousQuery[Person](Symbol("test"), s"select * from /persons")
    .runWith(Sink.fold(0) { (c, p) =>
      log.debug(s"$p $c")
      if (c == 19) {
        geode.closeContinuousQuery(Symbol("test")).foreach { _ =>
          log.debug("test cQuery is closed")
        }

      }
      c + 1
    })
sourceCompletionStage<Done> fut =
    geode
        .continuousQuery("test", "select * from /persons", new PersonPdxSerializer())
        .runForeach(
            p -> {
              LOGGER.debug(p.toString());
              if (p.getId() == 120) {
                geode.closeContinuousQuery("test");
              }
            },
            system);

Geode basic commands

Assuming Apache Geode is installed:

gfsh

From the Geode shell:

start locator --name=locator
configure pdx --read-serialized=true
start server --name=server

create region --name=animals --type=PARTITION_REDUNDANT --redundant-copies=2
create region --name=persons --type=PARTITION_REDUNDANT --redundant-copies=2