Journal
Features
- All operations required by the Pekko Persistence journal plugin API are fully supported.
- The plugin uses Cassandra in a purely log-oriented way, i.e., data is only ever inserted and never updated (deletions are made only on user request).
- Writes of messages are batched to optimize throughput for persistAsync. See batch writes for details on how to configure batch sizes. The plugin was tested to work properly under high load.
- Messages written by a single persistent actor are partitioned across the cluster to achieve scalability with data volume by adding nodes.
- Persistence Query support by CassandraReadJournal
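To make the batching idea concrete, here is a minimal Python sketch of grouping pending writes into batches. It is an illustration only, not the plugin's code: it assumes a hypothetical `max_batch_size` limit counted in events, and it treats each inner list as one atomic write (the events of a single persistAll call) that is never split across batches. The plugin's real batching rules and settings are described under batch writes.

```python
from typing import List, Sequence

# Hypothetical model: an atomic write is the list of events from one persist/persistAll call.
AtomicWrite = List[str]


def batch_atomic_writes(
    writes: Sequence[AtomicWrite], max_batch_size: int
) -> List[List[AtomicWrite]]:
    """Group atomic writes into batches holding at most max_batch_size events.

    An atomic write is never split; one that is larger than the limit
    on its own ends up alone in its batch.
    """
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be >= 1")
    batches: List[List[AtomicWrite]] = []
    current: List[AtomicWrite] = []
    current_size = 0
    for write in writes:
        # Close the current batch if adding this write would exceed the limit.
        if current and current_size + len(write) > max_batch_size:
            batches.append(current)
            current, current_size = [], 0
        current.append(write)
        current_size += len(write)
    if current:
        batches.append(current)
    return batches


pending = [["e1"], ["e2", "e3"], ["e4"], ["e5", "e6", "e7"]]
print(batch_atomic_writes(pending, max_batch_size=3))
# [[['e1'], ['e2', 'e3']], [['e4']], [['e5', 'e6', 'e7']]]
```

Keeping each atomic write whole mirrors the atomicity requirement the plugin has for persistAll; the exact limit and how it is counted are assumptions of this sketch.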
Schema
The keyspace and tables need to be created before using the plugin.
Auto creation of the keyspace and tables is included as a development convenience and should never be used in production. Cassandra does not handle concurrent schema migrations well and if every Pekko node tries to create the schema at the same time you’ll get column id mismatch errors in Cassandra.
The default keyspace used by the plugin is pekko. It should be created with the NetworkTopologyStrategy replication strategy and a replication factor of at least 3:
CREATE KEYSPACE IF NOT EXISTS pekko WITH replication = {'class': 'NetworkTopologyStrategy', '<your_dc_name>' : 3 };
For local testing, and as the default if you enable pekko.persistence.cassandra.journal.keyspace-autocreate, you can use the following:
CREATE KEYSPACE IF NOT EXISTS pekko
WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor':1 };
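If you manage the keyspace definition from a deployment script, a small helper can render the NetworkTopologyStrategy statement for one or more datacenters. This is a hypothetical Python sketch: the datacenter names `dc1` and `dc2` are placeholders, the helper only builds the CQL string (running it is left to `cqlsh` or your driver of choice), and it rejects replication factors below 3, following the production recommendation above.

```python
from typing import Mapping


def network_topology_keyspace(
    name: str, replication: Mapping[str, int], min_rf: int = 3
) -> str:
    """Render a CREATE KEYSPACE statement using NetworkTopologyStrategy."""
    if not replication:
        raise ValueError("at least one datacenter is required")
    for dc, rf in replication.items():
        # Production keyspaces should have a replication factor of at least 3 per DC.
        if rf < min_rf:
            raise ValueError(f"replication factor {rf} for {dc} is below {min_rf}")
    dcs = ", ".join(f"'{dc}' : {rf}" for dc, rf in replication.items())
    return (
        f"CREATE KEYSPACE IF NOT EXISTS {name} WITH replication = "
        f"{{'class': 'NetworkTopologyStrategy', {dcs} }};"
    )


print(network_topology_keyspace("pekko", {"dc1": 3, "dc2": 3}))
```

Generating the statement once and applying it from a single place also fits the advice against letting every node create the schema concurrently.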
Several tables are required, and they need to be created before starting your application. For local testing you can enable pekko.persistence.cassandra.journal.tables-autocreate. The default table definitions look like this:
CREATE TABLE IF NOT EXISTS pekko.messages (
persistence_id text,
partition_nr bigint,
sequence_nr bigint,
timestamp timeuuid,
timebucket text,
writer_uuid text,
ser_id int,
ser_manifest text,
event_manifest text,
event blob,
meta_ser_id int,
meta_ser_manifest text,
meta blob,
tags set<text>,
PRIMARY KEY ((persistence_id, partition_nr), sequence_nr, timestamp))
WITH gc_grace_seconds =864000
AND compaction = {
'class' : 'SizeTieredCompactionStrategy',
'enabled' : true,
'tombstone_compaction_interval' : 86400,
'tombstone_threshold' : 0.2,
'unchecked_tombstone_compaction' : false,
'bucket_high' : 1.5,
'bucket_low' : 0.5,
'max_threshold' : 32,
'min_threshold' : 4,
'min_sstable_size' : 50
};
CREATE TABLE IF NOT EXISTS pekko.tag_views (
tag_name text,
persistence_id text,
sequence_nr bigint,
timebucket bigint,
timestamp timeuuid,
tag_pid_sequence_nr bigint,
writer_uuid text,
ser_id int,
ser_manifest text,
event_manifest text,
event blob,
meta_ser_id int,
meta_ser_manifest text,
meta blob,
PRIMARY KEY ((tag_name, timebucket), timestamp, persistence_id, tag_pid_sequence_nr))
WITH gc_grace_seconds =864000
AND compaction = {
'class' : 'SizeTieredCompactionStrategy',
'enabled' : true,
'tombstone_compaction_interval' : 86400,
'tombstone_threshold' : 0.2,
'unchecked_tombstone_compaction' : false,
'bucket_high' : 1.5,
'bucket_low' : 0.5,
'max_threshold' : 32,
'min_threshold' : 4,
'min_sstable_size' : 50
};
CREATE TABLE IF NOT EXISTS pekko.tag_write_progress(
persistence_id text,
tag text,
sequence_nr bigint,
tag_pid_sequence_nr bigint,
offset timeuuid,
PRIMARY KEY (persistence_id, tag));
CREATE TABLE IF NOT EXISTS pekko.tag_scanning(
persistence_id text,
sequence_nr bigint,
PRIMARY KEY (persistence_id));
CREATE TABLE IF NOT EXISTS pekko.metadata(
persistence_id text PRIMARY KEY,
deleted_to bigint,
properties map<text,text>);
CREATE TABLE IF NOT EXISTS pekko.all_persistence_ids(
persistence_id text PRIMARY KEY);
Messages table
Descriptions of the important columns in the messages table:
Column | Description |
---|---|
persistence_id | The persistence id |
partition_nr | Artificial partition key to ensure partitions do not grow too large |
sequence_nr | Sequence number of the event |
timestamp | A version 1 (time-based) UUID, a.k.a. TimeUUID, used to order events in the events by tag query |
timebucket | The time bucket used to partition the events by tag query. It is only present in this table because events by tag used to be implemented with a materialized view |
writer_uuid | A UUID for the actor system that wrote the event. Used to detect multiple writers for the same persistence id |
ser_id | The serialization id of the user payload |
ser_manifest | The serialization manifest of the user payload |
event_manifest | The manifest used by event adapters |
event | The serialized user payload |
Old columns that are no longer needed but may still be in your schema if you have used older versions of the plugin and migrated. See the migration guide for when these were removed.
Column | Description |
---|---|
used | A static column recording that the artificial partition has been used, so that it can be detected when all events in a partition have been deleted |
message | Versions before 0.6 serialized the PersistentRepr (an internal Pekko type) into this column. Newer versions use the event column and serialize only the user payload |
Configuration
To activate the journal plugin, add the following line to your Pekko application.conf:
pekko.persistence.journal.plugin = "pekko.persistence.cassandra.journal"
This will run the journal with its default settings. The default settings can be changed with the configuration properties defined in reference.conf. Journal configuration is under pekko.persistence.cassandra.journal.
All Cassandra driver settings are configured via the driver's standard profile mechanism.
One important setting is to configure the database driver to retry the initial connection:
datastax-java-driver.advanced.reconnect-on-init = true
It is not enabled automatically because the setting is defined in the driver’s reference.conf and cannot be overridden in a profile.
Target partition size
The messages table that stores the events is partitioned by (persistence_id, partition_nr). The partition_nr is an artificial partition key to ensure that the Cassandra partition does not get too large if there are a lot of events for a single persistence_id.
pekko.persistence.cassandra.journal.target-partition-size controls the number of events that the journal tries to put in each Cassandra partition. It is only a target because, to ensure atomicity, all the events of a persistAll call are written to the same partition, even if that exceeds the target partition size.
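As a rough mental model, assume that events are assigned to partitions by integer division of their sequence number; this mapping is an assumption of the sketch below, not a documented guarantee of the plugin. The hypothetical `partition_nr` function shows how sequence numbers would spread across partitions for a given target size, and it ignores persistAll calls, which can push a partition past the target.

```python
def partition_nr(sequence_nr: int, target_partition_size: int = 500_000) -> int:
    """Map a sequence number to a partition_nr under a simple division model.

    Assumes sequence numbers start at 1 and ignores the case where a
    persistAll call is kept in one partition past the target size.
    """
    if sequence_nr < 1:
        raise ValueError("sequence numbers start at 1")
    if target_partition_size < 1:
        raise ValueError("target_partition_size must be >= 1")
    return (sequence_nr - 1) // target_partition_size


for seq in (1, 500_000, 500_001, 1_200_000):
    print(seq, partition_nr(seq))
```

Under this model a persistence_id with 1,200,000 events spans partitions 0, 1 and 2, which is also why the target cannot be changed once data exists: existing events would no longer be found in the partitions the new value predicts.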
It is not possible to change the value once you have data, so consider whether the default of 500000 is right for your application before deploying to production. Multiply the value by your expected serialized event size to roughly work out how large the Cassandra partition will grow. See wide partitions in Cassandra for a summary of how large a partition should be depending on the version of Cassandra you are using.
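The estimate above is a single multiplication, and the inverse (dividing a partition size budget by the event size) suggests a target value. The Python sketch below does both; the 200-byte and 1000-byte event sizes and the 100 MB budget are hypothetical figures, and per-row overhead such as key and clustering columns is ignored, so real partitions will be somewhat larger.

```python
def estimated_partition_bytes(target_partition_size: int, avg_event_bytes: int) -> int:
    """Rough size of a full partition: events per partition times serialized event size."""
    return target_partition_size * avg_event_bytes


def target_for_budget(partition_budget_bytes: int, avg_event_bytes: int) -> int:
    """Largest target partition size that keeps a full partition within a byte budget."""
    return partition_budget_bytes // avg_event_bytes


# Hypothetical: 200-byte events with the default target of 500000.
print(estimated_partition_bytes(500_000, 200))  # 100000000 (about 100 MB)
# Hypothetical: a 100 MB budget with 1000-byte events.
print(target_for_budget(100_000_000, 1_000))  # 100000
```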
Consistency
By default the journal uses QUORUM for all reads and writes. For setups with multiple datacenters this can be set to LOCAL_QUORUM to avoid cross-DC latency for writes and reads.
The risk of using LOCAL_QUORUM is that, in the event of a datacenter outage, events that have been confirmed (and any side effects that were run) may not have been replicated to the other datacenters. If a persistent actor for which this has happened is started in another datacenter, it may not see the latest event if that event wasn’t replicated. If the Cassandra data in the datacenter with the outage is later recovered, the event that was not replicated will eventually be replicated to all datacenters, resulting in a duplicate sequence number. With the default replay-filter, the duplicate event from the original datacenter is discarded in subsequent replays of the persistent actor.
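The duplicate scenario above can be illustrated with a small Python sketch that scans replayed (sequence_nr, writer_uuid) pairs and reports the sequence numbers written by more than one writer. It only detects the condition; deciding which event to drop is the job of the replay-filter, whose actual logic this sketch does not reproduce. The writer ids `dc-a` and `dc-b` are placeholders for real writer_uuid values.

```python
from typing import Dict, Iterable, List, Tuple


def duplicate_sequence_nrs(rows: Iterable[Tuple[int, str]]) -> Dict[int, List[str]]:
    """Return sequence numbers that were written by more than one writer.

    rows are (sequence_nr, writer_uuid) pairs in replay order; the result
    maps each duplicated sequence number to its writers in first-seen order.
    """
    writers: Dict[int, List[str]] = {}
    for seq_nr, writer in rows:
        seen = writers.setdefault(seq_nr, [])
        if writer not in seen:
            seen.append(writer)
    return {seq: ws for seq, ws in writers.items() if len(ws) > 1}


# dc-a wrote event 5 before the outage; dc-b, unaware of it, wrote its own 5 and 6.
replayed = [(4, "dc-a"), (5, "dc-a"), (5, "dc-b"), (6, "dc-b")]
print(duplicate_sequence_nrs(replayed))  # {5: ['dc-a', 'dc-b']}
```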
Using QUORUM for multi-datacenter setups increases latency and decreases availability, because nodes in other datacenters need to respond to reach QUORUM. During a datacenter outage or a cross-datacenter network partition this won’t be possible, resulting in failed reads and writes.
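The latency and availability trade-off follows from how Cassandra sizes a quorum: QUORUM needs a majority of all replicas across all datacenters (the sum of the replication factors divided by 2, rounded down, plus 1), while LOCAL_QUORUM needs a majority of the replicas in the local datacenter only. The Python sketch below does that arithmetic for a hypothetical setup of two datacenters, `dc1` and `dc2`, each with a replication factor of 3. It models whole-datacenter outages only and ignores individual node failures.

```python
from typing import AbstractSet, Mapping


def quorum(replication: Mapping[str, int]) -> int:
    """QUORUM: a majority of the replicas summed over all datacenters."""
    return sum(replication.values()) // 2 + 1


def local_quorum(replication: Mapping[str, int], local_dc: str) -> int:
    """LOCAL_QUORUM: a majority of the replicas in the local datacenter only."""
    return replication[local_dc] // 2 + 1


def min_remote_acks(replication: Mapping[str, int], local_dc: str) -> int:
    """Replicas outside local_dc that must respond for QUORUM to succeed."""
    return max(0, quorum(replication) - replication[local_dc])


def quorum_reachable(replication: Mapping[str, int], down_dcs: AbstractSet[str]) -> bool:
    """Whether QUORUM can still be met when whole datacenters are unavailable."""
    available = sum(rf for dc, rf in replication.items() if dc not in down_dcs)
    return available >= quorum(replication)


rf = {"dc1": 3, "dc2": 3}  # hypothetical two-datacenter setup
print(quorum(rf))                     # 4
print(local_quorum(rf, "dc1"))        # 2
print(min_remote_acks(rf, "dc1"))     # 1
print(quorum_reachable(rf, {"dc2"}))  # False
```

With this layout every QUORUM request waits for at least one replica in the other datacenter, and losing either datacenter makes QUORUM impossible, while LOCAL_QUORUM (2 of the 3 local replicas) is unaffected.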
Using a consistency level other than QUORUM or LOCAL_QUORUM is highly discouraged.
datastax-java-driver.profiles {
pekko-persistence-cassandra-profile {
basic.request.consistency = QUORUM
}
}
Delete all events
The Cleanup tool can be used to delete all events and/or snapshots for a given list of persistenceIds without using persistent actors. It’s important that the actors with the corresponding persistenceIds are not running while the tool is being used. See Database Cleanup for more details.