sliding
Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.
Signature
Description
Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.
Note: the last window might be smaller than the requested size due to end of stream.
Examples
In this first sample we just see the behavior of the operator itself, first with a window of 2 elements and the default step which is 1a step value of 1.
- Scala
- 
  source val source = Source(1 to 4) source.sliding(2).runForeach(println) // prints: // Vector(1, 2) // Vector(2, 3) // Vector(3, 4)
- Java
- 
  source Source<Integer, NotUsed> source = Source.range(1, 4); source.sliding(2, 1).runForeach(n -> System.out.println(n), system); // prints: // [1, 2] // [2, 3] // [3, 4]
If the stream stops without having seen enough elements to fill a window, the last window will have as many elements was emitted before the stream ended. Here we also provide a step to move two elements forward for each window:
- Scala
- 
  source val source = Source(1 to 4) source.sliding(n = 3, step = 2).runForeach(println) // prints: // Vector(1, 2, 3) // Vector(3, 4) - shorter because stream ended before we got 3 elements
- Java
- 
  source Source<Integer, NotUsed> source = Source.range(1, 4); source.sliding(3, 2).runForeach(n -> System.out.println(n), system); // prints: // Vector(1, 2, 3) // [1, 2, 3] // [3, 4] - shorter because stream ended before we got 3 elements
One use case for sliding is to implement a moving average, here we do that with a “period” of 5:
- Scala
- 
  source val numbers = Source(1 :: 3 :: 10 :: 2 :: 3 :: 4 :: 2 :: 10 :: 11 :: Nil) val movingAverage = numbers.sliding(5).map(window => window.sum.toFloat / window.size) movingAverage.runForeach(println) // prints // 3.8 = average of 1, 3, 10, 2, 3 // 4.4 = average of 3, 10, 2, 3, 4 // 4.2 = average of 10, 2, 3, 4, 2 // 4.2 = average of 2, 3, 4, 2, 10 // 6.0 = average of 3, 4, 2, 10, 11
- Java
- 
  source Source<Integer, NotUsed> numbers = Source.from(Arrays.asList(1, 3, 10, 2, 3, 4, 2, 10, 11)); Source<Float, NotUsed> movingAverage = numbers .sliding(5, 1) .map(window -> ((float) window.stream().mapToInt(i -> i).sum()) / window.size()); movingAverage.runForeach(n -> System.out.println(n), system); // prints // 3.8 = average of 1, 3, 10, 2, 3 // 4.4 = average of 3, 10, 2, 3, 4 // 4.2 = average of 10, 2, 3, 4, 2 // 4.2 = average of 2, 3, 4, 2, 10 // 6.0 = average of 3, 4, 2, 10, 11
Sliding can also be used to do simple windowing, see splitAfter.
Reactive Streams semantics
emits the specified number of elements has been accumulated or upstream completed
backpressures when a group has been assembled and downstream backpressures
completes when upstream completes