Buffers and working with rate

Dependency

To use Pekko Streams, add the module to your project:

sbt
val PekkoVersion = "1.0.0"
libraryDependencies += "org.apache.pekko" %% "pekko-stream" % PekkoVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.0.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.0.0")

  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}"
}

Introduction

When upstream and downstream rates differ, especially when the throughput has spikes, it can be useful to introduce buffers in a stream. In this chapter we cover how buffers are used in Pekko Streams.

Buffers for asynchronous operators

In this section we will discuss internal buffers that are introduced as an optimization when using asynchronous operators.

To run an operator asynchronously, it has to be marked explicitly as such using the `.async` (Scala) or `.async()` (Java) method. Being run asynchronously means that an operator, after handing out an element to its downstream consumer, is able to immediately process the next element. To demonstrate what we mean by this, let’s take a look at the following example:

Scala
Source(1 to 3)
  .map { i =>
    println(s"A: $i"); i
  }
  .async
  .map { i =>
    println(s"B: $i"); i
  }
  .async
  .map { i =>
    println(s"C: $i"); i
  }
  .async
  .runWith(Sink.ignore)
Java
Source.from(Arrays.asList(1, 2, 3))
    .map(
        i -> {
          System.out.println("A: " + i);
          return i;
        })
    .async()
    .map(
        i -> {
          System.out.println("B: " + i);
          return i;
        })
    .async()
    .map(
        i -> {
          System.out.println("C: " + i);
          return i;
        })
    .async()
    .runWith(Sink.ignore(), system);

Running the above example, one of the possible outputs looks like this:

A: 1
A: 2
B: 1
A: 3
B: 2
C: 1
B: 3
C: 2
C: 3

Note that the order is not A:1, B:1, C:1, A:2, B:2, C:2, which would correspond to the normal fused synchronous execution model of flows where an element completely passes through the processing pipeline before the next element enters the flow. The next element is processed by an asynchronous operator as soon as it has emitted the previous one.

While pipelining in general increases throughput, in practice passing an element through the asynchronous (and therefore thread-crossing) boundary carries a significant cost. To amortize this cost, Pekko Streams uses a windowed, batching backpressure strategy internally. It is windowed because, as opposed to a Stop-And-Wait protocol, multiple elements might be “in-flight” concurrently with requests for elements. It is also batching because a new element is not requested immediately after an element has been drained from the window buffer; instead, multiple elements are requested after multiple elements have been drained. This batching strategy reduces the communication cost of propagating the backpressure signal through the asynchronous boundary.
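To make the cost argument concrete, the sketch below counts how many demand signals cross the boundary under a Stop-And-Wait scheme and under a windowed, batching scheme. It is a plain Java model, not Pekko's implementation: the class `DemandModel`, its method, and the choice to re-request after half of a 16-element window has been drained are assumptions made purely for illustration.

```java
/** Toy model of demand signalling across an asynchronous boundary (not Pekko code). */
final class DemandModel {

    /**
     * Counts the demand signals sent by a consumer that first requests
     * `window` elements and then requests `batch` more each time `batch`
     * elements have been drained from its buffer.
     */
    static int demandSignals(int totalElements, int window, int batch) {
        if (batch < 1 || batch > window) {
            throw new IllegalArgumentException("batch must be in 1..window");
        }
        int signals = 1;             // the initial request for a full window
        int drainedSinceRequest = 0; // elements consumed since the last request
        for (int consumed = 0; consumed < totalElements; consumed++) {
            drainedSinceRequest++;
            if (drainedSinceRequest == batch) {
                signals++;           // one signal asks for `batch` more elements
                drainedSinceRequest = 0;
            }
        }
        return signals;
    }

    public static void main(String[] args) {
        // Stop-And-Wait: one element in flight, one signal per element.
        System.out.println("stop-and-wait: " + demandSignals(64, 1, 1));
        // Windowed and batching: window of 16, refill after 8 are drained (assumed values).
        System.out.println("batched:       " + demandSignals(64, 16, 8));
    }
}
```

In this model, 64 elements cost 65 signals with Stop-And-Wait but only 9 with the batched window, which is the kind of saving the batching strategy aims for.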

While this internal protocol is mostly invisible to the user (apart from its throughput-increasing effects), there are situations when these details get exposed. In all of our previous examples we assumed that the rate of the processing chain is strictly coordinated through the backpressure signal, causing all operators to process no faster than the throughput of the connected chain. There are tools in Pekko Streams, however, that enable the rates of different segments of a processing chain to be “detached”, or that define the maximum throughput of the stream through external timing sources. These are exactly the situations in which the internal batching and buffering strategy suddenly becomes non-transparent.

Internal buffers and their effect

As we have explained, for performance reasons Pekko Streams introduces a buffer for every asynchronous operator. The purpose of these buffers is solely optimization; in fact, a size of 1 would be the most natural choice if there were no need for throughput improvements. Therefore it is recommended to keep these buffer sizes small and to increase them only to a level suitable for the throughput requirements of the application. Default buffer sizes can be set through configuration:

pekko.stream.materializer.max-input-buffer-size = 16

Alternatively, they can be set per stream by adding an attribute to the complete `RunnableGraph`, or on smaller segments of the stream by defining a separate `Flow` with these attributes:

Scala
val section = Flow[Int].map(_ * 2).async.addAttributes(Attributes.inputBuffer(initial = 1, max = 1)) // the buffer size of this map is 1
val flow = section.via(Flow[Int].map(_ / 2)).async // the buffer size of this map is the default
val runnableGraph =
  Source(1 to 10).via(flow).to(Sink.foreach(elem => println(elem)))

val withOverriddenDefaults = runnableGraph.withAttributes(Attributes.inputBuffer(initial = 64, max = 64))
Java
final Flow<Integer, Integer, NotUsed> flow1 =
    Flow.of(Integer.class)
        .map(elem -> elem * 2)
        .async()
        .addAttributes(Attributes.inputBuffer(1, 1)); // the buffer size of this map is 1
final Flow<Integer, Integer, NotUsed> flow2 =
    flow1
        .via(Flow.of(Integer.class).map(elem -> elem / 2))
        .async(); // the buffer size of this map is taken from the surrounding graph
final RunnableGraph<NotUsed> runnableGraph =
    Source.range(1, 10).via(flow2).to(Sink.foreach(elem -> System.out.println(elem)));

final RunnableGraph<NotUsed> withOverriddenDefaults =
    runnableGraph.withAttributes(Attributes.inputBuffer(64, 64));

Here is an example that demonstrates some of the issues caused by internal buffers:

Scala
import scala.concurrent.duration._
case class Tick()

RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  // this is the asynchronous stage in this graph
  val zipper = b.add(ZipWith[Tick, Int, Int]((tick, count) => count).async)

  Source.tick(initialDelay = 3.second, interval = 3.second, Tick()) ~> zipper.in0

  Source
    .tick(initialDelay = 1.second, interval = 1.second, "message!")
    .conflateWithSeed(seed = _ => 1)((count, _) => count + 1) ~> zipper.in1

  zipper.out ~> Sink.foreach(println)
  ClosedShape
})
Java
final Duration oneSecond = Duration.ofSeconds(1);
final Source<String, Cancellable> msgSource = Source.tick(oneSecond, oneSecond, "message!");
final Source<String, Cancellable> tickSource =
    Source.tick(oneSecond.multipliedBy(3), oneSecond.multipliedBy(3), "tick");
final Flow<String, Integer, NotUsed> conflate =
    Flow.of(String.class).conflateWithSeed(first -> 1, (count, elem) -> count + 1);

RunnableGraph.fromGraph(
        GraphDSL.create(
            b -> {
              // this is the asynchronous stage in this graph
              final FanInShape2<String, Integer, Integer> zipper =
                  b.add(ZipWith.create((String tick, Integer count) -> count).async());
              b.from(b.add(msgSource)).via(b.add(conflate)).toInlet(zipper.in1());
              b.from(b.add(tickSource)).toInlet(zipper.in0());
              b.from(zipper.out()).to(b.add(Sink.foreach(elem -> System.out.println(elem))));
              return ClosedShape.getInstance();
            }))
    .run(system);

Running the above example, one would expect the number 3 to be printed every 3 seconds (the conflateWithSeed step here is configured so that it counts the number of elements received before the downstream `ZipWith` consumes them). What is printed is different, though: we will see the number 1. The reason for this is the internal buffer, which is 16 elements large by default and prefetches elements before the `ZipWith` starts consuming them. It is possible to fix this issue by changing the buffer size of `ZipWith` to 1. We will still see a leading 1, though, which is caused by an initial prefetch of the `ZipWith` element.

Note

In general, when time- or rate-driven operators exhibit strange behavior, one of the first solutions to try should be to decrease the input buffer of the affected operators to 1.

Buffers in Pekko Streams

In this section we will discuss explicit, user-defined buffers that are part of the domain logic of the stream processing pipeline of an application.

The example below will ensure that 1000 jobs (but not more) are dequeued from an external (imaginary) system and stored locally in memory - relieving the external system:

Scala
// Getting a stream of jobs from an imaginary external system as a Source
val jobs: Source[Job, NotUsed] = inboundJobsConnector()
jobs.buffer(1000, OverflowStrategy.backpressure)
Java
// Getting a stream of jobs from an imaginary external system as a Source
final Source<Job, NotUsed> jobs = inboundJobsConnector;
jobs.buffer(1000, OverflowStrategy.backpressure());

The next example will also queue up 1000 jobs locally, but if there are more jobs waiting in the imaginary external system, it makes space for the new element by dropping one element from the tail of the buffer. Dropping from the tail is a very common strategy, but it must be noted that this will drop the youngest waiting job. If some “fairness” is desired, in the sense that we want to be nice to jobs that have been waiting for a long time, then this option can be useful.

Scala
jobs.buffer(1000, OverflowStrategy.dropTail)
Java
jobs.buffer(1000, OverflowStrategy.dropTail());

Instead of dropping the youngest element from the tail of the buffer, a new element can be dropped without enqueueing it to the buffer at all.

Scala
jobs.buffer(1000, OverflowStrategy.dropNew)
Java
jobs.buffer(1000, OverflowStrategy.dropNew());

Here is another example with a queue of 1000 jobs, but it makes space for the new element by dropping one element from the head of the buffer. This is the oldest waiting job. This is the preferred strategy if jobs are expected to be resent if not processed within a certain period. The oldest element will be retransmitted soon (in fact, a retransmitted duplicate might already be in the queue!), so it makes sense to drop it first.

Scala
jobs.buffer(1000, OverflowStrategy.dropHead)
Java
jobs.buffer(1000, OverflowStrategy.dropHead());

Compared to the dropping strategies above, dropBuffer drops all 1000 jobs it has enqueued once the buffer gets full. This aggressive strategy is useful when dropping jobs is preferred to delaying jobs.

Scala
jobs.buffer(1000, OverflowStrategy.dropBuffer)
Java
jobs.buffer(1000, OverflowStrategy.dropBuffer());

If our imaginary external job provider is a client using our API, we might want to enforce that the client cannot have more than 1000 queued jobs; otherwise we consider it flooding and terminate the connection. This is achievable with the fail strategy, which fails the stream once the buffer gets full.

Scala
jobs.buffer(1000, OverflowStrategy.fail)
Java
jobs.buffer(1000, OverflowStrategy.fail());
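
The dropping and failing strategies described in this section can be compared side by side with a small plain Java model. This is only a sketch of the behavior as described in the text, not Pekko's implementation: the class `OverflowModel`, its `Strategy` enum, and the `fill` helper are hypothetical names, the buffer is never drained by a consumer, and the backpressure strategy is left out because it depends on signalling the upstream rather than on dropping.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of the dropping and failing overflow strategies (not Pekko code). */
final class OverflowModel {

    enum Strategy { DROP_HEAD, DROP_TAIL, DROP_NEW, DROP_BUFFER, FAIL }

    /** Offers each element to a buffer that is never drained and returns what is left in it. */
    static List<Integer> fill(int capacity, Strategy strategy, int... elements) {
        Deque<Integer> buffer = new ArrayDeque<>();
        for (int elem : elements) {
            if (buffer.size() < capacity) {
                buffer.addLast(elem);   // room left: plain enqueue
                continue;
            }
            switch (strategy) {
                case DROP_HEAD:         // drop the oldest, keep the new one
                    buffer.removeFirst();
                    buffer.addLast(elem);
                    break;
                case DROP_TAIL:         // drop the youngest, keep the new one
                    buffer.removeLast();
                    buffer.addLast(elem);
                    break;
                case DROP_NEW:          // discard the incoming element
                    break;
                case DROP_BUFFER:       // discard everything buffered so far
                    buffer.clear();
                    buffer.addLast(elem);
                    break;
                case FAIL:              // signal an error instead of dropping
                    throw new IllegalStateException("buffer overflow at element " + elem);
            }
        }
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        Strategy[] dropping = {
            Strategy.DROP_HEAD, Strategy.DROP_TAIL, Strategy.DROP_NEW, Strategy.DROP_BUFFER
        };
        for (Strategy s : dropping) {
            System.out.println(s + " -> " + fill(3, s, 1, 2, 3, 4, 5));
        }
    }
}
```

With a capacity of 3 and the elements 1 to 5, the model leaves `[3, 4, 5]` for DROP_HEAD, `[1, 2, 5]` for DROP_TAIL, `[1, 2, 3]` for DROP_NEW and `[4, 5]` for DROP_BUFFER, while FAIL throws as soon as element 4 arrives.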

Rate transformation

Understanding conflate

When a fast producer cannot be told to slow down by backpressure or some other signal, conflate might be useful to combine elements from the producer until a demand signal comes from the consumer.
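
The idea of combining elements until demand arrives can be modelled in a few lines of plain Java. The sketch below is not Pekko code: `ConflateModel`, `onElement` and `onDemand` are hypothetical names, and the model is simplified in that it does not remember demand that arrives while nothing is pending.

```java
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Toy model of conflateWithSeed (not Pekko code): aggregate until demand arrives. */
final class ConflateModel<In, Agg> {
    private final Function<In, Agg> seed;
    private final BiFunction<Agg, In, Agg> aggregate;
    private Agg pending; // null while nothing is waiting for the consumer

    ConflateModel(Function<In, Agg> seed, BiFunction<Agg, In, Agg> aggregate) {
        this.seed = seed;
        this.aggregate = aggregate;
    }

    /** Upstream delivers an element: start a new aggregate or fold into the pending one. */
    void onElement(In elem) {
        pending = (pending == null) ? seed.apply(elem) : aggregate.apply(pending, elem);
    }

    /** Downstream signals demand: hand over the aggregate, if any, and start afresh. */
    Optional<Agg> onDemand() {
        Optional<Agg> out = Optional.ofNullable(pending);
        pending = null;
        return out;
    }

    public static void main(String[] args) {
        // Count elements, like the conflateWithSeed step of the ZipWith example.
        ConflateModel<String, Integer> counter =
            new ConflateModel<String, Integer>(first -> 1, (count, elem) -> count + 1);
        counter.onElement("message!");
        counter.onElement("message!");
        counter.onElement("message!");
        System.out.println(counter.onDemand()); // three elements were folded into one
        System.out.println(counter.onDemand()); // nothing arrived since the last demand
    }
}
```

Three upstream elements result in a single downstream element carrying the count 3, which is how the producer and consumer rates become decoupled.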

Below is an example snippet that summarizes a fast stream of elements into the standard deviation, mean, and count of the elements that arrived while the stats were being calculated.

Scala
val statsFlow = Flow[Double].conflateWithSeed(immutable.Seq(_))(_ :+ _).map { s =>
  val μ = s.sum / s.size
  val se = s.map(x => pow(x - μ, 2))
  val σ = sqrt(se.sum / se.size)
  (σ, μ, s.size)
}
Java
final Flow<Double, Tuple3<Double, Double, Integer>, NotUsed> statsFlow =
    Flow.of(Double.class)
        .conflateWithSeed(
            elem -> Collections.singletonList(elem),
            (acc, elem) -> {
              return Stream.concat(acc.stream(), Collections.singletonList(elem).stream())
                  .collect(Collectors.toList());
            })
        .map(
            s -> {
              final Double mean = s.stream().mapToDouble(d -> d).sum() / s.size();
              final DoubleStream se = s.stream().mapToDouble(x -> Math.pow(x - mean, 2));
              final Double stdDev = Math.sqrt(se.sum() / s.size());
              return new Tuple3<>(stdDev, mean, s.size());
            });

This example demonstrates that the rates of such a flow are decoupled. The element rate at the start of the flow can be much higher than the element rate at the end of the flow.
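
As a worked example of the arithmetic applied to each conflated batch, the following plain Java sketch computes the same three values for one hand-picked batch. The class `BatchStats` and the sample numbers are illustrative assumptions; the formula follows the snippets above, which divide by the batch size and therefore compute the population standard deviation.

```java
import java.util.Arrays;
import java.util.List;

/** Worked example of the statistics computed per conflated batch (plain Java). */
final class BatchStats {

    /** Returns {standard deviation, mean, count} for one batch, dividing by n. */
    static double[] stats(List<Double> batch) {
        double mean = batch.stream().mapToDouble(d -> d).sum() / batch.size();
        double sumSq = batch.stream().mapToDouble(x -> Math.pow(x - mean, 2)).sum();
        double stdDev = Math.sqrt(sumSq / batch.size());
        return new double[] {stdDev, mean, batch.size()};
    }

    public static void main(String[] args) {
        // Sum is 40, so the mean is 5; squared deviations sum to 32, so the variance is 4.
        List<Double> batch = Arrays.asList(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0);
        System.out.println(Arrays.toString(stats(batch))); // [2.0, 5.0, 8.0]
    }
}
```

A batch produced by conflateWithSeed always contains at least the seed element, so the division by the batch size is safe in the stream; the helper above would return NaN values for an empty list.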

Another possible use of conflate is to not consider all elements for summary when the producer starts getting too fast. The example below demonstrates how conflate can be used to randomly drop elements when the consumer is not able to keep up with the producer.

Scala
val p = 0.01
val sampleFlow = Flow[Double]
  .conflateWithSeed(immutable.Seq(_)) {
    case (acc, elem) if Random.nextDouble() < p => acc :+ elem
    case (acc, _)                               => acc
  }
  .mapConcat(identity)
Java
final Double p = 0.01;
final Flow<Double, Double, NotUsed> sampleFlow =
    Flow.of(Double.class)
        .conflateWithSeed(
            elem -> Collections.singletonList(elem),
            (acc, elem) -> {
              if (r.nextDouble() < p) {
                return Stream.concat(acc.stream(), Collections.singletonList(elem).stream())
                    .collect(Collectors.toList());
              }
              return acc;
            })
        .mapConcat(d -> d);

See also conflate and conflateWithSeed for more information and examples.

Understanding extrapolate and expand

Now we will discuss two operators, extrapolate and expand, helping to deal with slow producers that are unable to keep up with the demand coming from consumers. They allow for additional values to be sent as elements to a consumer.

As a simple use case of extrapolate, here is a flow that repeats the last emitted element to a consumer, whenever the consumer signals demand and the producer cannot supply new elements yet.

Scala
val lastFlow = Flow[Double].extrapolate(Iterator.continually(_))
Java
final Flow<Double, Double, NotUsed> lastFlow =
    Flow.of(Double.class).extrapolate(in -> Stream.iterate(in, i -> i).iterator());

For situations where there may be downstream demand before any element is emitted from upstream, you can use the initial parameter of extrapolate to “seed” the stream.

Scala
val initial = 2.0
val seedFlow = Flow[Double].extrapolate(Iterator.continually(_), Some(initial))
Java
Double initial = 2.0;
final Flow<Double, Double, NotUsed> lastFlow =
    Flow.of(Double.class).extrapolate(in -> Stream.iterate(in, i -> i).iterator(), initial);

extrapolate and expand also allow producing meta-information based on demand signalled from downstream. Leveraging this, here is a flow that tracks and reports the drift between a fast consumer and a slow producer.

Scala
val driftFlow = Flow[Double].map(_ -> 0).extrapolate[(Double, Int)] { case (i, _) => Iterator.from(1).map(i -> _) }
Java
final Flow<Double, Pair<Double, Integer>, NotUsed> driftFlow =
    Flow.of(Double.class)
        .map(d -> new Pair<>(d, 0))
        .extrapolate(
            d -> Stream.iterate(1, i -> i + 1).map(i -> new Pair<>(d.first(), i)).iterator());

And here’s a more concise representation with expand.

Scala
val driftFlow = Flow[Double].expand(i => Iterator.from(0).map(i -> _))
Java
final Flow<Double, Pair<Double, Integer>, NotUsed> driftFlow =
    Flow.of(Double.class)
        .expand(d -> Stream.iterate(0, i -> i + 1).map(i -> new Pair<>(d, i)).iterator());

The difference is due to the different handling of the Iterator-generating argument.

While extrapolate uses an Iterator only when there is unmet downstream demand, expand always creates an Iterator and emits elements downstream from it.

This makes expand able to transform or even filter out (by providing an empty Iterator) the “original” elements.

Regardless, since we provide a non-empty Iterator in both examples, the output of this flow reports a drift of zero if the producer is fast enough, or a larger drift otherwise.
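
The difference between the two operators can be illustrated with a small plain Java model. This is a sketch under strong assumptions rather than Pekko's implementation: `ExpandModel` and its methods are hypothetical, the model covers a single upstream element followed by a fixed amount of downstream demand, and `Map.Entry` stands in for the `Pair` type used in the snippets above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;

/** Toy model of extrapolate and expand for ONE upstream element (not Pekko code). */
final class ExpandModel {

    /** expand: every emitted value is taken from the iterator built for the element. */
    static <T, U> List<U> expand(T elem, Function<T, Iterator<U>> expander, int demand) {
        List<U> out = new ArrayList<>();
        Iterator<U> it = expander.apply(elem);
        while (out.size() < demand && it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    /** extrapolate: the element itself goes first; the iterator only serves further demand. */
    static <T> List<T> extrapolate(T elem, Function<T, Iterator<T>> extrapolator, int demand) {
        List<T> out = new ArrayList<>();
        if (demand > 0) {
            out.add(elem);
        }
        if (out.size() < demand) {
            Iterator<T> it = extrapolator.apply(elem);
            while (out.size() < demand && it.hasNext()) {
                out.add(it.next());
            }
        }
        return out;
    }

    /** Drift via extrapolate: tag the element with 0, then count up from 1 on unmet demand. */
    static List<Map.Entry<Double, Integer>> driftWithExtrapolate(double d, int demand) {
        Map.Entry<Double, Integer> tagged = Map.entry(d, 0);
        return extrapolate(
            tagged,
            e -> Stream.iterate(1, i -> i + 1).map(i -> Map.entry(e.getKey(), i)).iterator(),
            demand);
    }

    /** Drift via expand: the iterator itself starts counting at 0. */
    static List<Map.Entry<Double, Integer>> driftWithExpand(double d, int demand) {
        return expand(
            Double.valueOf(d),
            x -> Stream.iterate(0, i -> i + 1).map(i -> Map.entry(x, i)).iterator(),
            demand);
    }

    public static void main(String[] args) {
        System.out.println(driftWithExtrapolate(1.5, 3)); // [1.5=0, 1.5=1, 1.5=2]
        System.out.println(driftWithExpand(1.5, 3));      // [1.5=0, 1.5=1, 1.5=2]
        // An empty iterator makes expand drop the original element entirely.
        List<Integer> filtered = expand(7, x -> Collections.<Integer>emptyIterator(), 3);
        System.out.println(filtered);                     // []
    }
}
```

Both drift variants produce the same sequence for a single element and three demand signals, while only expand is able to suppress the original element.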

See also extrapolate and expand for more information and examples.