fold

Start with current value zero and then apply the current and next value to the given function. When upstream completes, the current value is emitted downstream.

Simple operators

Signature

Source.fold Flow.fold

Description

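fold starts with zero as the current value. For each element arriving from upstream, it applies the given function to the current value and that element, and the result becomes the new current value. Nothing is emitted while elements arrive; when upstream completes, the current value is emitted downstream as a single element. If upstream completes without emitting any elements, the emitted value is zero itself.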
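The accumulation described above can be modeled in a few lines of plain Java. This is only a conceptual sketch, not Pekko's implementation: the class `FoldSketch` and its `fold` helper are hypothetical names introduced for illustration, and a finite `Iterator` stands in for an upstream that eventually completes.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// Conceptual model only: plain Java, not Pekko's implementation.
final class FoldSketch {

  // Hypothetical helper: consumes a finite "upstream" and returns the single
  // value that would be emitted once that upstream completes.
  static <T, A> A fold(Iterator<T> upstream, A zero, BiFunction<A, T, A> f) {
    A current = zero; // start with the zero value
    while (upstream.hasNext()) {
      // combine the current value with the next element
      current = f.apply(current, upstream.next());
    }
    return current; // available only after the upstream is exhausted
  }

  public static void main(String[] args) {
    Integer sum = FoldSketch.<Integer, Integer>fold(
        List.of(1, 2, 3, 4, 5).iterator(), 0, (acc, n) -> acc + n);
    System.out.println(sum); // prints 15

    Integer empty = FoldSketch.<Integer, Integer>fold(
        List.<Integer>of().iterator(), 0, (acc, n) -> acc + n);
    System.out.println(empty); // prints 0, the zero value itself
  }
}
```

The second call shows the empty case: with no elements to combine, the result is the zero value unchanged.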

Warning

Note that the zero value must be immutable, because otherwise the same mutable instance would be shared across different threads when running the stream more than once.
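The hazard in this warning can be illustrated without any streaming library. The following is a simplified, single-threaded sketch in plain Java, not Pekko code: `FoldBlueprint` is a hypothetical stand-in for a reusable stream description that captures its zero value once and can be run several times. With real streams the runs may also happen on different threads, which adds data races on top of the leaked state shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Conceptual model only: plain Java, not Pekko.
final class ZeroSharingSketch {

  // Hypothetical stand-in for a reusable stream blueprint with a fold step.
  static final class FoldBlueprint<T, A> {
    private final List<T> elements;
    private final A zero;
    private final BiFunction<A, T, A> f;

    FoldBlueprint(List<T> elements, A zero, BiFunction<A, T, A> f) {
      this.elements = elements;
      this.zero = zero;
      this.f = f;
    }

    A run() {
      A current = zero; // every run starts from the SAME zero instance
      for (T element : elements) {
        current = f.apply(current, element);
      }
      return current;
    }
  }

  // Mutable zero: the first run mutates the shared list, so the second run
  // starts from leftover state.
  static List<Integer> runTwiceWithMutableZero() {
    FoldBlueprint<Integer, List<Integer>> blueprint = new FoldBlueprint<>(
        List.of(1, 2, 3),
        new ArrayList<Integer>(),
        (acc, n) -> { acc.add(n); return acc; });
    blueprint.run();
    return blueprint.run();
  }

  // Immutable zero: each step builds a new list, so runs do not interfere.
  static List<Integer> runTwiceWithImmutableZero() {
    FoldBlueprint<Integer, List<Integer>> blueprint = new FoldBlueprint<>(
        List.of(1, 2, 3),
        List.<Integer>of(),
        (acc, n) -> {
          List<Integer> next = new ArrayList<>(acc);
          next.add(n);
          return List.copyOf(next);
        });
    blueprint.run();
    return blueprint.run();
  }

  public static void main(String[] args) {
    System.out.println(runTwiceWithMutableZero());   // prints [1, 2, 3, 1, 2, 3]
    System.out.println(runTwiceWithImmutableZero()); // prints [1, 2, 3]
  }
}
```

Copying the list on every step is the simplest way to keep the sketch immutable; a persistent collection type would avoid the repeated copies.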

Example

fold is typically used to ‘fold up’ the incoming values into an aggregate. For example, you might want to summarize the incoming values into a histogram:

Scala
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.scaladsl.Source

case class Histogram(low: Long = 0, high: Long = 0) {
  def add(i: Int): Histogram = if (i < 100) copy(low = low + 1) else copy(high = high + 1)
}
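
// An implicit ActorSystem is needed to materialize and run the stream
implicit val system: ActorSystem = ActorSystem("FoldExample")

Source(1 to 150).fold(Histogram())((acc, n) => acc.add(n)).runForeach(println)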

// Prints: Histogram(99,51)
Java
// addAsync reads low and high directly, so it is presumably a member of Histogram.
// The class's fields, INSTANCE, and the synchronous add used by fold below
// are not shown in this excerpt.
public CompletionStage<Histogram> addAsync(Integer n) {
  if (n < 100) {
    return CompletableFuture.supplyAsync(() -> new Histogram(low + 1L, high));
  } else {
    return CompletableFuture.supplyAsync(() -> new Histogram(low, high + 1L));
  }
}

// Folding over the numbers from 1 to 150:
Source.range(1, 150)
    .fold(Histogram.INSTANCE, Histogram::add)
    .runForeach(h -> System.out.println("Histogram(" + h.low + ", " + h.high + ")"), system);

// Prints: Histogram(99, 51)

Reactive Streams semantics

emits when upstream completes

backpressures when downstream backpressures

completes when upstream completes
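The emit and complete rules above can be illustrated with a small push-style model in plain Java. This is a conceptual sketch only, not Pekko's stage implementation: `FoldStageSketch` and its methods are hypothetical names, the `emitted` list stands in for downstream, and backpressure is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Conceptual model only: plain Java, not Pekko's stage implementation.
final class FoldStageSketch<T, A> {
  private A current;
  private final BiFunction<A, T, A> f;
  private final List<A> emitted = new ArrayList<>(); // what downstream has received
  private boolean completed = false;

  FoldStageSketch(A zero, BiFunction<A, T, A> f) {
    this.current = zero;
    this.f = f;
  }

  // An upstream element arrives: update the aggregate, emit nothing.
  void onNext(T element) {
    current = f.apply(current, element);
  }

  // Upstream completes: emit the aggregate once, then complete.
  void onComplete() {
    emitted.add(current);
    completed = true;
  }

  List<A> emitted() {
    return emitted;
  }

  boolean isCompleted() {
    return completed;
  }

  public static void main(String[] args) {
    FoldStageSketch<Integer, Integer> stage = new FoldStageSketch<>(0, Integer::sum);
    stage.onNext(1);
    stage.onNext(2);
    System.out.println(stage.emitted()); // prints [] because upstream is still open
    stage.onComplete();
    System.out.println(stage.emitted()); // prints [3]
  }
}
```

One consequence visible in the model: if `onComplete` is never called, downstream never receives anything, so fold is only useful on upstreams that eventually complete.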