foldAsync

Just like fold but receives a function that results in a Future CompletionStage to the next value.

Simple operators

Signature

Source.foldAsyncSource.foldAsync Flow.foldAsyncFlow.foldAsync

Description

Just like fold but receives a function that results in a Future CompletionStage to the next value.

Warning

Note that the zero value must be immutable, because otherwise the same mutable instance would be shared across different threads when running the stream more than once.

Example

foldAsync is typically used to ‘fold up’ the incoming values into an aggregate asynchronously. For example, you might want to summarize the incoming values into a histogram:

Scala
sourceimport org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.scaladsl.Source

import scala.concurrent.{ ExecutionContext, Future }
case class Histogram(low: Long = 0, high: Long = 0) {
  def add(i: Int): Future[Histogram] =
    if (i < 100) Future { copy(low = low + 1) }
    else Future { copy(high = high + 1) }
}

Source(1 to 150).foldAsync(Histogram())((acc, n) => acc.add(n)).runForeach(println)

// Prints: Histogram(99,51)
Java
sourceclass Histogram {
  final long low;
  final long high;

  private Histogram(long low, long high) {
    this.low = low;
    this.high = high;
  }

  // Immutable start value
  public static Histogram INSTANCE = new Histogram(0L, 0L);

  public CompletionStage<Histogram> addAsync(Integer n) {
    if (n < 100) {
      return CompletableFuture.supplyAsync(() -> new Histogram(low + 1L, high));
    } else {
      return CompletableFuture.supplyAsync(() -> new Histogram(low, high + 1L));
    }
  }
}

  // Folding over the numbers from 1 to 150:
  Source.range(1, 150)
      .foldAsync(Histogram.INSTANCE, Histogram::addAsync)
      .runForeach(h -> System.out.println("Histogram(" + h.low + ", " + h.high + ")"), system);

  // Prints: Histogram(99, 51)

Reactive Streams semantics

emits when upstream completes and the last Future CompletionStage is resolved

backpressures when downstream backpressures

completes when upstream completes and the last Future CompletionStage is resolved