splitWhen

Split off elements into a new substream whenever a predicate function return true.

Nesting and flattening operators

Signature

Source.splitWhenSource.splitWhen Flow.splitWhenFlow.splitWhen

The splitWhen operator adheres to the ActorAttributes.SupervisionStrategy attribute with the caveat that Supervision.restart behaves the same way as Supervision.resume since Supervision.restart for SubFlows semantically doesn’t make sense.

Description

Split off elements into a new substream whenever a predicate function return true.

Example

Given some time series data source we would like to split the stream into sub-streams for each second. We need to compare the timestamp of the previous and current element to decide when to split. This decision can be implemented in a statefulMapConcat operator preceding the splitWhen.

Scala
sourceSource(1 to 100)
  .throttle(1, 100.millis)
  .map(elem => (elem, Instant.now()))
  .statefulMapConcat(() => {
    // stateful decision in statefulMapConcat
    // keep track of time bucket (one per second)
    var currentTimeBucket = LocalDateTime.ofInstant(Instant.ofEpochMilli(0), ZoneOffset.UTC)

    {
      case (elem, timestamp) =>
        val time = LocalDateTime.ofInstant(timestamp, ZoneOffset.UTC)
        val bucket = time.withNano(0)
        val newBucket = bucket != currentTimeBucket
        if (newBucket)
          currentTimeBucket = bucket
        List((elem, newBucket))
    }
  })
  .splitWhen(_._2) // split when time bucket changes
  .map(_._1)
  .fold(0)((acc, _) => acc + 1) // sum
  .to(Sink.foreach(println))
  .run()
// 3
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 7
Java
sourceSource.range(1, 100)
    .throttle(1, Duration.ofMillis(100))
    .map(elem -> new Pair<>(elem, Instant.now()))
    .statefulMapConcat(
        () -> {
          return new Function<Pair<Integer, Instant>, Iterable<Pair<Integer, Boolean>>>() {
            // stateful decision in statefulMapConcat
            // keep track of time bucket (one per second)
            LocalDateTime currentTimeBucket =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(0), ZoneOffset.UTC);

            @Override
            public Iterable<Pair<Integer, Boolean>> apply(
                Pair<Integer, Instant> elemTimestamp) {
              LocalDateTime time =
                  LocalDateTime.ofInstant(elemTimestamp.second(), ZoneOffset.UTC);
              LocalDateTime bucket = time.withNano(0);
              boolean newBucket = !bucket.equals(currentTimeBucket);
              if (newBucket) currentTimeBucket = bucket;
              return Collections.singleton(new Pair<>(elemTimestamp.first(), newBucket));
            }
          };
        })
    .splitWhen(elemDecision -> elemDecision.second()) // split when time bucket changes
    .map(elemDecision -> elemDecision.first())
    .fold(0, (acc, notUsed) -> acc + 1) // sum
    .to(Sink.foreach(System.out::println))
    .run(system);
// 3
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 7

An alternative way of implementing this is shown in splitAfter example.

Reactive Streams semantics

emits an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements

backpressures when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures

completes when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not)