Apache Cassandra
Apache Cassandra is a free and open-source, distributed, wide column store, NoSQL database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure. Cassandra offers robust support for clusters spanning multiple datacenters, with asynchronous masterless replication allowing low latency operations for all clients.
Apache Pekko Connectors Cassandra offers an Apache Pekko Streams API on top of a CqlSession from the Datastax Java Driver version 4.0+. The driver configuration is provided in the same config format as Apache Pekko uses and can be placed in the same application.conf as your Apache Pekko settings.
| Project Info: Apache Pekko Connectors Cassandra | |
|---|---|
| Artifact | org.apache.pekko
pekko-connectors-cassandra
1.2.0+3-e195cec2-SNAPSHOT
|
| JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 OpenJDK 21 |
| Scala versions | 2.13.17, 2.12.20, 3.3.6 |
| JPMS module name | pekko.stream.connectors.cassandra |
| License | |
| API documentation | |
| Forums | |
| Release notes | GitHub releases |
| Issues | Github issues |
| Sources | https://github.com/apache/pekko-connectors |
Artifacts
- sbt
libraryDependencies += "org.apache.pekko" %% "pekko-connectors-cassandra" % "1.2.0+3-e195cec2-SNAPSHOT"- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-connectors-cassandra_${scala.binary.version}</artifactId> <version>1.2.0+3-e195cec2-SNAPSHOT</version> </dependency> </dependencies>- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation "org.apache.pekko:pekko-connectors-cassandra_${versions.ScalaBinary}:1.2.0+3-e195cec2-SNAPSHOT" }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Sessions
Cassandra is accessed through CassandraSessionCassandraSessions which are managed by the CassandraSessionRegistryCassandraSessionRegistry Apache Pekko extension. This way a session is shared across all usages within the actor system and properly shut down after the actor system is shut down.
The CassandraSession is provided to the stream factory methods as an implicit parameter.
- Scala
-
source
import org.apache.pekko import pekko.stream.connectors.cassandra.CassandraSessionSettings import pekko.stream.connectors.cassandra.scaladsl.CassandraSession import pekko.stream.connectors.cassandra.scaladsl.CassandraSessionRegistry val system: ActorSystem = // ??? val sessionSettings = CassandraSessionSettings() implicit val cassandraSession: CassandraSession = CassandraSessionRegistry.get(system).sessionFor(sessionSettings) val version: Future[String] = cassandraSession .select("SELECT release_version FROM system.local;") .map(_.getString("release_version")) .runWith(Sink.head) - Java
-
source
import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.stream.connectors.cassandra.CassandraSessionSettings; import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSession; import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSessionRegistry; ActorSystem system = // ??? CassandraSessionSettings sessionSettings = CassandraSessionSettings.create(); CassandraSession cassandraSession = CassandraSessionRegistry.get(system).sessionFor(sessionSettings); CompletionStage<String> version = cassandraSession .select("SELECT release_version FROM system.local;") .map(row -> row.getString("release_version")) .runWith(Sink.head(), system);
See custom session creation below for tweaking this.
Reading from Cassandra
CassandraSourceCassandraSource provides factory methods to get Apache Pekko Streams Sources from CQL queries and from com.datastax.oss.driver.api.core.cql.Statements.
Dynamic parameters can be provided to the CQL as variable arguments.
- Scala
-
source
import org.apache.pekko.stream.connectors.cassandra.scaladsl.CassandraSource val ids: Future[immutable.Seq[Int]] = CassandraSource(s"SELECT id FROM $intTable").map(row => row.getInt("id")).runWith(Sink.seq) val idsWhere: Future[Int] = CassandraSource(s"SELECT * FROM $intTable WHERE id = ?", value).map(_.getInt("id")).runWith(Sink.head) - Java
-
source
import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSource; CompletionStage<List<Integer>> select = CassandraSource.create(cassandraSession, "SELECT id FROM " + idtable + ";") .map(r -> r.getInt("id")) .runWith(Sink.seq(), system); CompletionStage<Integer> select = CassandraSource.create( cassandraSession, "SELECT * FROM " + idtable + " WHERE id = ?;", value) .map(r -> r.getInt("id")) .runWith(Sink.head(), system);
If the statement requires specific settings, you may pass any com.datastax.oss.driver.api.core.cql.Statement.
- Scala
-
source
import com.datastax.oss.driver.api.core.cql.{ Row, SimpleStatement } val stmt = SimpleStatement.newInstance(s"SELECT * FROM $intTable").setPageSize(20) val rows: Future[immutable.Seq[Row]] = CassandraSource(stmt).runWith(Sink.seq) - Java
-
source
import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.cql.Statement; Statement<?> stmt = SimpleStatement.newInstance("SELECT * FROM " + idtable + ";").setPageSize(20); CompletionStage<List<Integer>> select = CassandraSource.create(cassandraSession, stmt) .map(r -> r.getInt("id")) .runWith(Sink.seq(), system);
Here we used a basic sink to complete the stream by collecting all of the stream elements into a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build more advanced stream topologies.
Writing to Cassandra
CassandraFlowCassandraFlow provides factory methods to get Apache Pekko Streams flows to run CQL statements that change data (UPDATE, INSERT). Apache Pekko Connectors Cassandra creates a PreparedStatement and for every stream element the statementBinder function binds the CQL placeholders to data.
The incoming elements are emitted unchanged for further processing.
- Scala
-
source
import org.apache.pekko import pekko.stream.connectors.cassandra.CassandraWriteSettings import pekko.stream.connectors.cassandra.scaladsl.CassandraFlow import com.datastax.oss.driver.api.core.cql.{ BoundStatement, PreparedStatement } case class Person(id: Int, name: String, city: String) val persons = immutable.Seq(Person(12, "John", "London"), Person(43, "Umberto", "Roma"), Person(56, "James", "Chicago")) val statementBinder: (Person, PreparedStatement) => BoundStatement = (person, preparedStatement) => preparedStatement.bind(Int.box(person.id), person.name, person.city) val written: Future[immutable.Seq[Person]] = Source(persons) .via( CassandraFlow.create(CassandraWriteSettings.defaults, s"INSERT INTO $table(id, name, city) VALUES (?, ?, ?)", statementBinder)) .runWith(Sink.seq) - Java
-
source
import org.apache.pekko.NotUsed; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.japi.Function2; import org.apache.pekko.japi.Pair; import org.apache.pekko.stream.connectors.cassandra.CassandraWriteSettings; import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraFlow; import org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4; import org.apache.pekko.stream.javadsl.SourceWithContext; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.PreparedStatement; List<Person> persons = Arrays.asList( new Person(12, "John", "London"), new Person(43, "Umberto", "Roma"), new Person(56, "James", "Chicago")); Function2<Person, PreparedStatement, BoundStatement> statementBinder = (person, preparedStatement) -> preparedStatement.bind(person.id, person.name, person.city); CompletionStage<List<Person>> written = Source.from(persons) .via( CassandraFlow.create( cassandraSession, CassandraWriteSettings.defaults(), "INSERT INTO " + table + "(id, name, city) VALUES (?, ?, ?)", statementBinder)) .runWith(Sink.seq(), system);
Update flows with context
Apache Pekko Connectors Cassandra flows offer “With Context”-support which integrates nicely with some other Apache Pekko Connectors connectors.
- Scala
-
source
val personsAndHandles: SourceWithContext[Person, AckHandle, NotUsed] = // ??? val written: Future[Done] = personsAndHandles .via( CassandraFlow.withContext( CassandraWriteSettings.defaults, s"INSERT INTO $table(id, name, city) VALUES (?, ?, ?)", (person, preparedStatement) => preparedStatement.bind(Int.box(person.id), person.name, person.city))) .asSource .mapAsync(1) { case (_, handle) => handle.ack() } .runWith(Sink.ignore) - Java
-
source
SourceWithContext<Person, AckHandle, NotUsed> from = // ???; CompletionStage<Done> written = from.via( CassandraFlow.withContext( cassandraSession, CassandraWriteSettings.defaults(), "INSERT INTO " + table + "(id, name, city) VALUES (?, ?, ?)", (person, preparedStatement) -> preparedStatement.bind(person.id, person.name, person.city))) .asSource() .mapAsync(1, pair -> pair.second().ack()) .runWith(Sink.ignore(), system);
Custom Session creation
Session creation and configuration is controlled via settings in application.conf. The CassandraSessionSettingsCassandraSessionSettings accept a full path to a configuration section which needs to specify a session-provider setting. The CassandraSessionRegistryCassandraSessionRegistry expects a fully qualified class name to a class implementing CqlSessionProviderCqlSessionProvider.
Apache Pekko Connectors Cassandra includes a default implementation DefaultSessionProviderDefaultSessionProvider, which is referenced in the default configuration pekko.connectors.cassandra.
The DefaultSessionProviderDefaultSessionProvider config section must contain:
- a settings section
service-discoverywhich may be used to discover Cassandra contact points via Apache Pekko Discovery, - and a reference to a Cassandra config section in
datastax-java-driver-configwhich is used to configure the Cassandra client. For details see the Datastax Java Driver configuration and the driver’sreference.conf.
- reference.conf
-
source
# SPDX-License-Identifier: Apache-2.0 pekko.connectors.cassandra { # The implementation of `org.apache.pekko.stream.connectors.cassandra.CqlSessionProvider` # used for creating the `CqlSession`. # It may optionally have a constructor with an `ClassicActorSystemProvider` and `Config` parameters. session-provider = "org.apache.pekko.stream.connectors.cassandra.DefaultSessionProvider" # Configure Pekko Discovery by setting a service name service-discovery { name = "" lookup-timeout = 1 s } # The ExecutionContext to use for the session tasks and future composition. session-dispatcher = "pekko.actor.default-dispatcher" # Full config path to the Datastax Java driver's configuration section. # When connecting to more than one Cassandra cluster different session configuration can be # defined with this property. # See https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/#quick-overview # and https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/reference/ datastax-java-driver-config = "datastax-java-driver" }
In simple cases your datastax-java-driver section will need to define contact-points and load-balancing-policy.local-datacenter. To make the Cassandra driver retry its initial connection attempts, add advanced.reconnect-on-init = true.
- application.conf
-
source
datastax-java-driver { basic { contact-points = [ "127.0.0.1:9042" ] load-balancing-policy.local-datacenter = datacenter1 } advanced.reconnect-on-init = true }
Using Apache Pekko Discovery
To use Apache Pekko Discovery make sure the pekko-discovery dependency is on you classpath.
- sbt
val PekkoVersion = "1.1.5" libraryDependencies += "org.apache.pekko" %% "pekko-discovery" % PekkoVersion- Maven
<properties> <pekko.version>1.1.5</pekko.version> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-discovery_${scala.binary.version}</artifactId> <version>${pekko.version}</version> </dependency> </dependencies>- Gradle
def versions = [ PekkoVersion: "1.1.5", ScalaBinary: "2.13" ] dependencies { implementation "org.apache.pekko:pekko-discovery_${versions.ScalaBinary}:${versions.PekkoVersion}" }
To enable Apache Pekko Discovery with the DefaultSessionProviderDefaultSessionProvider, set up the desired service name in the discovery mechanism of your choice and pass that name in service-discovery.name. The example below extends the pekko.connectors.cassandra config section and only overwrites the service name.
- application.conf
-
source
pekko { discovery.method = config } pekko.discovery.config.services = { cassandra-service = { endpoints = [ { host = "127.0.0.1" port = 9042 } ] } } // inherit defaults from `pekko.connectors.cassandra` settings example-with-pekko-discovery: ${pekko.connectors.cassandra} { service-discovery.name = "cassandra-service" }
Use the full config section path to create the CassandraSessionSettingsCassandraSessionSettings.
- Scala
-
source
val sessionSettings = CassandraSessionSettings("example-with-pekko-discovery") implicit val session = CassandraSessionRegistry.get(system).sessionFor(sessionSettings) - Java
-
source
CassandraSessionSettings sessionSettings = CassandraSessionSettings.create("example-with-pekko-discovery"); CassandraSession session = CassandraSessionRegistry.get(system).sessionFor(sessionSettings);