expand

Like extrapolate, but does not have the initial argument, and the Iterator is also used in lieu of the original element, allowing for it to be rewritten and/or filtered.

Backpressure aware operators

Signature

Source.expand Flow.expand

Description

Like extrapolate, but without the initial argument. The elements of the Iterator returned by the expander function are emitted in place of the original element, so the element can be rewritten or filtered (for example, by returning an empty Iterator).

See Understanding extrapolate and expand for more information and examples.

This operator adheres to the ActorAttributes.SupervisionStrategy attribute for exceptions thrown by the expander function or during iterator evaluation (hasNext/next). On Supervision.Stop the stream fails. On Supervision.Resume the failed element is dropped; the current extrapolation state is kept when the failure occurred in the expander function (a previously active iterator is retained), but is necessarily discarded when the failure occurred during iterator evaluation. On Supervision.Restart the failed element is dropped and the current extrapolation state is reset.

Example

Imagine a streaming client decoding a video. The network bandwidth may be unreliable. That is fine: as long as the audio remains fluent, it doesn’t matter if we can’t decode a frame or two (or more). But we also want to watermark every decoded frame with the name of our colleague. expand provides access to the element flowing through the stream and lets us create extra frames in case the producer slows down:

Scala
// each element flowing through the stream is expanded to a watermarked copy
// of the upstream frame and grayed out copies. The grayed out copies should
// only be used downstream if the producer is too slow.
val watermarkerRateControl: Flow[Frame, Frame, NotUsed] =
  Flow[Frame].expand((frame: Frame) => {
    val watermarked = frame.withFilter(Watermark)
    val grayedOut = frame.withFilter(Gray)
    Iterator.single(watermarked) ++ Iterator.continually(grayedOut)
  })

val watermarkedVideoSource: Source[Frame, NotUsed] =
  networkSource.via(decode).via(watermarkerRateControl)

// let's create a 25fps stream (a Frame every 40.millis)
val ticks: Source[Tick.type, Cancellable] = Source.tick(0.seconds, 40.millis, Tick)

val watermarkedVideoAt25Fps: Source[Frame, Cancellable] =
  ticks.zip(watermarkedVideoSource).map(_._2)
Java
// each element flowing through the stream is expanded to a watermarked copy
// of the upstream frame and grayed out copies. The grayed out copies should
// only be used downstream if the producer is too slow.
Flow<Frame, Frame, NotUsed> watermarkerRateControl =
    Flow.of(Frame.class)
        .expand(
            lastFrame -> {
              // ByteString.concat is the Java-friendly equivalent of Scala's ++
              Frame watermarked =
                  new Frame(lastFrame.pixels().concat(ByteString.fromString(" - watermark")));
              Frame gray =
                  new Frame(lastFrame.pixels().concat(ByteString.fromString(" - gray")));
              return Stream.concat(Stream.of(watermarked), Stream.iterate(gray, i -> i))
                  .iterator();
            });

Source<Frame, NotUsed> watermarkedVideoSource =
    networkSource.via(decode).via(watermarkerRateControl);

// let's create a 25fps stream (a Frame every 40.millis)
Source<String, Cancellable> ticks = Source.tick(Duration.ZERO, Duration.ofMillis(40), "tick");

Source<Frame, Cancellable> watermarkedVideoAt25Fps =
    ticks.zip(watermarkedVideoSource).map(Pair::second);
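
To make the effect of the example above easier to see without running a stream, the following is a minimal plain-Java sketch of the idea. It is a simplified model, not the operator's implementation: ExpandModel, simulate and watermarkThenGray are hypothetical names introduced only for illustration, String stands in for Frame, and the model ignores backpressure towards upstream. Each entry in the list represents one downstream tick, and a present value means a new frame arrived from upstream just before that tick.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;

// Plain-Java model of the expand idea; NOT the library implementation.
final class ExpandModel {

  // Hypothetical helper: every entry of `arrivals` is one downstream tick.
  // A present value means a new upstream element arrived just before that tick.
  static <T, U> List<U> simulate(
      List<Optional<T>> arrivals, Function<T, Iterator<U>> expander) {
    List<U> emitted = new ArrayList<>();
    Iterator<U> current = Collections.emptyIterator();
    for (Optional<T> arrival : arrivals) {
      if (arrival.isPresent()) {
        // A new element replaces the iterator built from the previous one.
        current = expander.apply(arrival.get());
      }
      if (current.hasNext()) {
        // Downstream receives the next element of the current iterator.
        emitted.add(current.next());
      }
      // An empty or exhausted iterator emits nothing on this tick.
    }
    return emitted;
  }

  // Same shape as the expander in the example, with String standing in for Frame.
  static Iterator<String> watermarkThenGray(String frame) {
    return Stream.concat(
            Stream.of(frame + " - watermark"),
            Stream.iterate(frame + " - gray", f -> f))
        .iterator();
  }

  public static void main(String[] args) {
    List<Optional<String>> arrivals =
        List.of(
            Optional.of("f1"),
            Optional.empty(),
            Optional.empty(),
            Optional.of("f2"),
            Optional.empty());
    // Prints: f1 - watermark, f1 - gray, f1 - gray, f2 - watermark, f2 - gray
    // (one per line).
    simulate(arrivals, ExpandModel::watermarkThenGray).forEach(System.out::println);
  }
}
```

In this model the watermarked copy is emitted once per upstream frame, and the grayed out copies only fill the ticks on which no new frame is available. An expander that returns an empty iterator emits nothing for that element, which is how the model represents filtering.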

Reactive Streams semantics

emits when downstream stops backpressuring

backpressures when downstream backpressures

completes when upstream completes