expand
Like extrapolate, but does not have the initial argument, and the Iterator is also used in lieu of the original element, allowing for it to be rewritten and/or filtered.
Signature
Source.expandSource.expand Flow.expandFlow.expand
Description
Like extrapolate, but does not have the initial argument, and the Iterator is also used in lieu of the original element, allowing for it to be rewritten and/or filtered.
See Understanding extrapolate and expand for more information and examples.
This operator adheres to the ActorAttributes.SupervisionStrategy attribute for exceptions thrown by the expander function or during iterator evaluation (hasNext/next). On Supervision.Stop the stream fails; on Supervision.Resume the failed element is dropped and the current extrapolation state is kept when the failure occurred in the expander function (a previously active iterator is retained), but is necessarily discarded when the failure occurred during iterator evaluation; on Supervision.Restart the failed element is dropped and the current extrapolation state is reset.
Example
Imagine a streaming client decoding a video. It is possible the network bandwidth is a bit unreliable. It’s fine, as long as the audio remains fluent, it doesn’t matter if we can’t decode a frame or two (or more). But we also want to watermark every decoded frame with the name of our colleague. expand provides access to the element flowing through the stream and let’s us create extra frames in case the producer slows down:
- Scala
-
source
// each element flowing through the stream is expanded to a watermark copy // of the upstream frame and grayed out copies. The grayed out copies should // only be used downstream if the producer is too slow. val watermarkerRateControl: Flow[Frame, Frame, NotUsed] = Flow[Frame].expand((frame: Frame) => { val watermarked = frame.withFilter(Watermark) val grayedOut = frame.withFilter(Gray) Iterator.single(watermarked) ++ Iterator.continually(grayedOut) }) val watermarkedVideoSource: Source[Frame, NotUsed] = networkSource.via(decode).via(rateControl) // let's create a 25fps stream (a Frame every 40.millis) val ticks: Source[Tick.type, Cancellable] = Source.tick(0.seconds, 40.millis, Tick) val watermarkedVideoAt25Fps: Source[Frame, Cancellable] = ticks.zip(watermarkedVideoSource).map(_._2) - Java
-
source
// each element flowing through the stream is expanded to a watermark copy // of the upstream frame and grayed out copies. The grayed out copies should // only be used downstream if the producer is too slow. Flow<Frame, Frame, NotUsed> watermarkerRateControl = Flow.of(Frame.class) .expand( lastFrame -> { Frame watermarked = new Frame( lastFrame.pixels().$plus$plus(ByteString.fromString(" - watermark"))); Frame gray = new Frame(lastFrame.pixels().$plus$plus(ByteString.fromString(" - gray"))); return Stream.concat(Stream.of(watermarked), Stream.iterate(gray, i -> i)) .iterator(); }); Source<Frame, NotUsed> watermakedVideoSource = networkSource.via(decode).via(watermarkerRateControl); // let's create a 25fps stream (a Frame every 40.millis) Source<String, Cancellable> ticks = Source.tick(Duration.ZERO, Duration.ofMillis(40), "tick"); Source<Frame, Cancellable> watermarkedVideoAt25Fps = ticks.zip(watermakedVideoSource).map(Pair::second);
Reactive Streams semantics
emits when downstream stops backpressuring
backpressures when downstream backpressures
completes when upstream completes