extrapolate

Allow for a faster downstream by expanding the last emitted element to an Iterator.

Backpressure aware operators

Signature

Source.extrapolate Flow.extrapolate

Description

Allow for a faster downstream by expanding the last emitted element to an Iterator. For example, an Iterator.continually(element) will cause extrapolate to keep repeating the last emitted element.

All original elements are always emitted unchanged; the Iterator is only used when there is downstream demand before upstream emits a new element.

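Takes an optional initial argument: an element to emit, and extrapolate from, before upstream has produced its first element. This prevents a slow upstream from blocking the entire stream when there are multiple producers.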

See Understanding extrapolate and expand for more information and examples.
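
The rules above can be illustrated with a small plain-Java model. This is only a sketch of the described behaviour, not the library's implementation: the class ExtrapolateModel and its methods offer (upstream emits) and poll (downstream demand) are hypothetical names invented for this illustration, and the model assumes a single thread and non-null elements.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical model of the extrapolate semantics; not the library's implementation.
final class ExtrapolateModel<T> {
  private final Function<T, Iterator<T>> extrapolator;
  private T pending; // original element not yet delivered downstream (null if none)
  private Iterator<T> extra = Collections.emptyIterator(); // extrapolation of the last original

  ExtrapolateModel(Function<T, Iterator<T>> extrapolator) {
    this(extrapolator, null);
  }

  // The initial value is treated like a first upstream element.
  ExtrapolateModel(Function<T, Iterator<T>> extrapolator, T initial) {
    this.extrapolator = Objects.requireNonNull(extrapolator);
    this.pending = initial;
  }

  // Upstream emits a new element. Returns false (backpressure) while the
  // previous original element has not been delivered downstream yet.
  boolean offer(T element) {
    Objects.requireNonNull(element);
    if (pending != null) return false;
    pending = element;
    return true;
  }

  // Downstream signals demand. Original elements always take priority;
  // the Iterator is consulted only when no new original is available.
  Optional<T> poll() {
    if (pending != null) {
      T original = pending;
      pending = null;
      extra = extrapolator.apply(original); // start extrapolating from this element
      return Optional.of(original);
    }
    return extra.hasNext() ? Optional.of(extra.next()) : Optional.empty();
  }
}

final class ExtrapolateModelDemo {
  public static void main(String[] args) {
    ExtrapolateModel<String> model =
        new ExtrapolateModel<String>(
            last -> Stream.iterate(last + "*", s -> s).iterator(), "init");

    System.out.println(model.poll().orElse("<waiting>")); // init  (the initial value)
    System.out.println(model.poll().orElse("<waiting>")); // init* (extrapolated)
    System.out.println(model.offer("a"));                 // true  (upstream emits)
    System.out.println(model.poll().orElse("<waiting>")); // a     (original, unchanged)
    System.out.println(model.poll().orElse("<waiting>")); // a*    (extrapolated)
  }
}
```

Without an initial value, the first poll in this model returns an empty Optional: there is nothing to extrapolate from until upstream has emitted at least once, which is the stall that the initial argument is meant to avoid.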

Example

Imagine a videoconference client decoding a video feed from a colleague working remotely. The network bandwidth may be unreliable. That is acceptable: as long as the audio remains fluent, it doesn’t matter if we can’t decode a frame or two (or more). When a frame is dropped, though, we want the UI to show the last frame decoded:

Scala
// if upstream is too slow, produce copies of the last frame but grayed out.
val rateControl: Flow[Frame, Frame, NotUsed] =
  Flow[Frame].extrapolate((frame: Frame) => {
      val grayedOut = frame.withFilter(Gray)
      Iterator.continually(grayedOut)
    }, Some(Frame.blackFrame))

val videoSource: Source[Frame, NotUsed] = networkSource.via(decode).via(rateControl)

// let's create a 25fps stream (a Frame every 40.millis)
val tickSource: Source[Tick.type, Cancellable] = Source.tick(0.seconds, 40.millis, Tick)

val videoAt25Fps: Source[Frame, Cancellable] =
  tickSource.zip(videoSource).map(_._2)
Java
// if upstream is too slow, produce copies of the last frame but grayed out.
Flow<Frame, Frame, NotUsed> rateControl =
    Flow.of(Frame.class)
        .extrapolate(
            lastFrame -> {
              Frame gray =
                  new Frame(
                      ByteString.fromString(
                          "gray frame!! - " + lastFrame.pixels().utf8String()));
              return Stream.iterate(gray, i -> i).iterator();
            },
            BLACK_FRAME // initial value
            );

Source<Frame, NotUsed> videoSource = networkSource.via(decode).via(rateControl);

// let's create a 25fps stream (a Frame every 40.millis)
Source<String, Cancellable> tickSource =
    Source.tick(Duration.ZERO, Duration.ofMillis(40), "tick");

Source<Frame, Cancellable> videoAt25Fps = tickSource.zip(videoSource).map(Pair::second);

Reactive Streams semantics

emits when downstream stops backpressuring

backpressures when downstream backpressures

completes when upstream completes