extrapolate
Allow for a faster downstream by expanding the last emitted element to an Iterator.
Signature
Source.extrapolate Flow.extrapolate
Description
Allow for a faster downstream by expanding the last emitted element to an Iterator. For example, an Iterator.continually(element) will cause extrapolate to keep repeating the last emitted element.
All original elements are always emitted unchanged; the Iterator is only used when there is downstream demand before upstream emits a new element.
Includes an optional initial argument to prevent blocking the entire stream when there are multiple producers.
See Understanding extrapolate and expand for more information and examples.
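The extrapolator described above is simply a function from the last emitted element to an Iterator. The following is a minimal sketch of two such functions in plain Java, outside of any stream. The class and method names (ExtrapolatorSketch, repeatForever, repeatAtMost) are hypothetical, and java.util.function.Function is used as a stand-in for whatever functional interface the library's Java API expects.

```java
import java.util.Iterator;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical helper class, for illustration only; not part of the library.
final class ExtrapolatorSketch {

    // Repeats the last element without end, comparable to
    // Scala's Iterator.continually(element).
    static <T> Function<T, Iterator<T>> repeatForever() {
        return last -> Stream.generate(() -> last).iterator();
    }

    // Repeats the last element at most `times` times; once the iterator
    // is exhausted it has nothing more to offer.
    static <T> Function<T, Iterator<T>> repeatAtMost(int times) {
        return last -> Stream.generate(() -> last).limit(times).iterator();
    }

    public static void main(String[] args) {
        Iterator<String> forever =
            ExtrapolatorSketch.<String>repeatForever().apply("frame-1");
        System.out.println(forever.next());
        System.out.println(forever.next());

        Iterator<String> bounded =
            ExtrapolatorSketch.<String>repeatAtMost(2).apply("frame-2");
        while (bounded.hasNext()) {
            System.out.println(bounded.next());
        }
    }
}
```

A lambda with the same body can be passed directly to extrapolate. A bounded iterator may be preferable to an endless one when repeating a stale element indefinitely would be misleading.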
Example
Imagine a videoconference client decoding a video feed from a colleague working remotely. The network bandwidth may be somewhat unreliable. That is fine: as long as the audio remains fluent, it doesn’t matter if we can’t decode a frame or two (or more). When a frame is dropped, though, we want the UI to show the last frame decoded:
- Scala

    // if upstream is too slow, produce copies of the last frame but grayed out.
    val rateControl: Flow[Frame, Frame, NotUsed] =
      Flow[Frame].extrapolate((frame: Frame) => {
        val grayedOut = frame.withFilter(Gray)
        Iterator.continually(grayedOut)
      }, Some(Frame.blackFrame))

    val videoSource: Source[Frame, NotUsed] = networkSource.via(decode).via(rateControl)

    // let's create a 25fps stream (a Frame every 40.millis)
    val tickSource: Source[Tick.type, Cancellable] = Source.tick(0.seconds, 40.millis, Tick)

    val videoAt25Fps: Source[Frame, Cancellable] =
      tickSource.zip(videoSource).map(_._2)

- Java

    // if upstream is too slow, produce copies of the last frame but grayed out.
    Flow<Frame, Frame, NotUsed> rateControl =
        Flow.of(Frame.class)
            .extrapolate(
                lastFrame -> {
                  Frame gray =
                      new Frame(
                          ByteString.fromString(
                              "gray frame!! - " + lastFrame.pixels().utf8String()));
                  return Stream.iterate(gray, i -> i).iterator();
                },
                BLACK_FRAME // initial value
                );

    Source<Frame, NotUsed> videoSource = networkSource.via(decode).via(rateControl);

    // let's create a 25fps stream (a Frame every 40.millis)
    Source<String, Cancellable> tickSource =
        Source.tick(Duration.ZERO, Duration.ofMillis(40), "tick");

    Source<Frame, Cancellable> videoAt25Fps =
        tickSource.zip(videoSource).map(Pair::second);
Reactive Streams semantics
emits when downstream stops backpressuring
backpressures when downstream backpressures
completes when upstream completes