collectWhile

Transform this stream by applying the given partial function to each of the elements on which the function is defined as they pass through this processing step, and cancel the upstream publisher after the partial function is not applied.

Simple operators

Signature

Source.collectWhileSource.collectWhile Flow.collectWhileFlow.collectWhile

Description

Transform this stream by applying the given partial function to each of the elements on which the function is defined as they pass through this processing step, and cancel the upstream publisher after the partial function is not applied.

Example

Scala
sourceFlow[Message].collectWhile {
  case Ping(id) if id <= 100 => Pong(id)
}
Java
sourceFlow<Message, Pong, NotUsed> flow =
    Flow.of(Message.class)
        .collectWhile(
            PFBuilder.<Message, Pong>create()
                .match(Ping.class, p -> p.id <= 100, p -> new Pong(p.id))
                .build());

Reactive Streams semantics

emits when the provided partial function is defined for the element

backpressures when the partial function is defined for the element and downstream backpressures

completes when upstream completes or the partial function is not applied

cancels when downstream cancels