scanAsync
Just like scan
but receives a function that results in a Future
to the next value.
Signature¶
Source.scanAsync
Flow.scanAsync
Description¶
Just like scan
but receives a function that results in a Future
to the next value.
Note that the zero
value must be immutable, because otherwise the same mutable instance would be shared across different threads when running the stream more than once.
Example¶
Below example demonstrates how scanAsync
is similar to fold
, but it keeps value from every iteration.
sourcedef asyncFunction(acc: Int, next: Int): Future[Int] = Future {
acc + next
}
val source = Source(1 to 5)
source.scanAsync(0)((acc, x) => asyncFunction(acc, x)).runForeach(println)
// 0 (= 0)
// 1 (= 0 + 1)
// 3 (= 0 + 1 + 2)
// 6 (= 0 + 1 + 2 + 3)
// 10 (= 0 + 1 + 2 + 3 + 4)
// 15 (= 0 + 1 + 2 + 3 + 4 + 5)
sourceCompletionStage<Integer> asyncFunction(int acc, int next) {
return CompletableFuture.supplyAsync(() -> acc + next);
}
Source<Integer, NotUsed> source = Source.range(1, 5);
source.scanAsync(0, (acc, x) -> asyncFunction(acc, x)).runForeach(System.out::println, system);
// 0 (= 0)
// 1 (= 0 + 1)
// 3 (= 0 + 1 + 2)
// 6 (= 0 + 1 + 2 + 3)
// 10 (= 0 + 1 + 2 + 3 + 4)
// 15 (= 0 + 1 + 2 + 3 + 4 + 5)
In an actual application the future would probably involve some external API that returns a Future
rather than an immediately completed value.
Reactive Streams semantics¶
emits when the Future
resulting from the function scanning the element resolves to the next value
backpressures when downstream backpressures
completes when upstream completes and the last Future
is resolved