Journal

Features

  • All operations required by the Pekko Persistence journal plugin API are fully supported.
  • The plugin uses Cassandra in a purely log-oriented way, i.e. data is only ever inserted and never updated (deletions are made on user request only).
  • Writes of messages are batched to optimize throughput for persistAsync. See batch writes for details on how to configure batch sizes. The plugin was tested to work properly under high load.
  • Messages written by a single persistent actor are partitioned across the cluster to achieve scalability with data volume by adding nodes.
  • Persistence Query is supported through CassandraReadJournal.

Schema

The keyspace and tables need to be created before using the plugin.

Warning

Auto creation of the keyspace and tables is included as a development convenience and should never be used in production. Cassandra does not handle concurrent schema migrations well and if every Pekko node tries to create the schema at the same time you’ll get column id mismatch errors in Cassandra.

The default keyspace used by the plugin is pekko. It should be created with the NetworkTopologyStrategy replication strategy and a replication factor of at least 3:

CREATE KEYSPACE IF NOT EXISTS pekko WITH replication = {'class': 'NetworkTopologyStrategy', '<your_dc_name>' : 3 }; 

For local testing you can use the following, which is also what the plugin creates by default if you enable pekko.persistence.cassandra.journal.keyspace-autocreate:

CREATE KEYSPACE IF NOT EXISTS pekko
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

There are multiple tables required. These need to be created before starting your application. For local testing you can enable pekko.persistence.cassandra.journal.tables-autocreate. The default table definitions look like this:

CREATE TABLE IF NOT EXISTS pekko.messages (
  persistence_id text,
  partition_nr bigint,
  sequence_nr bigint,
  timestamp timeuuid,
  timebucket text,
  writer_uuid text,
  ser_id int,
  ser_manifest text,
  event_manifest text,
  event blob,
  meta_ser_id int,
  meta_ser_manifest text,
  meta blob,
  tags set<text>,
  PRIMARY KEY ((persistence_id, partition_nr), sequence_nr, timestamp))
  WITH gc_grace_seconds = 864000
  AND compaction = {
    'class' : 'SizeTieredCompactionStrategy',
    'enabled' : true,
    'tombstone_compaction_interval' : 86400,
    'tombstone_threshold' : 0.2,
    'unchecked_tombstone_compaction' : false,
    'bucket_high' : 1.5,
    'bucket_low' : 0.5,
    'max_threshold' : 32,
    'min_threshold' : 4,
    'min_sstable_size' : 50
    };

CREATE TABLE IF NOT EXISTS pekko.tag_views (
  tag_name text,
  persistence_id text,
  sequence_nr bigint,
  timebucket bigint,
  timestamp timeuuid,
  tag_pid_sequence_nr bigint,
  writer_uuid text,
  ser_id int,
  ser_manifest text,
  event_manifest text,
  event blob,
  meta_ser_id int,
  meta_ser_manifest text,
  meta blob,
  PRIMARY KEY ((tag_name, timebucket), timestamp, persistence_id, tag_pid_sequence_nr))
  WITH gc_grace_seconds = 864000
  AND compaction = {
    'class' : 'SizeTieredCompactionStrategy',
    'enabled' : true,
    'tombstone_compaction_interval' : 86400,
    'tombstone_threshold' : 0.2,
    'unchecked_tombstone_compaction' : false,
    'bucket_high' : 1.5,
    'bucket_low' : 0.5,
    'max_threshold' : 32,
    'min_threshold' : 4,
    'min_sstable_size' : 50
    };

CREATE TABLE IF NOT EXISTS pekko.tag_write_progress(
  persistence_id text,
  tag text,
  sequence_nr bigint,
  tag_pid_sequence_nr bigint,
  offset timeuuid,
  PRIMARY KEY (persistence_id, tag));

CREATE TABLE IF NOT EXISTS pekko.tag_scanning(
  persistence_id text,
  sequence_nr bigint,
  PRIMARY KEY (persistence_id));

CREATE TABLE IF NOT EXISTS pekko.metadata(
  persistence_id text PRIMARY KEY,
  deleted_to bigint,
  properties map<text,text>);

CREATE TABLE IF NOT EXISTS pekko.all_persistence_ids(
  persistence_id text PRIMARY KEY);
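
For local testing only, the two auto-creation settings named above could be switched on in application.conf roughly as follows. This is a minimal sketch that uses only the setting names given in this section; as the warning above says, leave both disabled in production and create the schema manually instead.

```hocon
# Development convenience only: never enable in production.
pekko.persistence.cassandra.journal {
  keyspace-autocreate = true
  tables-autocreate = true
}
```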

Messages table

Descriptions of the important columns in the messages table:

Column          Description
persistence_id  The persistence id
partition_nr    Artificial partition key to ensure partitions do not grow too large
sequence_nr     Sequence number of the event
timestamp       A version 1 UUID, a.k.a. TimeUUID, for ordering events in the events by tag query
timebucket      The time bucket used to partition the events by tag query. Present in this table only because events by tag used to use a materialized view
writer_uuid     A UUID for the actor system that wrote the event. Used to detect multiple writers for the same persistence id
ser_id          The serialization id of the user payload
ser_manifest    The serialization manifest of the user payload
event_manifest  The manifest used by event adapters
event           The serialized user payload

The following old columns are no longer needed but may still be in your schema if you have used older versions of the plugin and migrated. See the migration guide for when these have been removed.

Column   Description
used     A static column to record that the artificial partition has been used, to detect that all events in a partition have been deleted
message  Versions before 0.6 serialized the PersistentRepr (an internal Pekko type) into this column. Newer versions use event and serialize the user payload

Configuration

To activate the journal plugin, add the following line to your Pekko application.conf:

pekko.persistence.journal.plugin = "pekko.persistence.cassandra.journal"

This will run the journal with its default settings. The default settings can be changed with the configuration properties defined in reference.conf. Journal configuration is under pekko.persistence.cassandra.journal.

All Cassandra driver settings are configured via the driver’s standard profile mechanism.

One important setting is to configure the database driver to retry the initial connection:

datastax-java-driver.advanced.reconnect-on-init = true

The plugin does not enable this automatically because the setting is defined in the driver’s reference.conf and cannot be overridden in a profile.

Target partition size

The messages table that stores the events is partitioned by (persistence_id, partition_nr). The partition_nr is an artificial partition key to ensure that the Cassandra partition does not get too large if there are a lot of events for a single persistence_id.
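
To illustrate the partitioning, a query along these lines reads the first events of one persistent actor from a single Cassandra partition. The persistence id my-entity-1 and the partition number 0 are hypothetical values chosen for illustration; the query is a sketch for manual inspection (for example in cqlsh), not necessarily what the plugin itself executes.

```sql
SELECT sequence_nr, writer_uuid, ser_id, ser_manifest, event_manifest
FROM pekko.messages
WHERE persistence_id = 'my-entity-1' -- hypothetical persistence id
  AND partition_nr = 0               -- hypothetical partition number
ORDER BY sequence_nr ASC
LIMIT 10;
```

Because both partition key columns are fixed, the query reads from exactly one partition, and the rows come back ordered by the sequence_nr clustering column.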

pekko.persistence.cassandra.journal.target-partition-size controls the number of events that the journal tries to put in each Cassandra partition. It is only a target: to ensure atomicity, all events of a persistAll call are written to the same partition, even if that exceeds the target partition size.

It is not possible to change the value once you have data, so consider whether the default of 500000 is right for your application before deploying to production. Multiply the value by your expected serialized event size to roughly work out how large the Cassandra partition will grow. See wide partitions in Cassandra for a summary of how large a partition should be depending on the version of Cassandra you are using.
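
As a sketch, the target could be set explicitly in application.conf before the first production deployment. The value 100000 below is a hypothetical choice, not a recommendation, and the sizing comment assumes an average serialized event of about 1 kB.

```hocon
pekko.persistence.cassandra.journal {
  # Hypothetical value: 100000 events x ~1 kB per event is roughly 100 MB per partition.
  # Decide this before any data is written; it cannot be changed afterwards.
  target-partition-size = 100000
}
```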

Consistency

By default the journal uses QUORUM for all reads and writes. For setups with multiple datacenters this can be set to LOCAL_QUORUM to avoid cross-datacenter latency for writes and reads.

The risk of using LOCAL_QUORUM is that, in the event of a datacenter outage, events that have been confirmed (and whose side effects have already run) may not have been replicated to the other datacenters. If a persistent actor for which this has happened is started in another datacenter, it may not see the latest event. If the Cassandra data in the datacenter with the outage is later recovered, the event that was not replicated will eventually be replicated to all datacenters, resulting in a duplicate sequence number. With the default replay-filter, the duplicate event from the original datacenter is discarded in subsequent replays of the persistent actor.

Using QUORUM for multi-datacenter setups increases latency and decreases availability, because nodes in other datacenters need to respond to reach QUORUM. During a datacenter outage or a cross-datacenter network partition this won’t be possible, resulting in failed reads and writes.

Using a consistency level other than QUORUM or LOCAL_QUORUM is highly discouraged.

datastax-java-driver.profiles {
  pekko-persistence-cassandra-profile {
    basic.request.consistency = QUORUM
  }
}
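
For a multi-datacenter setup, the same profile could be switched to LOCAL_QUORUM as sketched below. The datacenter name dc1 is a placeholder. The local-datacenter entry is the DataStax Java driver 4.x setting for naming the local datacenter; it is included on the assumption that your load balancing policy needs it, so check it against your existing driver configuration.

```hocon
datastax-java-driver {
  # Placeholder name: use the datacenter this application node runs in.
  basic.load-balancing-policy.local-datacenter = "dc1"
  profiles {
    pekko-persistence-cassandra-profile {
      # A quorum of replicas in the local datacenter must acknowledge each read and write.
      basic.request.consistency = LOCAL_QUORUM
    }
  }
}
```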

Delete all events

The Cleanup tool can be used to delete all events and/or snapshots for a given list of persistenceIds without using persistent actors. It’s important that the actors with the corresponding persistenceIds are not running while the tool is in use. See Database Cleanup for more details.