org.apache.pekko.persistence.cassandra.query.scaladsl
CassandraReadJournal
Companion object CassandraReadJournal
class CassandraReadJournal extends ReadJournal with PersistenceIdsQuery with CurrentPersistenceIdsQuery with EventsByPersistenceIdQuery with CurrentEventsByPersistenceIdQuery with EventsByTagQuery with CurrentEventsByTagQuery
Scala API org.apache.pekko.persistence.query.scaladsl.ReadJournal implementation for Cassandra.
It is retrieved with:
val queries = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)Corresponding Java API is in pekko.persistence.cassandra.query.javadsl.CassandraReadJournal.
Configuration settings can be defined in the configuration section with the
absolute path corresponding to the identifier, which is "pekko.persistence.cassandra.query"
for the default CassandraReadJournal#Identifier. See reference.conf.
- Alphabetic
- By Inheritance
- CassandraReadJournal
- CurrentEventsByTagQuery
- EventsByTagQuery
- CurrentEventsByPersistenceIdQuery
- EventsByPersistenceIdQuery
- CurrentPersistenceIdsQuery
- PersistenceIdsQuery
- ReadJournal
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Instance Constructors
- new CassandraReadJournal(system: ExtendedActorSystem, cfg: Config, cfgPath: String)
- new CassandraReadJournal(system: ExtendedActorSystem, sharedConfig: Config, sharedConfigPath: String, viaNormalConstructor: Boolean)
- Attributes
- protected
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native()
- def currentEventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed]
Same type of query as
eventsByPersistenceIdbut the event stream is completed immediately when it reaches the end of the "result set".Same type of query as
eventsByPersistenceIdbut the event stream is completed immediately when it reaches the end of the "result set". Events that are stored after the query is completed are not included in the event stream.- Definition Classes
- CassandraReadJournal → CurrentEventsByPersistenceIdQuery
- def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
Same type of query as
eventsByTagbut the event stream is completed immediately when it reaches the end of the "result set".Same type of query as
eventsByTagbut the event stream is completed immediately when it reaches the end of the "result set". Events that are stored after the query is completed are not included in the event stream.Use
NoOffsetwhen you want all events from the beginning of time. To acquire an offset from a long unix timestamp to use with this query, you can use timeBasedUUIDFrom.- Definition Classes
- CassandraReadJournal → CurrentEventsByTagQuery
- def currentPersistenceIds(): Source[String, NotUsed]
Same type of query as
persistenceIdsbut the event stream is completed immediately when it reaches the end of the "result set".Same type of query as
persistenceIdsbut the event stream is completed immediately when it reaches the end of the "result set". Events that are stored after the query is completed are not included in the event stream.- Definition Classes
- CassandraReadJournal → CurrentPersistenceIdsQuery
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def eventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed]
eventsByPersistenceIdis used to retrieve a stream of events for a particular persistenceId.eventsByPersistenceIdis used to retrieve a stream of events for a particular persistenceId.The
EventEnvelopecontains the event and providespersistenceIdandsequenceNrfor each event. ThesequenceNris the sequence number for the persistent actor with thepersistenceIdthat persisted the event. ThepersistenceId+sequenceNris an unique identifier for the event.fromSequenceNrandtoSequenceNrcan be specified to limit the set of returned events. ThefromSequenceNrandtoSequenceNrare inclusive.The
EventEnvelopealso provides anoffset, which is the same kind of offset as is used in theeventsByTagquery. TheOffsettype isorg.apache.pekko.persistence.query.TimeBasedUUID.The returned event stream is ordered by
sequenceNr.Deleted events are also deleted from the event stream.
The stream is not completed when it reaches the end of the currently stored events, but it continues to push new events when new events are persisted. Corresponding query that is completed when it reaches the end of the currently stored events is provided by
currentEventsByPersistenceId.- Definition Classes
- CassandraReadJournal → EventsByPersistenceIdQuery
- def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
eventsByTagis used for retrieving events that were marked with a given tag, e.g.eventsByTagis used for retrieving events that were marked with a given tag, e.g. all events of an Aggregate Root type.To tag events you create an
org.apache.pekko.persistence.journal.EventAdapterthat wraps the events in aorg.apache.pekko.persistence.journal.Taggedwith the giventags. The tags must be defined in thetagssection of thepekko.persistence.cassandraconfiguration.You can use NoOffset to retrieve all events with a given tag or retrieve a subset of all events by specifying a
TimeBasedUUIDoffset.The offset of each event is provided in the streamed envelopes returned, which makes it possible to resume the stream at a later point from a given offset. The
offsetparameter is exclusive, i.e. the event corresponding to the givenoffsetparameter is not included in the stream. TheOffsettype isorg.apache.pekko.persistence.query.TimeBasedUUID.For querying events that happened after a long unix timestamp you can use timeBasedUUIDFrom to create the offset to use with this method.
In addition to the
offsetthe envelope also providespersistenceIdandsequenceNrfor each event. ThesequenceNris the sequence number for the persistent actor with thepersistenceIdthat persisted the event. ThepersistenceId+sequenceNris an unique identifier for the event.The returned event stream is ordered by the offset (timestamp), which corresponds to the same order as the write journal stored the events, with inaccuracy due to clock skew between different nodes. The same stream elements (in same order) are returned for multiple executions of the query on a best effort basis. The query is using a batched writes to a separate table so is eventually consistent. This means that different queries may see different events for the latest events, but eventually the result will be ordered by timestamp (Cassandra timeuuid column).
However a strong guarantee is provided that events for a given persistenceId will be delivered in order, the eventual consistency is only for ordering of events from different persistenceIds.
The stream is not completed when it reaches the end of the currently stored events, but it continues to push new events when new events are persisted. Corresponding query that is completed when it reaches the end of the currently stored events is provided by currentEventsByTag.
The stream is completed with failure if there is a failure in executing the query in the backend journal.
- Definition Classes
- CassandraReadJournal → EventsByTagQuery
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable])
- val firstOffset: UUID
Use this as the UUID offset in
eventsByTagqueries when you want all events from the beginning of time. - final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def initialize(): Future[Done]
Initialize connection to Cassandra and prepared statements.
Initialize connection to Cassandra and prepared statements. It is not required to do this and it will happen lazily otherwise. It is also not required to wait until this Future is complete to start using the read journal.
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- def offsetUuid(timestamp: Long): UUID
Create a time based UUID that can be used as offset in
eventsByTagqueries.Create a time based UUID that can be used as offset in
eventsByTagqueries. Thetimestampis a unix timestamp (as returned bySystem#currentTimeMillis). - def persistenceIds(): Source[String, NotUsed]
allPersistenceIdsis used to retrieve a stream ofpersistenceIds.allPersistenceIdsis used to retrieve a stream ofpersistenceIds.The stream emits
persistenceIdstrings.The stream guarantees that a
persistenceIdis only emitted once and there are no duplicates. Order is not defined. Multiple executions of the same stream (even bounded) may emit different sequence ofpersistenceIds.The stream is not completed when it reaches the end of the currently known
persistenceIds, but it continues to push newpersistenceIds when new events are persisted. Corresponding query that is completed when it reaches the end of the currently knownpersistenceIds is provided bycurrentPersistenceIds.- Definition Classes
- CassandraReadJournal → PersistenceIdsQuery
- val session: CassandraSession
Data Access Object for arbitrary queries or updates.
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def timeBasedUUIDFrom(timestamp: Long): Offset
Create a time based UUID that can be used as offset in
eventsByTagqueries.Create a time based UUID that can be used as offset in
eventsByTagqueries. Thetimestampis a unix timestamp (as returned bySystem#currentTimeMillis). - def timestampFrom(offset: TimeBasedUUID): Long
Convert a
TimeBasedUUIDto a unix timestamp (as returned bySystem#currentTimeMillis). - def toString(): String
- Definition Classes
- AnyRef → Any
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()