Processing with Actor

A good alternative for advanced state management is to implement the handler as an actor.

The following example is using the CassandraProjection but the handler and actor would be the same if used any other offset storage.

An actor Behavior for the word count example that was introduced in the section about Stateful handler:

Scala
sourceimport org.apache.pekko
import pekko.actor.typed.ActorRef
import pekko.actor.typed.Behavior
import pekko.actor.typed.SupervisorStrategy
import pekko.actor.typed.scaladsl.ActorContext
import pekko.actor.typed.scaladsl.Behaviors

object WordCountProcessor {
  trait Command
  final case class Handle(envelope: WordEnvelope, replyTo: ActorRef[Try[Done]]) extends Command
  private final case class InitialState(state: Map[Word, Count]) extends Command
  private final case class SaveCompleted(word: Word, saveResult: Try[Done]) extends Command

  def apply(projectionId: ProjectionId, repository: WordCountRepository): Behavior[Command] =
    Behaviors
      .supervise[Command] {
        Behaviors.setup { context =>
          new WordCountProcessor(context, projectionId, repository).init()
        }
      }
      .onFailure(SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.1))
}

class WordCountProcessor(
    context: ActorContext[WordCountProcessor.Command],
    projectionId: ProjectionId,
    repository: WordCountRepository) {
  import WordCountProcessor._

  // loading initial state from db
  def init(): Behavior[Command] = {
    Behaviors.withStash(10) { buffer =>
      context.pipeToSelf(repository.loadAll(projectionId.id)) {
        case Success(value) => InitialState(value)
        case Failure(exc)   => throw exc
      }

      Behaviors.receiveMessage {
        case InitialState(state) =>
          context.log.debug("Initial state [{}]", state)
          buffer.unstashAll(idle(state))
        case other =>
          context.log.debug("Stashed [{}]", other)
          buffer.stash(other)
          Behaviors.same
      }
    }
  }

  // waiting for next envelope
  private def idle(state: Map[Word, Count]): Behavior[Command] =
    Behaviors.receiveMessagePartial {
      case Handle(envelope, replyTo) =>
        val word = envelope.word
        context.pipeToSelf(repository.save(projectionId.id, word, state.getOrElse(word, 0) + 1)) { saveResult =>
          SaveCompleted(word, saveResult)
        }
        saving(state, replyTo) // will reply from SaveCompleted
      case _: InitialState =>
        Behaviors.unhandled
    }

  // saving the new count for a word in db
  private def saving(state: Map[Word, Count], replyTo: ActorRef[Try[Done]]): Behavior[Command] =
    Behaviors.receiveMessagePartial {
      case SaveCompleted(word, saveResult) =>
        replyTo ! saveResult
        saveResult match {
          case Success(_)   => idle(state.updated(word, state.getOrElse(word, 0) + 1))
          case Failure(exc) => throw exc // restart, reload state from db
        }
    }
}
Java
sourceimport org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.Receive;

public class WordCountProcessor {
  public interface Command {}

  public static class Handle implements Command {
    public final WordEnvelope envelope;
    public final ActorRef<Result> replyTo;

    public Handle(WordEnvelope envelope, ActorRef<Result> replyTo) {
      this.envelope = envelope;
      this.replyTo = replyTo;
    }
  }

  public static class Result {
    public final Optional<Throwable> error;

    public Result(Optional<Throwable> error) {
      this.error = error;
    }
  }

  private static class InitialState implements Command {
    final Map<String, Integer> state;

    private InitialState(Map<String, Integer> state) {
      this.state = state;
    }
  }

  private static class SaveCompleted implements Command {
    final String word;
    final Optional<Throwable> error;
    final ActorRef<Result> replyTo;

    private SaveCompleted(String word, Optional<Throwable> error, ActorRef<Result> replyTo) {
      this.word = word;
      this.error = error;
      this.replyTo = replyTo;
    }
  }

  public static Behavior<Command> create(
      ProjectionId projectionId, WordCountRepository repository) {
    return Behaviors.supervise(
            Behaviors.setup(
                (ActorContext<Command> context) ->
                    new WordCountProcessor(projectionId, repository).init(context)))
        .onFailure(
            SupervisorStrategy.restartWithBackoff(
                Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1));
  }

  private final ProjectionId projectionId;
  private final WordCountRepository repository;

  private WordCountProcessor(ProjectionId projectionId, WordCountRepository repository) {
    this.projectionId = projectionId;
    this.repository = repository;
  }

  Behavior<Command> init(ActorContext<Command> context) {
    return Behaviors.withStash(10, buffer -> new Initializing(context, buffer));
  }

  private class Initializing extends AbstractBehavior<Command> {
    private final StashBuffer<Command> buffer;

    private Initializing(ActorContext<Command> context, StashBuffer<Command> buffer) {
      super(context);
      this.buffer = buffer;

      getContext()
          .pipeToSelf(
              repository.loadAll(projectionId.id()),
              (value, exc) -> {
                if (value != null) return new InitialState(value);
                else throw new RuntimeException("Load failed.", exc);
              });
    }

    @Override
    public Receive<Command> createReceive() {
      return newReceiveBuilder()
          .onMessage(InitialState.class, this::onInitalState)
          .onAnyMessage(this::onOther)
          .build();
    }

    private Behavior<Command> onInitalState(InitialState initialState) {
      getContext().getLog().debug("Initial state [{}]", initialState.state);
      return buffer.unstashAll(new Active(getContext(), initialState.state));
    }

    private Behavior<Command> onOther(Command command) {
      getContext().getLog().debug("Stashed [{}]", command);
      buffer.stash(command);
      return this;
    }
  }

  private class Active extends AbstractBehavior<Command> {
    private final Map<String, Integer> state;

    public Active(ActorContext<Command> context, Map<String, Integer> state) {
      super(context);
      this.state = state;
    }

    @Override
    public Receive<Command> createReceive() {
      return newReceiveBuilder()
          .onMessage(Handle.class, this::onHandle)
          .onMessage(SaveCompleted.class, this::onSaveCompleted)
          .build();
    }

    private Behavior<Command> onHandle(Handle command) {
      String word = command.envelope.word;
      int newCount = state.getOrDefault(word, 0) + 1;
      getContext()
          .pipeToSelf(
              repository.save(projectionId.id(), word, newCount),
              (done, exc) ->
                  // will reply from SaveCompleted
                  new SaveCompleted(word, Optional.ofNullable(exc), command.replyTo));
      return this;
    }

    private Behavior<Command> onSaveCompleted(SaveCompleted completed) {
      completed.replyTo.tell(new Result(completed.error));
      if (completed.error.isPresent()) {
        // restart, reload state from db
        throw new RuntimeException("Save failed.", completed.error.get());
      } else {
        String word = completed.word;
        int newCount = state.getOrDefault(word, 0) + 1;
        state.put(word, newCount);
      }
      return this;
    }
  }
}

The handler can be definined as:

Scala
sourceimport org.apache.pekko
import pekko.projection.scaladsl.ActorHandler

class WordCountActorHandler(behavior: Behavior[WordCountProcessor.Command])(implicit system: ActorSystem[_])
    extends ActorHandler[WordEnvelope, WordCountProcessor.Command](behavior) {
  import pekko.actor.typed.scaladsl.AskPattern._
  import system.executionContext

  private implicit val askTimeout: Timeout = 5.seconds

  override def process(actor: ActorRef[WordCountProcessor.Command], envelope: WordEnvelope): Future[Done] = {
    actor.ask[Try[Done]](replyTo => WordCountProcessor.Handle(envelope, replyTo)).map {
      case Success(_)   => Done
      case Failure(exc) => throw exc
    }
  }
}
Java
sourceclass WordCountActorHandler extends ActorHandler<WordEnvelope, WordCountProcessor.Command> {
  private final ActorSystem<?> system;
  private final Duration askTimeout = Duration.ofSeconds(5);

  WordCountActorHandler(Behavior<WordCountProcessor.Command> behavior, ActorSystem<?> system) {
    super(behavior);
    this.system = system;
  }

  @Override
  public CompletionStage<Done> process(
      ActorRef<WordCountProcessor.Command> actor, WordEnvelope envelope) {
    CompletionStage<WordCountProcessor.Result> result =
        AskPattern.ask(
            actor,
            (ActorRef<WordCountProcessor.Result> replyTo) ->
                new WordCountProcessor.Handle(envelope, replyTo),
            askTimeout,
            system.scheduler());

    return result.thenCompose(
        r -> {
          if (r.error.isPresent()) {
            CompletableFuture<Done> err = new CompletableFuture<>();
            err.completeExceptionally(r.error.get());
            return err;
          } else {
            return CompletableFuture.completedFuture(Done.getInstance());
          }
        });
  }
}

and the Projection:

Scala
sourceval projection =
  CassandraProjection
    .atLeastOnce[Long, WordEnvelope](
      projectionId,
      sourceProvider = new WordSource,
      handler = () => new WordCountActorHandler(WordCountProcessor(projectionId, repository)))
Java
sourceProjection<WordEnvelope> projection =
    CassandraProjection.atLeastOnce(
        projectionId,
        new WordSource(),
        () ->
            new WordCountActorHandler(
                WordCountProcessor.create(projectionId, repository), system));

The Behavior given to the ActorHandler is spawned automatically by the Projection and each envelope is sent to the actor with ask. The actor is supposed to send a response message to the replyTo when it has completed the processing of the envelope. The TryOptional<Throwable> error indicates if the processing was successful or failed.

The lifecycle of the actor is managed by the Projection. The actor is automatically stopped when the Projection is stopped.

Another implementation that is loading the current count for a word on demand, and thereafter caches it in the in-memory state:

Scala
sourceobject WordCountProcessor {
  trait Command
  final case class Handle(envelope: WordEnvelope, replyTo: ActorRef[Try[Done]]) extends Command
  private final case class LoadCompleted(word: Word, loadResult: Try[Count]) extends Command
  private final case class SaveCompleted(word: Word, saveResult: Try[Done]) extends Command

  def apply(projectionId: ProjectionId, repository: WordCountRepository): Behavior[Command] =
    Behaviors
      .supervise[Command] {
        Behaviors.setup[Command] { context =>
          new WordCountProcessor(context, projectionId, repository).idle(Map.empty)
        }
      }
      .onFailure(SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.1))
}

class WordCountProcessor(
    context: ActorContext[WordCountProcessor.Command],
    projectionId: ProjectionId,
    repository: WordCountRepository) {
  import WordCountProcessor._

  // waiting for next envelope
  private def idle(state: Map[Word, Count]): Behavior[Command] =
    Behaviors.receiveMessagePartial {
      case Handle(envelope, replyTo) =>
        val word = envelope.word
        state.get(word) match {
          case None =>
            load(word)
            loading(state, replyTo) // will continue from LoadCompleted
          case Some(count) =>
            save(word, count + 1)
            saving(state, replyTo) // will reply from SaveCompleted
        }
    }

  private def load(word: String): Unit = {
    context.pipeToSelf(repository.load(projectionId.id, word)) { loadResult =>
      LoadCompleted(word, loadResult)
    }
  }

  // loading the count for a word from db
  private def loading(state: Map[Word, Count], replyTo: ActorRef[Try[Done]]): Behavior[Command] =
    Behaviors.receiveMessagePartial {
      case LoadCompleted(word, loadResult) =>
        loadResult match {
          case Success(count) =>
            save(word, count + 1)
            saving(state, replyTo) // will reply from SaveCompleted
          case Failure(exc) =>
            replyTo ! Failure(exc)
            idle(state)
        }
    }

  private def save(word: String, count: Count): Unit = {
    context.pipeToSelf(repository.save(projectionId.id, word, count)) { saveResult =>
      SaveCompleted(word, saveResult)
    }
  }

  // saving the new count for a word in db
  private def saving(state: Map[Word, Count], replyTo: ActorRef[Try[Done]]): Behavior[Command] =
    Behaviors.receiveMessagePartial {
      case SaveCompleted(word, saveResult) =>
        replyTo ! saveResult
        saveResult match {
          case Success(_) =>
            idle(state.updated(word, state.getOrElse(word, 0) + 1))
          case Failure(_) =>
            // remove the word from the state if the save failed, because it could have been a timeout
            // so that it was actually saved, best to reload
            idle(state - word)
        }
    }
}
Java
source  public class WordCountProcessor extends AbstractBehavior<WordCountProcessor.Command> {
    public interface Command {}

    public static class Handle implements Command {
      public final WordEnvelope envelope;
      public final ActorRef<Result> replyTo;

      public Handle(WordEnvelope envelope, ActorRef<Result> replyTo) {
        this.envelope = envelope;
        this.replyTo = replyTo;
      }
    }

    public static class Result {
      public final Optional<Throwable> error;

      public Result(Optional<Throwable> error) {
        this.error = error;
      }
    }

    private static class LoadCompleted implements Command {
      final String word;
      final Optional<Throwable> error;
      final ActorRef<Result> replyTo;

      private LoadCompleted(String word, Optional<Throwable> error, ActorRef<Result> replyTo) {
        this.word = word;
        this.error = error;
        this.replyTo = replyTo;
      }
    }

    private static class SaveCompleted implements Command {
      final String word;
      final Optional<Throwable> error;
      final ActorRef<Result> replyTo;

      private SaveCompleted(String word, Optional<Throwable> error, ActorRef<Result> replyTo) {
        this.word = word;
        this.error = error;
        this.replyTo = replyTo;
      }
    }

    public static Behavior<Command> create(
        ProjectionId projectionId, WordCountRepository repository) {
      return Behaviors.supervise(
              Behaviors.setup(
                  (ActorContext<Command> context) ->
                      new WordCountProcessor(context, projectionId, repository)))
          .onFailure(
              SupervisorStrategy.restartWithBackoff(
                  Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1));
    }

    private final ProjectionId projectionId;
    private final WordCountRepository repository;
    private final Map<String, Integer> state = new HashMap<>();

    private WordCountProcessor(
        ActorContext<Command> context,
        ProjectionId projectionId,
        WordCountRepository repository) {
      super(context);
      this.projectionId = projectionId;
      this.repository = repository;
    }

    @Override
    public Receive<Command> createReceive() {
      return newReceiveBuilder()
          .onMessage(Handle.class, this::onHandle)
          .onMessage(LoadCompleted.class, this::onLoadCompleted)
          .onMessage(SaveCompleted.class, this::onSaveCompleted)
          .build();
    }

    private Behavior<Command> onHandle(Handle command) {
      String word = command.envelope.word;
      if (state.containsKey(word)) {
        int newCount = state.get(word) + 1;
        getContext()
            .pipeToSelf(
                repository.save(projectionId.id(), word, newCount),
                (done, exc) ->
                    // will reply from SaveCompleted
                    new SaveCompleted(word, Optional.ofNullable(exc), command.replyTo));
      } else {
        getContext()
            .pipeToSelf(
                repository.load(projectionId.id(), word),
                (loadResult, exc) ->
                    // will reply from LoadCompleted
                    new LoadCompleted(word, Optional.ofNullable(exc), command.replyTo));
      }
      return this;
    }

    private Behavior<Command> onLoadCompleted(LoadCompleted completed) {
      if (completed.error.isPresent()) {
        completed.replyTo.tell(new Result(completed.error));
      } else {
        String word = completed.word;
        int newCount = state.getOrDefault(word, 0) + 1;
        getContext()
            .pipeToSelf(
                repository.save(projectionId.id(), word, newCount),
                (done, exc) ->
                    // will reply from SaveCompleted
                    new SaveCompleted(word, Optional.ofNullable(exc), completed.replyTo));
      }
      return this;
    }

    private Behavior<Command> onSaveCompleted(SaveCompleted completed) {
      completed.replyTo.tell(new Result(completed.error));
      if (completed.error.isPresent()) {
        // remove the word from the state if the save failed, because it could have been a timeout
        // so that it was actually saved, best to reload
        state.remove(completed.word);
      } else {
        String word = completed.word;
        int newCount = state.getOrDefault(word, 0) + 1;
        state.put(word, newCount);
      }
      return this;
    }
  }
}