splitAfter

End the current substream whenever a predicate returns true, starting a new substream for the next element.

Nesting and flattening operators

Signature

Source.splitAfterSource.splitAfter Flow.splitAfterFlow.splitAfter

The splitAfter operator adheres to the ActorAttributes.SupervisionStrategy attribute with the caveat that Supervision.restart behaves the same way as Supervision.resume since Supervision.restart for SubFlows semantically doesn’t make sense.

Description

End the current substream whenever a predicate returns true, starting a new substream for the next element.

Example

Given some time series data source we would like to split the stream into sub-streams for each second. By using sliding we can compare the timestamp of the current and next element to decide when to split.

Scala
sourceSource(1 to 100)
  .throttle(1, 100.millis)
  .map(elem => (elem, Instant.now()))
  .sliding(2)
  .splitAfter { slidingElements =>
    if (slidingElements.size == 2) {
      val current = slidingElements.head
      val next = slidingElements.tail.head
      val currentBucket = LocalDateTime.ofInstant(current._2, ZoneOffset.UTC).withNano(0)
      val nextBucket = LocalDateTime.ofInstant(next._2, ZoneOffset.UTC).withNano(0)
      currentBucket != nextBucket
    } else {
      false
    }
  }
  .map(_.head._1)
  .fold(0)((acc, _) => acc + 1) // sum
  .to(Sink.foreach(println))
  .run()
// 3
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 6
// note that the very last element is never included due to sliding,
// but that would not be problem for an infinite stream
Java
sourceSource.range(1, 100)
    .throttle(1, Duration.ofMillis(100))
    .map(elem -> new Pair<>(elem, Instant.now()))
    .sliding(2, 1)
    .splitAfter(
        slidingElements -> {
          if (slidingElements.size() == 2) {
            Pair<Integer, Instant> current = slidingElements.get(0);
            Pair<Integer, Instant> next = slidingElements.get(1);
            LocalDateTime currentBucket =
                LocalDateTime.ofInstant(current.second(), ZoneOffset.UTC).withNano(0);
            LocalDateTime nextBucket =
                LocalDateTime.ofInstant(next.second(), ZoneOffset.UTC).withNano(0);
            return !currentBucket.equals(nextBucket);
          } else {
            return false;
          }
        })
    .map(slidingElements -> slidingElements.get(0).first())
    .fold(0, (acc, notUsed) -> acc + 1) // sum
    .to(Sink.foreach(System.out::println))
    .run(system);
// 3
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 6
// note that the very last element is never included due to sliding,
// but that would not be problem for an infinite stream

An alternative way of implementing this is shown in splitWhen example.

Reactive Streams semantics

emits when an element passes through. When the provided predicate is true it emits the element * and opens a new substream for subsequent element

backpressures when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures

completes when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not)