extrapolate
Allow for a faster downstream by expanding the last emitted element to an Iterator.
Signature
Source.extrapolate
Flow.extrapolate
Description
Allow for a faster downstream by expanding the last emitted element to an Iterator. For example, an Iterator.continually(element) will cause extrapolate to keep repeating the last emitted element.
All original elements are always emitted unchanged; the Iterator is only used when there is downstream demand before upstream has emitted a new element.
Includes an optional initial argument to prevent blocking the entire stream when there are multiple producers.
See Understanding extrapolate and expand for more information and examples.
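The behaviour described above can be sketched as a small plain-Java model. This is not the library's implementation and does not use the streams API. `ExtrapolateModel`, `simulate` and the string "frames" are hypothetical names chosen for illustration. The model assumes one downstream request per tick, and it assumes the initial value behaves like an element that precedes anything upstream emits.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;

/** Plain-Java model of the extrapolate behaviour; hypothetical, not library code. */
final class ExtrapolateModel {

  /**
   * Simulates one downstream request per tick.
   *
   * @param arrivals what upstream delivered before each tick (null = nothing new)
   * @param extrapolator builds the filler Iterator from the last emitted element
   * @param initial optional seed, assumed to act like a first element (null = none)
   * @return the element emitted at each tick (null = downstream had to wait)
   */
  static <T> List<T> simulate(
      List<T> arrivals, Function<T, Iterator<T>> extrapolator, T initial) {
    Deque<T> pending = new ArrayDeque<>();
    if (initial != null) pending.add(initial);
    Iterator<T> filler = Collections.emptyIterator();
    List<T> emitted = new ArrayList<>();
    for (T arrival : arrivals) {
      if (arrival != null) pending.add(arrival);
      if (!pending.isEmpty()) {
        // Original elements are always emitted unchanged.
        T element = pending.poll();
        emitted.add(element);
        filler = extrapolator.apply(element);
      } else if (filler.hasNext()) {
        // Demand arrived before a new element: fall back to the Iterator.
        emitted.add(filler.next());
      } else {
        // Nothing emitted yet and no initial value: downstream waits.
        emitted.add(null);
      }
    }
    return emitted;
  }

  public static void main(String[] args) {
    // Infinite Iterator repeating a "grayed out" copy of the last frame.
    Function<String, Iterator<String>> grayOut =
        frame -> Stream.generate(() -> frame + "(gray)").iterator();
    List<String> arrivals = Arrays.asList(null, null, "f1", null, "f2");

    // prints [null, null, f1, f1(gray), f2]
    System.out.println(simulate(arrivals, grayOut, null));
    // prints [black, black(gray), f1, f1(gray), f2]
    System.out.println(simulate(arrivals, grayOut, "black"));
  }
}
```

Without an initial value the first two ticks produce nothing, which is the stall the initial argument is meant to avoid. With it, downstream receives the seed and its extrapolated copies until the first real element arrives.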
Example
Imagine a videoconference client decoding a video feed from a colleague working remotely. The network bandwidth may be a bit unreliable. That is fine: as long as the audio remains fluent, it doesn’t matter if we can’t decode a frame or two (or more). When a frame is dropped, though, we want the UI to show the last frame decoded:
Scala
// if upstream is too slow, produce copies of the last frame but grayed out.
val rateControl: Flow[Frame, Frame, NotUsed] =
  Flow[Frame].extrapolate((frame: Frame) => {
    val grayedOut = frame.withFilter(Gray)
    Iterator.continually(grayedOut)
  }, Some(Frame.blackFrame))
val videoSource: Source[Frame, NotUsed] = networkSource.via(decode).via(rateControl)
// let's create a 25fps stream (a Frame every 40.millis)
val tickSource: Source[Tick.type, Cancellable] = Source.tick(0.seconds, 40.millis, Tick)
val videoAt25Fps: Source[Frame, Cancellable] =
  tickSource.zip(videoSource).map(_._2)
Java
// if upstream is too slow, produce copies of the last frame but grayed out.
Flow<Frame, Frame, NotUsed> rateControl =
    Flow.of(Frame.class)
        .extrapolate(
            lastFrame -> {
              Frame gray =
                  new Frame(
                      ByteString.fromString(
                          "gray frame!! - " + lastFrame.pixels().utf8String()));
              return Stream.iterate(gray, i -> i).iterator();
            },
            BLACK_FRAME // initial value
            );
Source<Frame, NotUsed> videoSource = networkSource.via(decode).via(rateControl);
// let's create a 25fps stream (a Frame every 40.millis)
Source<String, Cancellable> tickSource =
    Source.tick(Duration.ZERO, Duration.ofMillis(40), "tick");
Source<Frame, Cancellable> videoAt25Fps = tickSource.zip(videoSource).map(Pair::second);
Reactive Streams semantics
emits when downstream stops backpressuring
backpressures when downstream backpressures
completes when upstream completes