Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure.
The summary value must be of the same type as the incoming elements, for example the sum or average of incoming numbers. If aggregation should produce a different type, conflateWithSeed can be used instead.
Scala:

    import org.apache.pekko.stream.scaladsl.Source
    import scala.concurrent.duration._

    Source
      .cycle(() => List(1, 10, 100, 1000).iterator)
      .throttle(10, per = 1.second) // faster upstream
      .conflate((acc, el) => acc + el) // acc: Int, el: Int
      .throttle(1, per = 1.second) // slow downstream
Java:

    import java.time.Duration;
    import java.util.Arrays;
    import org.apache.pekko.stream.javadsl.Source;

    Source.cycle(() -> Arrays.asList(1, 10, 100).iterator())
        .throttle(10, Duration.ofSeconds(1)) // fast upstream
        .conflate((Integer acc, Integer el) -> acc + el)
        .throttle(1, Duration.ofSeconds(1)); // slow downstream
If downstream is slower, the elements are conflated by summing them, so upstream can continue producing elements while downstream is applying backpressure. For example, if downstream is backpressuring while 1, 10 and 100 arrive from upstream, the conflated value 111 is emitted downstream once backpressure stops.
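The 1, 10, 100 scenario can be traced by hand with a small library-free model. This is only a sketch of the conflation idea in plain Java, not Pekko's implementation: the class `ConflateModel` and its `push` and `pull` methods are hypothetical names invented for illustration, and no stream is actually run. The model assumes the first element after an emission becomes the initial summary, and it uses a seed function to show how a summary of a different type, the case conflateWithSeed is meant for, could be built.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Hypothetical, library-free model of a conflating buffer (not Pekko code). */
final class ConflateModel<In, S> {
    private final Function<In, S> seed;            // turns the first element into a summary
    private final BiFunction<S, In, S> aggregate;  // folds later elements into the summary
    private S summary;                             // null means nothing is pending

    public ConflateModel(Function<In, S> seed, BiFunction<S, In, S> aggregate) {
        this.seed = seed;
        this.aggregate = aggregate;
    }

    /** Upstream delivers an element while downstream has not asked for one yet. */
    public void push(In element) {
        summary = (summary == null) ? seed.apply(element) : aggregate.apply(summary, element);
    }

    /** Downstream signals demand: hand over the pending summary, if any, and reset. */
    public Optional<S> pull() {
        Optional<S> out = Optional.ofNullable(summary);
        summary = null;
        return out;
    }

    public static void main(String[] args) {
        // Same-type summary: 1, 10 and 100 arrive before downstream asks.
        ConflateModel<Integer, Integer> sum =
            new ConflateModel<Integer, Integer>(el -> el, Integer::sum);
        sum.push(1);
        sum.push(10);
        sum.push(100);
        System.out.println(sum.pull()); // Optional[111]
        System.out.println(sum.pull()); // Optional.empty, nothing pending

        // Different summary type: collect the elements into a list instead.
        ConflateModel<Integer, List<Integer>> batch =
            new ConflateModel<Integer, List<Integer>>(
                el -> { List<Integer> acc = new ArrayList<>(); acc.add(el); return acc; },
                (acc, el) -> { acc.add(el); return acc; });
        batch.push(1);
        batch.push(10);
        System.out.println(batch.pull()); // Optional[[1, 10]]
    }
}
```

In the model, upstream is never blocked: every `push` is absorbed into the pending summary, and the summary only grows until downstream calls `pull`.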
See Rate transformation for more information and examples.
Reactive Streams semantics
emits when downstream stops backpressuring and there is a conflated element available
backpressures when the aggregate function cannot keep up with incoming elements
completes when upstream completes