conflateWithSeed
Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure.
Signature
Source.conflateWithSeed
Source.conflateWithSeed
Flow.conflateWithSeed
Flow.conflateWithSeed
Description
Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure. When backpressure starts or there is no backpressure element is passed into a seed
function to transform it to the summary type.
Example
- Scala
-
source
import org.apache.pekko.stream.scaladsl.Source import scala.concurrent.duration._ case class Summed(i: Int) { def sum(other: Summed) = Summed(this.i + other.i) } Source .cycle(() => List(1, 10, 100, 1000).iterator) .throttle(10, per = 1.second) // faster upstream .conflateWithSeed(el => Summed(el))((acc, el) => acc.sum(Summed(el))) // (Summed, Int) => Summed .throttle(1, per = 1.second) // slow downstream
- Java
-
source
class Summed { private final Integer el; public Summed(Integer el) { this.el = el; } public Summed sum(Summed other) { return new Summed(this.el + other.el); } } Source.cycle(() -> Arrays.asList(1, 10, 100).iterator()) .throttle(10, Duration.ofSeconds(1)) // fast upstream .conflateWithSeed(Summed::new, (Summed acc, Integer el) -> acc.sum(new Summed(el))) .throttle(1, Duration.ofSeconds(1)); // slow downstream
If downstream is slower, the “seed” function is called which is able to change the type of the to be conflated elements if needed (it can also be an identity function, in which case this conflateWithSeed
is equivalent to a plain conflate
). Next, the conflating function is applied while there is back-pressure from the downstream, such that the upstream can produce elements at an rate independent of the downstream.
You may want to use this operation for example to apply an average operation on the upstream elements, while the downstream backpressures. This allows us to keep processing upstream elements, and give an average number to the downstream once it is ready to process the next one.
See Rate transformation for more information and examples.
Reactive Streams semantics
emits when downstream stops backpressuring and there is a conflated element available
backpressures when the aggregate or seed functions cannot keep up with incoming elements
completes when upstream completes