scanAsync
Just like scan, but takes a function that returns a Future (Scala) or CompletionStage (Java) of the next value.
Signature
Source.scanAsync, Flow.scanAsync
Description
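scanAsync works like scan, except that the supplied function returns a Future (Scala) or CompletionStage (Java) that resolves to the next accumulated value, instead of returning that value directly.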
Note that the zero value must be immutable, because otherwise the same mutable instance would be shared across different threads when running the stream more than once.
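The hazard described in the note can be illustrated without a stream at all. The following is a minimal plain-JDK sketch, not the operator's implementation: runOnce, appendInPlace and appendCopy are hypothetical helpers standing in for one run of the stream and for two possible accumulator functions. It runs the same zero value through two sequential "runs"; with a mutable zero, state from the first run leaks into the second, and with concurrent runs the same sharing would additionally happen across threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;

// Plain-JDK illustration only; none of these names are part of the streams API.
class MutableZeroSketch {

    // Hypothetical stand-in for one run of the stream: feeds every element
    // through f, waits for each stage, and returns the final accumulator.
    static <A, T> A runOnce(A zero, List<T> elements, BiFunction<A, T, CompletionStage<A>> f) {
        A acc = zero;
        for (T element : elements) {
            acc = f.apply(acc, element).toCompletableFuture().join();
        }
        return acc;
    }

    // Mutates the accumulator in place: unsafe when the zero is reused.
    static CompletionStage<List<Integer>> appendInPlace(List<Integer> acc, Integer next) {
        return CompletableFuture.supplyAsync(() -> {
            acc.add(next);
            return acc;
        });
    }

    // Builds a fresh immutable list on every step: the zero is never modified.
    static CompletionStage<List<Integer>> appendCopy(List<Integer> acc, Integer next) {
        return CompletableFuture.supplyAsync(() -> {
            List<Integer> copy = new ArrayList<>(acc);
            copy.add(next);
            return List.copyOf(copy);
        });
    }

    public static void main(String[] args) {
        List<Integer> elements = List.of(1, 2, 3);

        List<Integer> mutableZero = new ArrayList<>();
        runOnce(mutableZero, elements, MutableZeroSketch::appendInPlace);
        // The second run starts from the list the first run already filled.
        System.out.println(runOnce(mutableZero, elements, MutableZeroSketch::appendInPlace));
        // [1, 2, 3, 1, 2, 3]

        List<Integer> immutableZero = List.of();
        runOnce(immutableZero, elements, MutableZeroSketch::appendCopy);
        System.out.println(runOnce(immutableZero, elements, MutableZeroSketch::appendCopy));
        // [1, 2, 3]
    }
}
```

Using an immutable zero together with a function that returns a new value on every step keeps each run independent of the others.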
Example
The example below shows that scanAsync is similar to fold, except that it emits the accumulated value from every iteration, starting with the zero value.
- Scala

    // Assumes an implicit ExecutionContext (for Future) and an implicit
    // ActorSystem or Materializer (for runForeach) are in scope.
    def asyncFunction(acc: Int, next: Int): Future[Int] = Future {
      acc + next
    }

    val source = Source(1 to 5)
    source.scanAsync(0)((acc, x) => asyncFunction(acc, x)).runForeach(println)
    // 0  (= 0)
    // 1  (= 0 + 1)
    // 3  (= 0 + 1 + 2)
    // 6  (= 0 + 1 + 2 + 3)
    // 10 (= 0 + 1 + 2 + 3 + 4)
    // 15 (= 0 + 1 + 2 + 3 + 4 + 5)

- Java

    // Assumes `system` is the ActorSystem used to run the stream.
    CompletionStage<Integer> asyncFunction(int acc, int next) {
      return CompletableFuture.supplyAsync(() -> acc + next);
    }

    Source<Integer, NotUsed> source = Source.range(1, 5);
    source.scanAsync(0, (acc, x) -> asyncFunction(acc, x)).runForeach(System.out::println, system);
    // 0  (= 0)
    // 1  (= 0 + 1)
    // 3  (= 0 + 1 + 2)
    // 6  (= 0 + 1 + 2 + 3)
    // 10 (= 0 + 1 + 2 + 3 + 4)
    // 15 (= 0 + 1 + 2 + 3 + 4 + 5)
In a real application, the function would typically call an external API that returns a Future (Scala) or CompletionStage (Java), rather than produce a value that is available almost immediately.
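As a rough illustration of such a function, the sketch below uses only the JDK. fetchPrice is a hypothetical stand-in for a remote call (it answers after a short delay from a hard-coded table), and addPrice is a hypothetical accumulator that combines the running total with the asynchronous result. Neither name comes from the streams API.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

// Plain-JDK illustration only; the "service" is simulated with a delay.
class ExternalCallSketch {

    // Hypothetical remote lookup: completes about 50 ms later, like a network call.
    static CompletionStage<Integer> fetchPrice(String itemId) {
        Map<String, Integer> prices = Map.of("apple", 3, "pear", 4);
        return CompletableFuture.supplyAsync(
            () -> prices.getOrDefault(itemId, 0),
            CompletableFuture.delayedExecutor(50, TimeUnit.MILLISECONDS));
    }

    // Accumulator function: (running total, element) -> stage of the new total.
    static CompletionStage<Integer> addPrice(Integer total, String itemId) {
        return fetchPrice(itemId).thenApply(price -> total + price);
    }

    public static void main(String[] args) {
        int total = addPrice(10, "apple").toCompletableFuture().join();
        System.out.println(total); // 13
    }
}
```

A function of this shape, taking the accumulator and the element and returning a CompletionStage of the new accumulator, is what the Java example above passes as the second argument to scanAsync.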
Reactive Streams semantics
emits when the Future (Scala) or CompletionStage (Java) returned by the function for the current element resolves to the next value
backpressures when downstream backpressures
completes when upstream completes and the last Future (Scala) or CompletionStage (Java) has resolved
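The emit and complete rules above can be approximated with plain CompletionStage chaining. This is a minimal sketch under stated assumptions, not how the operator is implemented: scanAll is a hypothetical helper that works on a finite list instead of a stream, and it does not model backpressure. It does show that the zero value comes first, that each call to the function starts only after the previous stage has resolved, and that the overall result is available only once the last stage has resolved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;

// Plain-JDK illustration only; scanAll is not part of the streams API.
class ScanAsyncSemanticsSketch {

    // Hypothetical helper: resolves to everything a scanAsync-like operator
    // would emit for a finite list of upstream elements.
    static <A, T> CompletionStage<List<A>> scanAll(
            A zero, List<T> upstream, BiFunction<A, T, CompletionStage<A>> f) {
        // The zero value is the first emitted element.
        CompletionStage<List<A>> emitted = CompletableFuture.completedFuture(List.of(zero));
        for (T element : upstream) {
            // f is applied only after the previous stage has resolved,
            // because it needs the previous accumulator as input.
            emitted = emitted.thenCompose(soFar -> {
                A acc = soFar.get(soFar.size() - 1);
                return f.apply(acc, element).thenApply(next -> {
                    List<A> extended = new ArrayList<>(soFar);
                    extended.add(next);
                    return extended;
                });
            });
        }
        // Resolves once the stage for the last element has resolved.
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> result = scanAll(
                0,
                List.of(1, 2, 3, 4, 5),
                (acc, x) -> CompletableFuture.supplyAsync(() -> acc + x))
            .toCompletableFuture()
            .join();
        System.out.println(result); // [0, 1, 3, 6, 10, 15]
    }
}
```

The printed list matches the values shown in the example section: the zero value followed by one accumulated value per upstream element.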