Choosing a Source Provider
A SourceProvider
SourceProvider
will provide the data to our projection. In Projections each element that’s processed is an Envelope
and each Envelope
contains an Event
. An Envelope
must include an Offset
, but it can also contain other information such as creation timestamp, a topic name, an entity tag, etc. There are several supported Source Provider’s available (or you can build your own), but in this example we will use the Apache Pekko Persistence EventSourced
Source Provider.
Add the following dependencies to your project:
- sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-eventsourced" % "1.0.0"
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-projection-eventsourced_${scala.binary.version}</artifactId> <version>1.0.0</version> </dependency> </dependencies>
- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation "org.apache.pekko:pekko-projection-eventsourced_${versions.ScalaBinary}:1.0.0" }
Add the following imports to ShoppingCartApp
:
- Scala
-
source
import org.apache.pekko import pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal import pekko.persistence.query.Offset import pekko.projection.eventsourced.scaladsl.EventSourcedProvider import pekko.projection.scaladsl.SourceProvider
- Java
-
source
import org.apache.pekko.persistence.cassandra.query.javadsl.CassandraReadJournal; import org.apache.pekko.persistence.query.Offset; import org.apache.pekko.projection.eventsourced.javadsl.EventSourcedProvider; import org.apache.pekko.projection.javadsl.SourceProvider;
Create the SourceProvider
SourceProvider
. The Event Sourced Source Provider is using Apache Pekko Persistence internally (specifically the eventsByTag API). To initialize the Source Provider we need to set parameters to choose the Apache Pekko Persistence plugin (Cassandra) to use as well as the name of the tag used for events we’re interested in from the journal.
Setup the SourceProvider
in the Guardian Behavior
defined in ShoppingCartApp
:
- Scala
-
source
val sourceProvider: SourceProvider[Offset, EventEnvelope[ShoppingCartEvents.Event]] = EventSourcedProvider .eventsByTag[ShoppingCartEvents.Event]( system, readJournalPluginId = CassandraReadJournal.Identifier, tag = ShoppingCartTags.Single)
- Java
-
source
SourceProvider<Offset, EventEnvelope<ShoppingCartEvents.Event>> sourceProvider = EventSourcedProvider.eventsByTag( system, CassandraReadJournal.Identifier(), ShoppingCartTags.SINGLE);
Finally, we must configure Apache Pekko Persistence by adding a configuration file guide-shopping-cart-app.conf
to the src/main/resources/
directory of the project:
sourcedatastax-java-driver {
# basic.contact-points = ["127.0.0.1:9042"]
# basic.load-balancing-policy.local-datacenter = "datacenter1"
advanced {
# reconnect to c* if down when app is started
reconnect-on-init = true
}
}
pekko {
loglevel = DEBUG
actor {
serialization-bindings {
"docs.guide.CborSerializable" = jackson-cbor
"jdocs.guide.CborSerializable" = jackson-cbor
}
}
persistence.journal {
plugin = "pekko.persistence.cassandra.journal"
auto-start-journals = ["pekko.persistence.cassandra.journal"]
}
persistence.snapshot-store.plugin = "pekko.persistence.cassandra.snapshot"
persistence {
cassandra {
journal {
# to create the schema
keyspace-autocreate = true
tables-autocreate = true
}
snapshot {
# to create the schema
keyspace-autocreate = true
tables-autocreate = true
}
query {
refresh-interval = 2s
}
events-by-tag {
# for lower latency
eventual-consistency-delay = 25ms
flush-interval = 25ms
pubsub-notification = on
}
}
}
}