Apache Geode
Apache Geode is a distributed data grid. It was formerly called “GemFire”: GemFire used to be Pivotal’s packaging of Geode and is now part of VMware Tanzu.
Apache Pekko Connectors Geode provides flows and sinks to put elements into Geode, and a source to retrieve elements from it. Geode stores key-value pairs. Both keys and values must be serialized using Geode’s serialization support.
Project Info | Apache Pekko Connectors Geode
---|---
Artifact | org.apache.pekko : pekko-connectors-geode : 1.0.2
JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17
Scala versions | 2.13.14, 2.12.20, 3.3.3
JPMS module name | pekko.stream.connectors.geode
License |
API documentation |
Forums |
Release notes | GitHub releases
Issues | GitHub issues
Sources | https://github.com/apache/pekko-connectors
Artifacts
- sbt
val PekkoVersion = "1.0.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-geode" % "1.0.2",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
- Maven
<properties>
  <pekko.version>1.0.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-geode_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  PekkoVersion: "1.0.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-geode_${versions.ScalaBinary}:1.0.2"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}
Setup
Connection
The connection to Geode is handled by a ClientCache. A single ClientCache per application is enough. The ClientCache also holds a single PdxSerializer.
The Geode client should be closed after use. We recommend closing it when the actor system terminates.
- Scala
val geodeSettings = GeodeSettings(hostname, port = 10334)
  .withConfiguration(c => c.setPoolIdleTimeout(10))
val geode = new Geode(geodeSettings)
system.registerOnTermination(geode.close())
- Java
GeodeSettings settings =
    GeodeSettings.create(hostname, 10334).withConfiguration(c -> c.setPoolIdleTimeout(10));
Geode geode = new Geode(settings);
system.registerOnTermination(() -> geode.close());
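The lifecycle described above can be illustrated without Geode or Pekko on the classpath: one shared client per application, closed exactly once when the actor system terminates. In the sketch below, FakeClientCache and TerminationHooks are hypothetical stand-ins for Geode's ClientCache and the actor system's registerOnTermination. Neither is part of any real API, and the sketch does not model Pekko's hook ordering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a Geode ClientCache; it only records how often it was closed.
class FakeClientCache implements AutoCloseable {
  final AtomicInteger closeCount = new AtomicInteger();

  boolean isClosed() {
    return closeCount.get() > 0;
  }

  @Override
  public void close() {
    closeCount.incrementAndGet();
  }
}

// Hypothetical stand-in for registerOnTermination: each registered hook runs once on terminate().
class TerminationHooks {
  private final List<Runnable> hooks = new ArrayList<>();
  private boolean terminated = false;

  synchronized void registerOnTermination(Runnable hook) {
    hooks.add(hook);
  }

  synchronized void terminate() {
    if (terminated) {
      return; // a second terminate() must not close the client again
    }
    terminated = true;
    for (Runnable hook : hooks) {
      hook.run();
    }
  }
}
```

The client is created once, registered once, and closed by the termination hook rather than by each stream that uses it.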
Apache Geode supports continuous queries. Continuous queries rely on server events, so Apache Pekko Connectors Geode needs to listen to those events. This behaviour consumes more resources, so it is isolated in a Scala trait (PoolSubscription) and a specialized Java class (GeodeWithPoolSubscription).
- Scala
val geode = new Geode(geodeSettings) with PoolSubscription
system.registerOnTermination(geode.close())
- Java
GeodeWithPoolSubscription geode = new GeodeWithPoolSubscription(settings);
system.registerOnTermination(() -> geode.close());
Region
Define region settings that name the region to access and supply a function that extracts the key from each value.
- Scala
val personsRegionSettings: RegionSettings[Int, Person] =
  RegionSettings("persons", (p: Person) => p.id)
val animalsRegionSettings: RegionSettings[Int, Animal] =
  RegionSettings("animals", (a: Animal) => a.id)
val complexesRegionSettings: RegionSettings[UUID, Complex] =
  RegionSettings("complexes", (a: Complex) => a.id)
- Java
protected final RegionSettings<Integer, Person> personRegionSettings =
    RegionSettings.create("persons", Person::getId);
protected final RegionSettings<Integer, Animal> animalRegionSettings =
    RegionSettings.create("animals", Animal::getId);
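The key comes from the key extraction function, so callers only ever hand values to the connector. The following sketch models that idea with an in-memory map. RegionSpec, InMemoryRegion and this simplified Person are hypothetical illustrations, not the connector's RegionSettings implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical, simplified value type modelled on the Person used in the examples.
final class Person {
  private final int id;
  private final String name;

  Person(int id, String name) {
    this.id = id;
    this.name = name;
  }

  int getId() { return id; }
  String getName() { return name; }
}

// Hypothetical analogue of RegionSettings: a region name plus a key extraction function.
final class RegionSpec<K, V> {
  final String name;
  final Function<V, K> keyExtractor;

  RegionSpec(String name, Function<V, K> keyExtractor) {
    this.name = name;
    this.keyExtractor = keyExtractor;
  }
}

// In-memory map standing in for a Geode region.
final class InMemoryRegion<K, V> {
  private final RegionSpec<K, V> spec;
  private final Map<K, V> entries = new ConcurrentHashMap<>();

  InMemoryRegion(RegionSpec<K, V> spec) {
    this.spec = spec;
  }

  // The caller never passes a key: it is derived from the value.
  void put(V value) {
    entries.put(spec.keyExtractor.apply(value), value);
  }

  V get(K key) { return entries.get(key); }

  int size() { return entries.size(); }
}
```

Storing a second value with the same extracted key replaces the first one, as a put to an existing key does in a map.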
Serialization
Objects must be serialized to be stored in or retrieved from Geode. Only the PDX format is available with Apache Pekko Connectors Geode. PDXEncoders support many options, as described in Geode PDX Serialization. A PdxSerializer must be provided to Geode when reading from or writing to a region.
- Scala
object PersonPdxSerializer extends PekkoPdxSerializer[Person] {
  override def clazz: Class[Person] = classOf[Person]

  override def toData(o: scala.Any, out: PdxWriter): Boolean =
    if (o.isInstanceOf[Person]) {
      val p = o.asInstanceOf[Person]
      out.writeInt("id", p.id)
      out.writeString("name", p.name)
      out.writeDate("birthDate", p.birthDate)
      true
    } else false

  override def fromData(clazz: Class[_], in: PdxReader): AnyRef = {
    val id: Int = in.readInt("id")
    val name: String = in.readString("name")
    val birthDate: Date = in.readDate("birthDate")
    Person(id, name, birthDate)
  }
}
- Java
public class PersonPdxSerializer implements PekkoPdxSerializer<Person> {

  @Override
  public Class<Person> clazz() {
    return Person.class;
  }

  @Override
  public boolean toData(Object o, PdxWriter out) {
    if (o instanceof Person) {
      Person p = (Person) o;
      out.writeInt("id", p.getId());
      out.writeString("name", p.getName());
      out.writeDate("birthDate", p.getBirthDate());
      return true;
    }
    return false;
  }

  @Override
  public Object fromData(Class<?> clazz, PdxReader in) {
    int id = in.readInt("id");
    String name = in.readString("name");
    Date birthDate = in.readDate("birthDate");
    return new Person(id, name, birthDate);
  }
}
For Scala users, Apache Pekko Connectors Geode provides a generic solution that can generate serializers for case classes at compile time. It is based on Shapeless for Scala 2 and on Scala 3's built-in generic tuple metaprogramming.
Java users need to implement custom serializers manually, or use runtime reflection as described in Using Automatic Reflection-Based PDX Serialization.
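A hand-written serializer must keep toData and fromData in agreement on field names and types, and toData should return false for objects it does not handle. The sketch below checks that round trip. FieldWriter and FieldReader are hypothetical map-backed stand-ins for Geode's PdxWriter and PdxReader, and the sketch does not reproduce the actual PDX wire format.

```java
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, map-backed stand-ins for Geode's PdxWriter and PdxReader.
final class FieldWriter {
  final Map<String, Object> fields = new LinkedHashMap<>();

  FieldWriter writeInt(String name, int value) { fields.put(name, value); return this; }
  FieldWriter writeString(String name, String value) { fields.put(name, value); return this; }
  FieldWriter writeDate(String name, Date value) { fields.put(name, value); return this; }
}

final class FieldReader {
  private final Map<String, Object> fields;

  FieldReader(Map<String, Object> fields) { this.fields = fields; }

  int readInt(String name) { return (Integer) fields.get(name); }
  String readString(String name) { return (String) fields.get(name); }
  Date readDate(String name) { return (Date) fields.get(name); }
}

// Hypothetical value type with the same fields as the documented Person.
final class Person {
  final int id;
  final String name;
  final Date birthDate;

  Person(int id, String name, Date birthDate) {
    this.id = id;
    this.name = name;
    this.birthDate = birthDate;
  }
}

// Same shape as PersonPdxSerializer: identical field names on the write and read side.
final class PersonCodec {
  static boolean toData(Object o, FieldWriter out) {
    if (!(o instanceof Person)) {
      return false; // not a Person: leave it to another serializer
    }
    Person p = (Person) o;
    out.writeInt("id", p.id).writeString("name", p.name).writeDate("birthDate", p.birthDate);
    return true;
  }

  static Person fromData(FieldReader in) {
    return new Person(in.readInt("id"), in.readString("name"), in.readDate("birthDate"));
  }
}
```

Renaming a field on only one side would make fromData read a missing value, which is the kind of mismatch compile-time derivation avoids for Scala users.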
Writing to Geode
This example stores data in Geode within a flow.
- Scala
val flow: Flow[Person, Person, NotUsed] = geode.flow(personsRegionSettings)
val fut = source.via(flow).runWith(Sink.ignore)
- Java
Flow<Person, Person, NotUsed> flow = geode.flow(personRegionSettings, new PersonPdxSerializer());
CompletionStage<List<Person>> run =
    source.via(flow).toMat(Sink.seq(), Keep.right()).run(system);
This example stores data in Geode by using a sink.
- Scala
val animalsRegionSettings: RegionSettings[Int, Animal] =
  RegionSettings("animals", (a: Animal) => a.id)
val sink: Sink[Animal, Future[Done]] = geode.sink(animalsRegionSettings)
val fut: Future[Done] = source.runWith(sink)
- Java
Sink<Animal, CompletionStage<Done>> sink =
    geode.sink(animalRegionSettings, new AnimalPdxSerializer());
RunnableGraph<CompletionStage<Done>> runnableGraph = source.toMat(sink, Keep.right());
CompletionStage<Done> done = runnableGraph.run(system);
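The two write styles differ in what goes downstream. geode.flow emits each element after storing it, while geode.sink only materializes a completion signal. The sketch below imitates those semantics with java.util.stream and CompletableFuture over an in-memory map. WriteSemantics is a hypothetical helper and says nothing about how the connector actually performs its writes.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

final class WriteSemantics {
  // Flow-like: store each element under its extracted key, then pass it on unchanged.
  static <K, V> List<V> writeThrough(List<V> source, Map<K, V> region, Function<V, K> key) {
    return source.stream()
        .map(v -> {
          region.put(key.apply(v), v);
          return v;
        })
        .collect(Collectors.toList());
  }

  // Sink-like: store every element and only signal completion (Void stands in for Done).
  static <K, V> CompletionStage<Void> writeAll(List<V> source, Map<K, V> region, Function<V, K> key) {
    return CompletableFuture.runAsync(() -> source.forEach(v -> region.put(key.apply(v), v)));
  }
}
```

Use the flow shape when later stages still need the stored elements, for example to log or acknowledge them. Use the sink shape when the write is the end of the stream.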
Reading from Geode
Simple query
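Apache Geode supports simple queries written in OQL (Object Query Language). The query runs once and each result is emitted as a stream element.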
- Scala
val geode = new Geode(geodeSettings)
system.registerOnTermination(geode.close())

val source = geode
  .query[Person](s"select * from /persons order by id")
  .runWith(Sink.foreach(e => log.debug(s"$e")))
- Java
CompletionStage<Done> personsDone =
    geode
        .query("select * from /persons", new PersonPdxSerializer())
        .runForeach(
            p -> {
              LOGGER.debug(p.toString());
            },
            system);
Continuous query
Continuous queries must be closed explicitly. To link a query to the call that closes it, pass the same unique identifier to both continuousQuery and closeContinuousQuery.
- Scala
val source = geode
  .continuousQuery[Person](Symbol("test"), s"select * from /persons")
  .runWith(Sink.fold(0) { (c, p) =>
    log.debug(s"$p $c")
    if (c == 19) {
      geode.closeContinuousQuery(Symbol("test")).foreach { _ =>
        log.debug("test cQuery is closed")
      }
    }
    c + 1
  })
- Java
CompletionStage<Done> fut =
    geode
        .continuousQuery("test", "select * from /persons", new PersonPdxSerializer())
        .runForeach(
            p -> {
              LOGGER.debug(p.toString());
              if (p.getId() == 120) {
                geode.closeContinuousQuery("test");
              }
            },
            system);
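Pairing continuousQuery with closeContinuousQuery works like a registry keyed by the identifier you choose. The sketch below models that bookkeeping in memory. ContinuousQueryRegistry and its open/close methods are hypothetical. They only mirror the roles of the connector's two methods, not their implementation or any server-side behaviour.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical registry: open queries are tracked by a caller-chosen identifier.
final class ContinuousQueryRegistry<E> {
  private final Map<String, Consumer<E>> open = new ConcurrentHashMap<>();

  // Plays the role of continuousQuery: the identifier must be unique while the query is open.
  void open(String id, Consumer<E> listener) {
    if (open.putIfAbsent(id, listener) != null) {
      throw new IllegalStateException("continuous query already open: " + id);
    }
  }

  // Simulates a server event being delivered to every query that is still open.
  void publish(E event) {
    open.values().forEach(listener -> listener.accept(event));
  }

  // Plays the role of closeContinuousQuery: returns false if nothing was open under this id.
  boolean close(String id) {
    return open.remove(id) != null;
  }

  int openCount() {
    return open.size();
  }
}
```

Here an identifier cannot be reused while its query is open, and a mistyped identifier on close simply finds nothing. This is why the same identifier has to be passed to both calls.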
Geode basic commands
Assuming Apache Geode is installed:
gfsh
From the Geode shell:
start locator --name=locator
configure pdx --read-serialized=true
start server --name=server
create region --name=animals --type=PARTITION_REDUNDANT --redundant-copies=2
create region --name=persons --type=PARTITION_REDUNDANT --redundant-copies=2