expand
Like extrapolate
, but does not have the initial
argument, and the Iterator
is also used in lieu of the original element, allowing for it to be rewritten and/or filtered.
Signature
Source.expand
Source.expand
Flow.expand
Flow.expand
Description
Like extrapolate
, but does not have the initial
argument, and the Iterator
is also used in lieu of the original element, allowing for it to be rewritten and/or filtered.
See Understanding extrapolate and expand for more information and examples.
Example
Imagine a streaming client decoding a video. It is possible the network bandwidth is a bit unreliable. It’s fine, as long as the audio remains fluent, it doesn’t matter if we can’t decode a frame or two (or more). But we also want to watermark every decoded frame with the name of our colleague. expand
provides access to the element flowing through the stream and let’s us create extra frames in case the producer slows down:
- Scala
-
source
// each element flowing through the stream is expanded to a watermark copy // of the upstream frame and grayed out copies. The grayed out copies should // only be used downstream if the producer is too slow. val watermarkerRateControl: Flow[Frame, Frame, NotUsed] = Flow[Frame].expand((frame: Frame) => { val watermarked = frame.withFilter(Watermark) val grayedOut = frame.withFilter(Gray) Iterator.single(watermarked) ++ Iterator.continually(grayedOut) }) val watermarkedVideoSource: Source[Frame, NotUsed] = networkSource.via(decode).via(rateControl) // let's create a 25fps stream (a Frame every 40.millis) val ticks: Source[Tick.type, Cancellable] = Source.tick(0.seconds, 40.millis, Tick) val watermarkedVideoAt25Fps: Source[Frame, Cancellable] = ticks.zip(watermarkedVideoSource).map(_._2)
- Java
-
source
// each element flowing through the stream is expanded to a watermark copy // of the upstream frame and grayed out copies. The grayed out copies should // only be used downstream if the producer is too slow. Flow<Frame, Frame, NotUsed> watermarkerRateControl = Flow.of(Frame.class) .expand( lastFrame -> { Frame watermarked = new Frame( lastFrame.pixels().$plus$plus(ByteString.fromString(" - watermark"))); Frame gray = new Frame(lastFrame.pixels().$plus$plus(ByteString.fromString(" - gray"))); return Stream.concat(Stream.of(watermarked), Stream.iterate(gray, i -> i)) .iterator(); }); Source<Frame, NotUsed> watermakedVideoSource = networkSource.via(decode).via(watermarkerRateControl); // let's create a 25fps stream (a Frame every 40.millis) Source<String, Cancellable> ticks = Source.tick(Duration.ZERO, Duration.ofMillis(40), "tick"); Source<Frame, Cancellable> watermarkedVideoAt25Fps = ticks.zip(watermakedVideoSource).map(Pair::second);
Reactive Streams semantics
emits when downstream stops backpressuring
backpressures when downstream backpressures
completes when upstream completes