conflate

Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure.

Backpressure aware operators

Signature

Source.conflateSource.conflate Flow.conflateFlow.conflate

Description

Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure. The summary value must be of the same type as the incoming elements, for example the sum or average of incoming numbers, if aggregation should lead to a different type conflateWithSeed can be used:

Example

Scala
sourceimport org.apache.pekko.stream.scaladsl.Source

import scala.concurrent.duration._

Source
  .cycle(() => List(1, 10, 100, 1000).iterator)
  .throttle(10, per = 1.second) // faster upstream
  .conflate((acc, el) => acc + el) // acc: Int, el: Int
  .throttle(1, per = 1.second) // slow downstream
Java
sourceSource.cycle(() -> Arrays.asList(1, 10, 100).iterator())
    .throttle(10, Duration.ofSeconds(1)) // fast upstream
    .conflate((Integer acc, Integer el) -> acc + el)
    .throttle(1, Duration.ofSeconds(1)); // slow downstream

If downstream is slower the elements are conflated by summing them. This means that upstream can continue producing elements while downstream is applying backpressure. For example: downstream is backpressuring while 1, 10 and 100 arrives from upstream, then backpressure stops and the conflated 111 is emitted downstream.

See Rate transformation for more information and examples.

Reactive Streams semantics

emits when downstream stops backpressuring and there is a conflated element available

backpressures when the aggregate function cannot keep up with incoming elements

completes when upstream completes