Setup your application

Add the Apache Pekko Projections core library to a new project. This isn’t strictly required, because as we add other dependencies in the following steps it will transitively include core as a dependency, but it never hurts to be explicit.

sbt
libraryDependencies += "org.apache.pekko" %% "pekko-projection-core" % "1.0.0"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-projection-core_${scala.binary.version}</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-projection-core_${versions.ScalaBinary}:1.0.0"
}

Define the event type protocol that will represent each Envelope streamed from the Source Provider. Add ShoppingCartEvents to your project:

Scala
sourcepackage docs.guide

import java.time.Instant

object ShoppingCartEvents {

  /**
   * This interface defines all the events that the ShoppingCart supports.
   */
  sealed trait Event extends CborSerializable {
    def cartId: String
  }

  sealed trait ItemEvent extends Event {
    def itemId: String
  }

  final case class ItemAdded(cartId: String, itemId: String, quantity: Int) extends ItemEvent
  final case class ItemRemoved(cartId: String, itemId: String, oldQuantity: Int) extends ItemEvent
  final case class ItemQuantityAdjusted(cartId: String, itemId: String, newQuantity: Int, oldQuantity: Int)
      extends ItemEvent
  final case class CheckedOut(cartId: String, eventTime: Instant) extends Event
}
Java
sourcepackage jdocs.guide;

import java.time.Instant;

public class ShoppingCartEvents {
  public interface Event extends CborSerializable {
    String getCartId();
  }

  public interface ItemEvent extends Event {
    String getItemId();
  }

  public static final class ItemAdded implements ItemEvent {
    public final String cartId;
    public final String itemId;
    public final int quantity;

    public ItemAdded(String cartId, String itemId, int quantity) {
      this.cartId = cartId;
      this.itemId = itemId;
      this.quantity = quantity;
    }

    public String getCartId() {
      return this.cartId;
    }

    public String getItemId() {
      return this.itemId;
    }
  }

  public static final class ItemRemoved implements ItemEvent {
    public final String cartId;
    public final String itemId;
    public final int oldQuantity;

    public ItemRemoved(String cartId, String itemId, int oldQuantity) {
      this.cartId = cartId;
      this.itemId = itemId;
      this.oldQuantity = oldQuantity;
    }

    public String getCartId() {
      return this.cartId;
    }

    public String getItemId() {
      return this.itemId;
    }
  }

  public static final class ItemQuantityAdjusted implements ItemEvent {
    public final String cartId;
    public final String itemId;
    public final int newQuantity;
    public final int oldQuantity;

    public ItemQuantityAdjusted(String cartId, String itemId, int newQuantity, int oldQuantity) {
      this.cartId = cartId;
      this.itemId = itemId;
      this.newQuantity = newQuantity;
      this.oldQuantity = oldQuantity;
    }

    public String getCartId() {
      return this.cartId;
    }

    public String getItemId() {
      return this.itemId;
    }
  }

  public static final class CheckedOut implements Event {
    public final String cartId;
    public final Instant eventTime;

    public CheckedOut(String cartId, Instant eventTime) {
      this.cartId = cartId;
      this.eventTime = eventTime;
    }

    public String getCartId() {
      return this.cartId;
    }
  }
}

To enable serialization and deserialization of events with Apache Pekko Persistence it’s necessary to define a base type for your event type hierarchy. In this guide we are using Jackson Serialization. Add the CborSerializable base type to your project:

Scala
sourcepackage docs.guide

/**
 * Marker trait for serialization with Jackson CBOR
 */
trait CborSerializable
Java
sourcepackage jdocs.guide;

/** Marker trait for serialization with Jackson CBOR */
public interface CborSerializable {}

Configure the CborSerializable type to use jackson-cbor configuration in your application.conf. We will add this configuration when Apache Pekko Persistence configuration is setup in the Choosing a SourceProvider section of the guide.

Scala
sourcepekko.actor.serialization-bindings {
  "docs.guide.CborSerializable" = jackson-cbor
}
Java
sourcepekko.actor.serialization-bindings {
  "jdocs.guide.CborSerializable" = jackson-cbor
}
Note

For Jackson serialization to work correctly in Java projects you must use the javac compiler parameter -parameters when building your project. In sbt you can add it your sbt project by adding it to the javacOptions Setting: javacOptions += "-parameters"maven you can add an argument to maven-compiler-plugin plugin under compilerArgs.

Define the persistence tags to be used in your project. Note that partitioned tags will be used later when running the projection in Apache Pekko Cluster. Add ShoppingCartTags to your project:

Scala
sourcepackage docs.guide

object ShoppingCartTags {
  val Single = "shopping-cart"
  val Tags = Vector("carts-0", "carts-1", "carts-2")
}
Java
sourcepackage jdocs.guide;

public class ShoppingCartTags {
  public static String SINGLE = "shopping-cart";
  public static String[] TAGS = {"carts-0", "carts-1", "carts-2"};
}

Create the ShoppingCartApp with an org.apache.pekko.actor.typed.ActorSystem (API: ActorSystemActorSystem) for Projections to use. Create an empty Guardian Actor (the root Actor of the ActorSystem). We will populate this Actor in the following steps of the guide. Note that we are using the docs.scaladsljdocs.scaladsl package. You may use any package, but we include this package in snippets throughout the guide.

Scala
sourcepackage docs.guide

import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.Behaviors
import pekko.projection.ProjectionBehavior
import pekko.projection.eventsourced.EventEnvelope
import com.typesafe.config.ConfigFactory

object ShoppingCartApp extends App {
  val config = ConfigFactory.load("guide-shopping-cart-app.conf")

  ActorSystem(
    Behaviors.setup[String] { context =>
      val system = context.system

      // ...

      Behaviors.empty
    },
    "ShoppingCartApp",
    config)
}
Java
sourcepackage jdocs.guide;

import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.projection.ProjectionBehavior;
import org.apache.pekko.projection.eventsourced.EventEnvelope;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ShoppingCartApp {
  public static void main(String[] args) throws Exception {
    Config config = ConfigFactory.load("guide-shopping-cart-app.conf");

    ActorSystem.create(
        Behaviors.setup(
            context -> {
              ActorSystem<Void> system = context.getSystem();

              // ...

              return Behaviors.empty();
            }),
        "ShoppingCartApp",
        config);
  }
}