throttle

Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where a function has to be provided to calculate the individual cost of each element.

Simple operators

Signature

Source.throttleSource.throttle Flow.throttleFlow.throttle

Description

Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where a function has to be provided to calculate the individual cost of each element.

The throttle operator combines well with the queue operator to adapt the speeds on both ends of the queue-throttle pair.

See also Buffers and working with rate for related operators.

Example

Imagine the server end of a streaming platform. When a client connects and request a video content, the server should return the content. Instead of serving a complete video as fast as bandwidth allows, throttle can be used to limit the network usage to 24 frames per second (let’s imagine this streaming platform stores frames, not bytes).

Scala
sourceval framesPerSecond = 24

// val frameSource: Source[Frame,_]
val videoThrottling = frameSource.throttle(framesPerSecond, 1.second)
// serialize `Frame` and send over the network.
Java
sourceint framesPerSecond = 24;

Source<Frame, NotUsed> videoThrottling =
    frameSource.throttle(framesPerSecond, Duration.ofSeconds(1));
// serialize `Frame` and send over the network.

The problem in the example above is that when there’s a network hiccup, the video playback will interrupt. It can be improved by sending more content than the necessary ahead of time and let the client buffer that. So, throttle can be used to burst the first 30 seconds and then send a constant of 24 frames per second. This way, when a request comes in a good chunk of content will be downloaded and after that the server will activate the throttling.

Scala
source// val frameSource: Source[Frame,_]
val videoThrottlingWithBurst = frameSource.throttle(
  framesPerSecond,
  1.second,
  framesPerSecond * 30, // maximumBurst
  ThrottleMode.Shaping)
// serialize `Frame` and send over the network.
Java
sourceSource<Frame, NotUsed> throttlingWithBurst =
    frameSource.throttle(
        framesPerSecond, Duration.ofSeconds(1), framesPerSecond * 30, ThrottleMode.shaping());
// serialize `Frame` and send over the network.

The extra argument to set the ThrottleMode to shaping tells throttle to make pauses to avoid exceeding the maximum rate. Alternatively we could set the ThrottleMode to enforcing to cause a stream failure when upstream is faster than the throttle rate.

The examples above don’t cover all the parameters supported by throttle (e.g. cost-based throttling). See the api documentationapi documentation for all the details.

Reactive Streams semantics

emits when upstream emits an element and configured time per each element elapsed

backpressures when downstream backpressures

completes when upstream completes