fold
Start with current value zero
and then apply the current and next value to the given function. When upstream completes, the current value is emitted downstream.
Signature¶
Description¶
Start with current value zero
and then apply the current and next value to the given function. When upstream completes, the current value is emitted downstream.
Warning
Note that the zero
value must be immutable, because otherwise the same mutable instance would be shared across different threads when running the stream more than once.
Example¶
fold
is typically used to ‘fold up’ the incoming values into an aggregate. For example, you might want to summarize the incoming values into a histogram:
sourceimport org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.scaladsl.Source
case class Histogram(low: Long = 0, high: Long = 0) {
def add(i: Int): Histogram = if (i < 100) copy(low = low + 1) else copy(high = high + 1)
}
Source(1 to 150).fold(Histogram())((acc, n) => acc.add(n)).runForeach(println)
// Prints: Histogram(99,51)
source
public CompletionStage<Histogram> addAsync(Integer n) {
if (n < 100) {
return CompletableFuture.supplyAsync(() -> new Histogram(low + 1L, high));
} else {
return CompletableFuture.supplyAsync(() -> new Histogram(low, high + 1L));
}
}
// Folding over the numbers from 1 to 150:
Source.range(1, 150)
.fold(Histogram.INSTANCE, Histogram::add)
.runForeach(h -> System.out.println("Histogram(" + h.low + ", " + h.high + ")"), system);
// Prints: Histogram(99, 51)
Reactive Streams semantics¶
emits when upstream completes
backpressures when downstream backpressures
completes when upstream completes
1.1.3