statefulMap
Transform each stream element with the help of a state.
Signature¶
Description¶
Transform each stream element with the help of a state.
The state creation function is invoked once when the stream is materialized and the returned state is passed to the mapping function for mapping the first element.
The mapping function returns a mapped element to emit downstream and a state to pass to the next mapping function. The state can be the same for each mapping return, be a new immutable state but it is also safe to use a mutable state.
The on complete function is called, once, when the first of upstream completion, downstream cancellation or stream failure happens. If the cause is upstream completion and the downstream is still accepting elements, the returned value from the function is passed downstream before completing the operator itself, for the other cases the returned value is ignored.
The statefulMap
operator adheres to the ActorAttributes.SupervisionStrategy attribute.
For mapping stream elements without keeping a state see map.
Examples¶
In the first example, we implement an zipWithIndex
operator like zipWithIndex, it always associates a unique index with each element of the stream, the index starts from 0.
sourceSource(List("A", "B", "C", "D"))
.statefulMap(() => 0L)((index, elem) => (index + 1, (elem, index)), _ => None)
.runForeach(println)
// prints
// (A,0)
// (B,1)
// (C,2)
// (D,3)
sourceSource.from(Arrays.asList("A", "B", "C", "D"))
.statefulMap(
() -> 0L,
(index, element) -> Pair.create(index + 1, Pair.create(element, index)),
indexOnComplete -> Optional.empty())
.runForeach(System.out::println, system);
// prints
// Pair(A,0)
// Pair(B,1)
// Pair(C,2)
// Pair(D,3)
In the second example, the elements are buffered until the incoming element is different, and then emitted downstream. When upstream completes, if there are buffered elements, they are emitted before completing.
sourceSource("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil)
.statefulMap(() => List.empty[String])(
(buffer, element) =>
buffer match {
case head :: _ if head != element => (element :: Nil, buffer)
case _ => (element :: buffer, Nil)
},
buffer => Some(buffer))
.filter(_.nonEmpty)
.runForeach(println)
// prints
// List(A)
// List(B, B)
// List(C, C, C)
// List(D)
sourceSource.from(Arrays.asList("A", "B", "B", "C", "C", "C", "D"))
.statefulMap(
() -> (List<String>) new LinkedList<String>(),
(buffer, element) -> {
if (buffer.size() > 0 && (!buffer.get(0).equals(element))) {
return Pair.create(
new LinkedList<>(Collections.singletonList(element)),
Collections.unmodifiableList(buffer));
} else {
buffer.add(element);
return Pair.create(buffer, Collections.<String>emptyList());
}
},
Optional::ofNullable)
.filterNot(List::isEmpty)
.runForeach(System.out::println, system);
// prints
// [A]
// [B, B]
// [C, C, C]
// [D]
In the third example, repeated incoming elements are only emitted once and then dropped.
sourceSource("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil)
.statefulMap(() => Option.empty[String])(
(lastElement, elem) =>
lastElement match {
case Some(head) if head == elem => (Some(elem), None)
case _ => (Some(elem), Some(elem))
},
_ => None)
.collect { case Some(elem) => elem }
.runForeach(println)
// prints
// A
// B
// C
// D
sourceSource.from(Arrays.asList("A", "B", "B", "C", "C", "C", "D"))
.statefulMap(
Optional::<String>empty,
(lastElement, element) -> {
if (lastElement.isPresent() && lastElement.get().equals(element)) {
return Pair.create(lastElement, Optional.<String>empty());
} else {
return Pair.create(Optional.of(element), Optional.of(element));
}
},
listOnComplete -> Optional.empty())
.via(Flow.flattenOptional())
.runForeach(System.out::println, system);
// prints
// A
// B
// C
// D
In the fourth example, we combine the statefulMap and mapConcat to implement a statefulMapConcat like behavior.
sourceSource(1 to 10)
.statefulMap(() => List.empty[Int])(
(state, elem) => {
// grouped 3 elements into a list
val newState = elem :: state
if (newState.size == 3)
(Nil, newState.reverse)
else
(newState, Nil)
},
state => Some(state.reverse))
.mapConcat(identity)
.runForeach(println)
// prints
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// 8
// 9
// 10
sourceSource.fromJavaStream(() -> IntStream.rangeClosed(1, 10))
.statefulMap(
() -> new ArrayList<Integer>(3),
(list, element) -> {
list.add(element);
if (list.size() == 3) {
return Pair.create(new ArrayList<Integer>(3), Collections.unmodifiableList(list));
} else {
return Pair.create(list, Collections.<Integer>emptyList());
}
},
Optional::ofNullable)
.mapConcat(list -> list)
.runForeach(System.out::println, system);
// prints
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// 8
// 9
// 10
Reactive Streams semantics¶
emits the mapping function returns an element and downstream is ready to consume it
backpressures downstream backpressures
completes upstream completes
cancels downstream cancels