expand

Like extrapolate, but does not have the initial argument, and the Iterator is also used in lieu of the original element, allowing for it to be rewritten and/or filtered.

Backpressure aware operators

Signature

Source.expand, Flow.expand

Description

expand works like extrapolate, but it takes no initial argument, and the Iterator it returns is emitted in place of the original element rather than after it. The function can therefore rewrite each element, or filter it out by returning an empty Iterator.

See Understanding extrapolate and expand for more information and examples.
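The behaviour described above can be sketched without the streams library. The following is a simplified plain-Java model, not the operator's implementation: the class ExpandModel, its expand helper, and the pulls list (how many times downstream asks for an element while a given upstream element is the latest one) are all assumptions made for illustration. The model assumes each pull count is at least 1 and ignores asynchronous timing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical model of expand semantics; not part of any streams API.
class ExpandModel {

  // For upstream element i, downstream pulls pulls.get(i) times before
  // the next upstream element replaces the current iterator.
  static <T, R> List<R> expand(
      List<T> upstream, List<Integer> pulls, Function<T, Iterator<R>> expander) {
    List<R> emitted = new ArrayList<>();
    for (int i = 0; i < upstream.size(); i++) {
      // The iterator stands in for the original element entirely.
      Iterator<R> it = expander.apply(upstream.get(i));
      for (int p = 0; p < pulls.get(i) && it.hasNext(); p++) {
        emitted.add(it.next());
      }
    }
    return emitted;
  }

  // Rewrite: one "fresh" copy, then endless "stale" filler copies.
  static Iterator<String> freshThenStale(String s) {
    Stream<String> filler = Stream.generate(() -> s + "-stale");
    return Stream.concat(Stream.of(s + "-fresh"), filler).iterator();
  }

  // Filter: an empty iterator drops the element.
  static Iterator<Integer> evensOnly(Integer n) {
    return n % 2 == 0 ? List.of(n).iterator() : Collections.<Integer>emptyIterator();
  }

  public static void main(String[] args) {
    // Downstream keeps up with "a" but pulls three times during "b".
    System.out.println(expand(List.of("a", "b"), List.of(1, 3), ExpandModel::freshThenStale));
    // prints [a-fresh, b-fresh, b-stale, b-stale]

    System.out.println(expand(List.of(1, 2, 3, 4), List.of(1, 1, 1, 1), ExpandModel::evensOnly));
    // prints [2, 4]
  }
}
```

Note that the original elements "a" and "b" never appear in the output: only what the iterator yields is emitted, which is the difference from extrapolate.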

Example

Imagine a streaming client decoding a video over a network whose bandwidth may be unreliable. That is acceptable: as long as the audio remains fluent, it doesn’t matter if we can’t decode a frame or two (or more). But we also want to watermark every decoded frame with the name of our colleague. expand provides access to the element flowing through the stream and lets us create extra frames in case the producer slows down:

Scala
// each element flowing through the stream is expanded to a watermark copy
// of the upstream frame and grayed out copies. The grayed out copies should
// only be used downstream if the producer is too slow.
val watermarkerRateControl: Flow[Frame, Frame, NotUsed] =
  Flow[Frame].expand((frame: Frame) => {
    val watermarked = frame.withFilter(Watermark)
    val grayedOut = frame.withFilter(Gray)
    Iterator.single(watermarked) ++ Iterator.continually(grayedOut)
  })

val watermarkedVideoSource: Source[Frame, NotUsed] =
  networkSource.via(decode).via(watermarkerRateControl)

// let's create a 25fps stream (a Frame every 40.millis)
val ticks: Source[Tick.type, Cancellable] = Source.tick(0.seconds, 40.millis, Tick)

val watermarkedVideoAt25Fps: Source[Frame, Cancellable] =
  ticks.zip(watermarkedVideoSource).map(_._2)
Java
// each element flowing through the stream is expanded to a watermark copy
// of the upstream frame and grayed out copies. The grayed out copies should
// only be used downstream if the producer is too slow.
Flow<Frame, Frame, NotUsed> watermarkerRateControl =
    Flow.of(Frame.class)
        .expand(
            lastFrame -> {
              // ByteString.concat is the Java-facing equivalent of Scala's ++
              Frame watermarked =
                  new Frame(lastFrame.pixels().concat(ByteString.fromString(" - watermark")));
              Frame gray =
                  new Frame(lastFrame.pixels().concat(ByteString.fromString(" - gray")));
              return Stream.concat(Stream.of(watermarked), Stream.iterate(gray, i -> i))
                  .iterator();
            });

Source<Frame, NotUsed> watermarkedVideoSource =
    networkSource.via(decode).via(watermarkerRateControl);

// let's create a 25fps stream (a Frame every 40.millis)
Source<String, Cancellable> ticks = Source.tick(Duration.ZERO, Duration.ofMillis(40), "tick");

Source<Frame, Cancellable> watermarkedVideoAt25Fps =
    ticks.zip(watermarkedVideoSource).map(Pair::second);

Reactive Streams semantics

emits when downstream stops backpressuring

backpressures when downstream backpressures

completes when upstream completes