foldAsync
Just like fold
but receives a function that results in a Future
CompletionStage
to the next value.
Signature
Source.foldAsync
Source.foldAsync
Flow.foldAsync
Flow.foldAsync
Description
Just like fold
but receives a function that results in a Future
CompletionStage
to the next value.
Note that the zero
value must be immutable, because otherwise the same mutable instance would be shared across different threads when running the stream more than once.
Example
foldAsync
is typically used to ‘fold up’ the incoming values into an aggregate asynchronously. For example, you might want to summarize the incoming values into a histogram:
- Scala
-
source
import org.apache.pekko import pekko.actor.ActorSystem import pekko.stream.scaladsl.Source import scala.concurrent.{ ExecutionContext, Future } case class Histogram(low: Long = 0, high: Long = 0) { def add(i: Int): Future[Histogram] = if (i < 100) Future { copy(low = low + 1) } else Future { copy(high = high + 1) } } Source(1 to 150).foldAsync(Histogram())((acc, n) => acc.add(n)).runForeach(println) // Prints: Histogram(99,51)
- Java
-
source
class Histogram { final long low; final long high; private Histogram(long low, long high) { this.low = low; this.high = high; } // Immutable start value public static Histogram INSTANCE = new Histogram(0L, 0L); public CompletionStage<Histogram> addAsync(Integer n) { if (n < 100) { return CompletableFuture.supplyAsync(() -> new Histogram(low + 1L, high)); } else { return CompletableFuture.supplyAsync(() -> new Histogram(low, high + 1L)); } } } // Folding over the numbers from 1 to 150: Source.range(1, 150) .foldAsync(Histogram.INSTANCE, Histogram::addAsync) .runForeach(h -> System.out.println("Histogram(" + h.low + ", " + h.high + ")"), system); // Prints: Histogram(99, 51)
Reactive Streams semantics
emits when upstream completes and the last Future
CompletionStage
is resolved
backpressures when downstream backpressures
completes when upstream completes and the last Future
CompletionStage
is resolved