Events from Apache Pekko Persistence
A typical source for Projections is events stored with EventSourcedBehavior
EventSourcedBehavior
in Apache Pekko Persistence. Events can be tagged and then consumed with the eventsByTag query.
Apache Pekko Projections has integration with eventsByTag
, which is described here.
Dependencies
To use the Event Sourced module of Apache Pekko Projections add the following dependency in your project:
- sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-eventsourced" % "1.0.0"
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-projection-eventsourced_${scala.binary.version}</artifactId> <version>1.0.0</version> </dependency> </dependencies>
- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation "org.apache.pekko:pekko-projection-eventsourced_${versions.ScalaBinary}:1.0.0" }
Apache Pekko Projections require Pekko 1.0.2 or later, see Pekko version.
Project Info: Apache Pekko Projections Eventsourced | |
---|---|
Artifact | org.apache.pekko
pekko-projection-eventsourced
1.0.0
|
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 |
Scala versions | 3.3.3, 2.13.13, 2.12.19 |
JPMS module name | pekko.projection.eventsourced |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-projection |
Transitive dependencies
The table below shows pekko-projection-eventsourced
’s direct dependencies and the second tab shows all libraries it depends on transitively.
SourceProvider for eventsByTag
A SourceProvider
SourceProvider
defines the source of the event envelopes that the Projection
will process. A SourceProvider
for the eventsByTag
query can be defined with the EventSourcedProvider
EventSourcedProvider
like this:
- Scala
-
source
import org.apache.pekko import pekko.projection.eventsourced.EventEnvelope import pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal import pekko.persistence.query.Offset import pekko.projection.eventsourced.scaladsl.EventSourcedProvider import pekko.projection.scaladsl.SourceProvider val sourceProvider: SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] = EventSourcedProvider .eventsByTag[ShoppingCart.Event](system, readJournalPluginId = CassandraReadJournal.Identifier, tag = "carts-1")
- Java
-
source
import org.apache.pekko.persistence.cassandra.query.javadsl.CassandraReadJournal; import org.apache.pekko.persistence.query.Offset; import org.apache.pekko.projection.eventsourced.EventEnvelope; import org.apache.pekko.projection.eventsourced.javadsl.EventSourcedProvider; import org.apache.pekko.projection.javadsl.SourceProvider; SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider = EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), "carts-1");
This example is using the Cassandra plugin for Apache Pekko Persistence, but same code can be used for other Apache Pekko Persistence plugins by replacing the CassandraReadJournal.Identifier
. For example the JDBC plugin can be used. You will use the same plugin as you have configured for the write side that is used by the EventSourcedBehavior
.
This source is consuming all events from the ShoppingCart
EventSourcedBehavior
that are tagged with "cart-1"
.
The tags are assigned as described in Tagging Events in EventSourcedBehavior.
The EventEnvelope[ShoppingCart.Event]
EventEnvelope<ShoppingCart.Event>
is what the Projection
handler will process. It contains the Event
and additional meta data, such as the offset that will be stored by the Projection
. See EventEnvelope
EventEnvelope
for full details of what the envelope contains.
SourceProvider for eventsBySlices
A SourceProvider
SourceProvider
defines the source of the event envelopes that the Projection
will process. A SourceProvider
for the eventsBySlices
query can be defined with the EventSourcedProvider
EventSourcedProvider
like this:
- Scala
-
source
import org.apache.pekko import pekko.persistence.query.typed.EventEnvelope import pekko.persistence.query.Offset import pekko.projection.eventsourced.scaladsl.EventSourcedProvider import pekko.projection.scaladsl.SourceProvider // Slit the slices into 4 ranges val numberOfSliceRanges: Int = 4 val sliceRanges = EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier, numberOfSliceRanges) // Example of using the first slice range val minSlice: Int = sliceRanges.head.min val maxSlice: Int = sliceRanges.head.max val entityType: String = "ShoppingCart" val sourceProvider: SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] = EventSourcedProvider .eventsBySlices[ShoppingCart.Event]( system, readJournalPluginId = R2dbcReadJournal.Identifier, entityType, minSlice, maxSlice)
- Java
-
source
import org.apache.pekko.japi.Pair; import org.apache.pekko.persistence.query.Offset; import org.apache.pekko.persistence.query.typed.EventEnvelope; import org.apache.pekko.projection.eventsourced.javadsl.EventSourcedProvider; import org.apache.pekko.projection.javadsl.SourceProvider; // Slit the slices into 4 ranges int numberOfSliceRanges = 4; List<Pair<Integer, Integer>> sliceRanges = EventSourcedProvider.sliceRanges( system, R2dbcReadJournal.Identifier(), numberOfSliceRanges); // Example of using the first slice range int minSlice = sliceRanges.get(0).first(); int maxSlice = sliceRanges.get(0).second(); String entityType = "MyEntity"; SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider = EventSourcedProvider.eventsBySlices( system, R2dbcReadJournal.Identifier(), entityType, minSlice, maxSlice);
This example is using the R2DBC plugin for Apache Pekko Persistence. You will use the same plugin as you have configured for the write side that is used by the EventSourcedBehavior
.
This source is consuming all events from the ShoppingCart
EventSourcedBehavior
for the given slice range. In a production application, you would need to start as many instances as the number of slice ranges. That way you consume the events from all entities.
The EventEnvelope[ShoppingCart.Event]
EventEnvelope<ShoppingCart.Event>
is what the Projection
handler will process. It contains the Event
and additional meta data, such as the offset that will be stored by the Projection
. See EventEnvelope
EventEnvelope
for full details of what the envelope contains.