Choosing a Source Provider

A SourceProvider supplies the data to our projection. In Projections, each element that is processed is an Envelope, and each Envelope contains an Event. An Envelope must include an Offset, but it can also carry other information such as a creation timestamp, a topic name, or an entity tag. Several Source Providers are supported (or you can build your own); in this example we will use the Apache Pekko Persistence EventSourced Source Provider.
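To make the Envelope and Offset relationship concrete, here is a minimal conceptual sketch in plain Java. The names `EnvelopeSketch`, `Envelope`, and `resumeOffset` are hypothetical and are not the Pekko Projection types; the offset is simplified to a `long`, whereas real offsets depend on the Source Provider.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical, simplified model of the envelope idea; NOT the Pekko Projection API.
public class EnvelopeSketch {

    // An event wrapped with the offset that locates it in the stream,
    // plus optional metadata (creation timestamp, tag).
    record Envelope<E>(long offset, E event, long timestampMillis, String tag) {}

    // The highest offset among the processed envelopes, i.e. the position a
    // projection would store so that it can resume after a restart.
    // Empty if nothing has been processed yet.
    static <E> OptionalLong resumeOffset(List<Envelope<E>> processed) {
        return processed.stream().mapToLong(envelope -> envelope.offset()).max();
    }

    public static void main(String[] args) {
        List<Envelope<String>> batch = List.of(
            new Envelope<>(1L, "ItemAdded", 1_000L, "carts"),
            new Envelope<>(2L, "ItemRemoved", 2_000L, "carts"));
        System.out.println(resumeOffset(batch));
    }
}
```

The point of the sketch is only that the event is the payload, while the offset is what lets processing continue from where it left off.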

Add the following dependencies to your project:

sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-eventsourced" % "1.0.0"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-projection-eventsourced_${scala.binary.version}</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-projection-eventsourced_${versions.ScalaBinary}:1.0.0"
}

Add the following imports to ShoppingCartApp:

Scala
import org.apache.pekko
import pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal
import pekko.persistence.query.Offset
import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
import pekko.projection.scaladsl.SourceProvider
Java
import org.apache.pekko.persistence.cassandra.query.javadsl.CassandraReadJournal;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.projection.eventsourced.javadsl.EventSourcedProvider;
import org.apache.pekko.projection.javadsl.SourceProvider;

Create the SourceProvider. The Event Sourced Source Provider uses Apache Pekko Persistence internally (specifically the eventsByTag API). To initialize the Source Provider, we need to choose the Apache Pekko Persistence plugin to read from (Cassandra) and specify the tag of the events we’re interested in from the journal.

Set up the SourceProvider in the Guardian Behavior defined in ShoppingCartApp:

Scala
val sourceProvider: SourceProvider[Offset, EventEnvelope[ShoppingCartEvents.Event]] =
  EventSourcedProvider
    .eventsByTag[ShoppingCartEvents.Event](
      system,
      readJournalPluginId = CassandraReadJournal.Identifier,
      tag = ShoppingCartTags.Single)
Java
SourceProvider<Offset, EventEnvelope<ShoppingCartEvents.Event>> sourceProvider =
    EventSourcedProvider.eventsByTag(
        system, CassandraReadJournal.Identifier(), ShoppingCartTags.SINGLE);
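As a rough mental model of what an events-by-tag query delivers, the following plain Java sketch filters an in-memory list by tag and by a starting offset. `EventsByTagSketch` and `JournalEntry` are hypothetical names, the journal is a simple list, and the offset is simplified to a `long`; this is not how the Cassandra plugin is implemented.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical in-memory stand-in for a tagged journal; NOT the Pekko Persistence query API.
public class EventsByTagSketch {

    // One journal entry: a simplified offset, the tags attached when the event
    // was written, and the event itself.
    record JournalEntry(long offset, Set<String> tags, String event) {}

    // Returns the events that carry `tag` and whose offset is strictly greater
    // than `fromOffset`, ordered by offset.
    static List<String> eventsByTag(List<JournalEntry> journal, String tag, long fromOffset) {
        return journal.stream()
            .filter(entry -> entry.tags().contains(tag))
            .filter(entry -> entry.offset() > fromOffset)
            .sorted(Comparator.comparingLong(JournalEntry::offset))
            .map(JournalEntry::event)
            .collect(Collectors.toList());
    }
}
```

Under this model, a projection that has stored the offset of the last event it handled can ask for everything tagged after that offset and skip what it already processed.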

Finally, we must configure Apache Pekko Persistence by adding a configuration file guide-shopping-cart-app.conf to the src/main/resources/ directory of the project:

datastax-java-driver {
  # basic.contact-points = ["127.0.0.1:9042"]
  # basic.load-balancing-policy.local-datacenter = "datacenter1"
  advanced {
    # reconnect to c* if down when app is started
    reconnect-on-init = true
  }
}


pekko {
  loglevel = DEBUG
  actor {
    serialization-bindings {
      "docs.guide.CborSerializable" = jackson-cbor
      "jdocs.guide.CborSerializable" = jackson-cbor
    }
  }

  persistence.journal {
    plugin = "pekko.persistence.cassandra.journal"
    auto-start-journals = ["pekko.persistence.cassandra.journal"]
  }
  persistence.snapshot-store.plugin = "pekko.persistence.cassandra.snapshot"

  persistence {
    cassandra {
      journal {
        # to create the schema
        keyspace-autocreate = true
        tables-autocreate = true
      }

      snapshot {
        # to create the schema
        keyspace-autocreate = true
        tables-autocreate = true
      }

      query {
        refresh-interval = 2s
      }

      events-by-tag {
        # for lower latency
        eventual-consistency-delay = 25ms
        flush-interval = 25ms
        pubsub-notification = on
      }
    }
  }
}