splitWhen
Split off elements into a new substream whenever a predicate function returns true.
Nesting and flattening operators
Signature
Source.splitWhen
Flow.splitWhen
The splitWhen operator adheres to the ActorAttributes.SupervisionStrategy attribute with the caveat that Supervision.restart behaves the same way as Supervision.resume, since Supervision.restart for SubFlows semantically doesn’t make sense.
Description
Split off elements into a new substream whenever a predicate function returns true. The element for which the predicate returns true becomes the first element of the new substream.
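The splitting rule can be modelled on a plain list, without any streaming machinery. The following is a minimal sketch in plain Java, not the streaming operator itself: `SplitWhenSketch` and its static `splitWhen` helper are hypothetical names introduced for illustration. It assumes that a matching element opens the new group and that a match on the very first element does not produce an empty leading group.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class SplitWhenSketch {
    // Plain-list model of the splitting rule (hypothetical helper, not a library API).
    static <T> List<List<T>> splitWhen(List<T> elements, Predicate<T> predicate) {
        List<List<T>> substreams = new ArrayList<>();
        List<T> current = null;
        for (T elem : elements) {
            // Open a new group for the first element and for every matching element.
            if (current == null || predicate.test(elem)) {
                current = new ArrayList<>();
                substreams.add(current);
            }
            current.add(elem);
        }
        return substreams;
    }

    public static void main(String[] args) {
        List<List<Integer>> groups =
            splitWhen(List.of(1, 2, 3, 4, 5, 6, 7), n -> n % 3 == 0);
        System.out.println(groups); // [[1, 2], [3, 4, 5], [6, 7]]
    }
}
```

Unlike this eager model, the real operator emits each substream as it is opened and is subject to backpressure, so the groups are never materialized as lists.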
Example
Given some time series data source, we would like to split the stream into substreams, one per second. To decide when to split, we need to compare the timestamps of the previous and the current element. This decision can be implemented in a statefulMapConcat operator preceding the splitWhen.
- Scala
    Source(1 to 100)
      .throttle(1, 100.millis)
      .map(elem => (elem, Instant.now()))
      .statefulMapConcat(() => {
        // stateful decision in statefulMapConcat
        // keep track of time bucket (one per second)
        var currentTimeBucket = LocalDateTime.ofInstant(Instant.ofEpochMilli(0), ZoneOffset.UTC)

        {
          case (elem, timestamp) =>
            val time = LocalDateTime.ofInstant(timestamp, ZoneOffset.UTC)
            val bucket = time.withNano(0)
            val newBucket = bucket != currentTimeBucket
            if (newBucket)
              currentTimeBucket = bucket
            List((elem, newBucket))
        }
      })
      .splitWhen(_._2) // split when time bucket changes
      .map(_._1)
      .fold(0)((acc, _) => acc + 1) // count the elements in each substream
      .to(Sink.foreach(println))
      .run()
    // 3
    // 10
    // 10
    // 10
    // 10
    // 10
    // 10
    // 10
    // 10
    // 10
    // 7
- Java
    Source.range(1, 100)
        .throttle(1, Duration.ofMillis(100))
        .map(elem -> new Pair<>(elem, Instant.now()))
        .statefulMapConcat(
            () -> {
              return new Function<Pair<Integer, Instant>, Iterable<Pair<Integer, Boolean>>>() {
                // stateful decision in statefulMapConcat
                // keep track of time bucket (one per second)
                LocalDateTime currentTimeBucket =
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(0), ZoneOffset.UTC);

                @Override
                public Iterable<Pair<Integer, Boolean>> apply(
                    Pair<Integer, Instant> elemTimestamp) {
                  LocalDateTime time =
                      LocalDateTime.ofInstant(elemTimestamp.second(), ZoneOffset.UTC);
                  LocalDateTime bucket = time.withNano(0);
                  boolean newBucket = !bucket.equals(currentTimeBucket);
                  if (newBucket) currentTimeBucket = bucket;
                  return Collections.singleton(new Pair<>(elemTimestamp.first(), newBucket));
                }
              };
            })
        .splitWhen(elemDecision -> elemDecision.second()) // split when time bucket changes
        .map(elemDecision -> elemDecision.first())
        .fold(0, (acc, notUsed) -> acc + 1) // count the elements in each substream
        .to(Sink.foreach(System.out::println))
        .run(system);
    // 3
    // 10
    // 10
    // 10
    // 10
    // 10
    // 10
    // 10
    // 10
    // 10
    // 7
An alternative way of implementing this is shown in the splitAfter example.
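Because the example stamps elements with Instant.now(), the sizes of its first and last substreams vary from run to run. The bucketing decision itself can be checked in isolation with fixed timestamps. The following is a minimal sketch in plain Java: `TimeBucketSketch` and `newBucketFlags` are hypothetical names, and it swaps in `Instant.truncatedTo(ChronoUnit.SECONDS)` for the `LocalDateTime.withNano(0)` call used in the example, on the assumption that both yield the same one-second buckets in UTC.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

final class TimeBucketSketch {
    // Hypothetical helper: one flag per timestamp, true when the timestamp falls
    // into a different one-second bucket than the previous timestamp.
    static List<Boolean> newBucketFlags(List<Instant> timestamps) {
        List<Boolean> flags = new ArrayList<>();
        Instant currentBucket = Instant.EPOCH; // same initial bucket as the example
        for (Instant timestamp : timestamps) {
            Instant bucket = timestamp.truncatedTo(ChronoUnit.SECONDS);
            boolean newBucket = !bucket.equals(currentBucket);
            if (newBucket) currentBucket = bucket;
            flags.add(newBucket);
        }
        return flags;
    }

    public static void main(String[] args) {
        // Five timestamps 100 ms apart, starting 700 ms into a second.
        Instant start = Instant.parse("2024-01-01T00:00:00.700Z");
        List<Instant> timestamps = new ArrayList<>();
        for (int i = 0; i < 5; i++) timestamps.add(start.plusMillis(i * 100L));
        System.out.println(newBucketFlags(timestamps)); // [true, false, false, true, false]
    }
}
```

Splitting on these flags would give one substream of three elements followed by one of two, which mirrors why the first count in the example output is smaller than ten.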
Reactive Streams semantics
emits an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements
backpressures when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures
completes when upstream completes (until the end of the stream, it is not possible to know whether new substreams will be needed)