extrapolate
Allow for a faster downstream by expanding the last emitted element to an Iterator.
Signature
Source.extrapolate Flow.extrapolate
Description
Allow for a faster downstream by expanding the last emitted element to an Iterator. For example, an Iterator.continually(element) will cause extrapolate to keep repeating the last emitted element.
All original elements are always emitted unchanged - the Iterator is only used whenever there is downstream demand before upstream emits a new element.
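The optional initial argument supplies an element to emit, and to extrapolate from, before upstream has produced anything. This prevents extrapolate from blocking the entire stream when it is combined with other producers, for example in a zip.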
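The interplay between original elements, downstream demand, and the initial value can be illustrated with a small library-free model. This is a sketch of the behaviour described above, not the operator's implementation: the `ExtrapolateModel` class and its `onUpstream`/`onDemand` methods are hypothetical names invented for illustration, and the model assumes the initial value behaves like an element that arrived before any upstream element.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical model of the documented behaviour; it does not use the streams library.
final class ExtrapolateModel<T> {
    private final Function<T, Iterator<T>> extrapolator;
    // Original elements that have arrived but have not been emitted yet.
    private final Deque<T> pending = new ArrayDeque<>();
    // Extrapolation derived from the last emitted original element.
    private Iterator<T> extrapolated = Collections.emptyIterator();

    ExtrapolateModel(Function<T, Iterator<T>> extrapolator, Optional<T> initial) {
        this.extrapolator = extrapolator;
        // Assumption: the initial value is treated like a first upstream element.
        initial.ifPresent(pending::add);
    }

    // Upstream produced a new original element.
    void onUpstream(T element) {
        pending.add(element);
    }

    // Downstream signalled demand; an empty result means downstream has to wait.
    Optional<T> onDemand() {
        if (!pending.isEmpty()) {
            // Originals always win and are emitted unchanged.
            T original = pending.poll();
            extrapolated = extrapolator.apply(original);
            return Optional.of(original);
        }
        // No new original: fall back to the extrapolation, if there is one.
        return extrapolated.hasNext() ? Optional.of(extrapolated.next()) : Optional.empty();
    }

    public static void main(String[] args) {
        ExtrapolateModel<String> model = new ExtrapolateModel<String>(
            last -> Stream.iterate(last + "(repeat)", s -> s).iterator(),
            Optional.of("initial"));
        List<String> out = new ArrayList<>();
        model.onDemand().ifPresent(out::add); // demand before upstream emits
        model.onDemand().ifPresent(out::add); // still nothing from upstream
        model.onUpstream("a");
        model.onUpstream("b");
        model.onDemand().ifPresent(out::add);
        model.onDemand().ifPresent(out::add);
        model.onDemand().ifPresent(out::add); // upstream is slow again
        System.out.println(out); // [initial, initial(repeat), a, b, b(repeat)]
    }
}
```

Without an initial value the first two demands in this model would yield nothing, which corresponds to the blocked stream that the initial argument is meant to avoid.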
See Understanding extrapolate and expand for more information and examples.
This operator adheres to the ActorAttributes.SupervisionStrategy attribute for exceptions thrown by the extrapolator function or during iterator evaluation (hasNext/next). On Supervision.Stop the stream fails; on Supervision.Resume the failed element is dropped and any previously active extrapolation is retained; on Supervision.Restart the failed element is dropped and the current extrapolation state is reset. For iterator-evaluation failures, Resume and Restart both discard the corrupt iterator because it cannot be reused.
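The state transitions described in this paragraph can be sketched as a small library-free model. It is an illustration of the wording above under simplifying assumptions, not the operator's code: `SupervisionModel`, `Decision`, `onElement`, and `extrapolate` are hypothetical names, the decision is fixed per instance rather than computed from the exception, and the emission of original elements is left out.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical model of the supervision rules; it does not use the streams library.
final class SupervisionModel<T> {
    enum Decision { STOP, RESUME, RESTART }

    private final Function<T, Iterator<T>> extrapolator;
    private final Decision decision;
    private Iterator<T> active = Collections.emptyIterator();
    private boolean failed = false;

    SupervisionModel(Function<T, Iterator<T>> extrapolator, Decision decision) {
        this.extrapolator = extrapolator;
        this.decision = decision;
    }

    // A new upstream element arrives; returns false if the element was dropped.
    boolean onElement(T element) {
        try {
            active = extrapolator.apply(element);
            return true;
        } catch (RuntimeException e) {
            switch (decision) {
                case STOP:    failed = true; break;                       // stream fails
                case RESUME:  break;                                      // keep previous extrapolation
                case RESTART: active = Collections.emptyIterator(); break; // reset state
            }
            return false;
        }
    }

    // Downstream demand with no new element; null means nothing to emit.
    T extrapolate() {
        try {
            return active.hasNext() ? active.next() : null;
        } catch (RuntimeException e) {
            if (decision == Decision.STOP) failed = true;
            // Resume and Restart both discard the iterator that failed.
            active = Collections.emptyIterator();
            return null;
        }
    }

    boolean hasFailed() { return failed; }

    public static void main(String[] args) {
        // The extrapolator rejects empty strings to simulate a failing element.
        Function<String, Iterator<String>> ex = s -> {
            if (s.isEmpty()) throw new IllegalArgumentException("empty element");
            return Stream.iterate(s, x -> x).iterator();
        };
        SupervisionModel<String> resume = new SupervisionModel<String>(ex, Decision.RESUME);
        resume.onElement("a");
        resume.onElement(""); // dropped, extrapolation of "a" retained
        System.out.println(resume.extrapolate()); // a

        SupervisionModel<String> restart = new SupervisionModel<String>(ex, Decision.RESTART);
        restart.onElement("a");
        restart.onElement(""); // dropped, extrapolation reset
        System.out.println(restart.extrapolate()); // null
    }
}
```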
Example
Imagine a videoconference client decoding a video feed from a colleague working remotely. The network bandwidth may be unreliable. That is fine: as long as the audio remains fluent, it doesn’t matter if we can’t decode a frame or two (or more). When a frame is dropped, though, we want the UI to show the last frame decoded:
- Scala

    // if upstream is too slow, produce copies of the last frame but grayed out.
    val rateControl: Flow[Frame, Frame, NotUsed] =
      Flow[Frame].extrapolate((frame: Frame) => {
        val grayedOut = frame.withFilter(Gray)
        Iterator.continually(grayedOut)
      }, Some(Frame.blackFrame))

    val videoSource: Source[Frame, NotUsed] = networkSource.via(decode).via(rateControl)

    // let's create a 25fps stream (a Frame every 40.millis)
    val tickSource: Source[Tick.type, Cancellable] = Source.tick(0.seconds, 40.millis, Tick)

    val videoAt25Fps: Source[Frame, Cancellable] =
      tickSource.zip(videoSource).map(_._2)

- Java

    // if upstream is too slow, produce copies of the last frame but grayed out.
    Flow<Frame, Frame, NotUsed> rateControl =
        Flow.of(Frame.class)
            .extrapolate(
                lastFrame -> {
                  Frame gray =
                      new Frame(
                          ByteString.fromString(
                              "gray frame!! - " + lastFrame.pixels().utf8String()));
                  return Stream.iterate(gray, i -> i).iterator();
                },
                BLACK_FRAME // initial value
                );

    Source<Frame, NotUsed> videoSource = networkSource.via(decode).via(rateControl);

    // let's create a 25fps stream (a Frame every 40.millis)
    Source<String, Cancellable> tickSource =
        Source.tick(Duration.ZERO, Duration.ofMillis(40), "tick");

    Source<Frame, Cancellable> videoAt25Fps = tickSource.zip(videoSource).map(Pair::second);
Reactive Streams semantics
emits when downstream stops backpressuring
backpressures when downstream backpressures
completes when upstream completes