conflateWithSeed

Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure.

Backpressure aware operators

Signature

Source.conflateWithSeedSource.conflateWithSeed Flow.conflateWithSeedFlow.conflateWithSeed

Description

Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure. When backpressure starts or there is no backpressure element is passed into a seed function to transform it to the summary type.

Example

Scala
sourceimport org.apache.pekko.stream.scaladsl.Source

import scala.concurrent.duration._

case class Summed(i: Int) {
  def sum(other: Summed) = Summed(this.i + other.i)
}

Source
  .cycle(() => List(1, 10, 100, 1000).iterator)
  .throttle(10, per = 1.second) // faster upstream
  .conflateWithSeed(el => Summed(el))((acc, el) => acc.sum(Summed(el))) // (Summed, Int) => Summed
  .throttle(1, per = 1.second) // slow downstream
Java
sourceclass Summed {

  private final Integer el;

  public Summed(Integer el) {
    this.el = el;
  }

  public Summed sum(Summed other) {
    return new Summed(this.el + other.el);
  }
}

Source.cycle(() -> Arrays.asList(1, 10, 100).iterator())
    .throttle(10, Duration.ofSeconds(1)) // fast upstream
    .conflateWithSeed(Summed::new, (Summed acc, Integer el) -> acc.sum(new Summed(el)))
    .throttle(1, Duration.ofSeconds(1)); // slow downstream

If downstream is slower, the “seed” function is called which is able to change the type of the to be conflated elements if needed (it can also be an identity function, in which case this conflateWithSeed is equivalent to a plain conflate). Next, the conflating function is applied while there is back-pressure from the downstream, such that the upstream can produce elements at an rate independent of the downstream.

You may want to use this operation for example to apply an average operation on the upstream elements, while the downstream backpressures. This allows us to keep processing upstream elements, and give an average number to the downstream once it is ready to process the next one.

See Rate transformation for more information and examples.

Reactive Streams semantics

emits when downstream stops backpressuring and there is a conflated element available

backpressures when the aggregate or seed functions cannot keep up with incoming elements

completes when upstream completes