statefulMap

Transform each stream element with the help of a state.

Simple operators

Signature

Flow.statefulMapFlow.statefulMap

Description

Transform each stream element with the help of a state.

The state creation function is invoked once when the stream is materialized and the returned state is passed to the mapping function for mapping the first element.

The mapping function returns a mapped element to emit downstream and a state to pass to the next mapping function. The state can be the same for each mapping return, be a new immutable state but it is also safe to use a mutable state.

The on complete function is called, once, when the first of upstream completion, downstream cancellation or stream failure happens. If the cause is upstream completion and the downstream is still accepting elements, the returned value from the function is passed downstream before completing the operator itself, for the other cases the returned value is ignored.

The statefulMap operator adheres to the ActorAttributes.SupervisionStrategy attribute.

For mapping stream elements without keeping a state see map.

Examples

In the first example, we implement an zipWithIndex operator like zipWithIndex, it always associates a unique index with each element of the stream, the index starts from 0.

Scala
sourceSource(List("A", "B", "C", "D"))
  .statefulMap(() => 0L)((index, elem) => (index + 1, (elem, index)), _ => None)
  .runForeach(println)
// prints
// (A,0)
// (B,1)
// (C,2)
// (D,3)
Java
sourceSource.from(Arrays.asList("A", "B", "C", "D"))
    .statefulMap(
        () -> 0L,
        (index, element) -> Pair.create(index + 1, Pair.create(element, index)),
        indexOnComplete -> Optional.empty())
    .runForeach(System.out::println, system);
// prints
// Pair(A,0)
// Pair(B,1)
// Pair(C,2)
// Pair(D,3)

In the second example, the elements are buffered until the incoming element is different, and then emitted downstream. When upstream completes, if there are buffered elements, they are emitted before completing.

Scala
sourceSource("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil)
  .statefulMap(() => List.empty[String])(
    (buffer, element) =>
      buffer match {
        case head :: _ if head != element => (element :: Nil, buffer)
        case _                            => (element :: buffer, Nil)
      },
    buffer => Some(buffer))
  .filter(_.nonEmpty)
  .runForeach(println)
// prints
// List(A)
// List(B, B)
// List(C, C, C)
// List(D)
Java
sourceSource.from(Arrays.asList("A", "B", "B", "C", "C", "C", "D"))
    .statefulMap(
        () -> (List<String>) new LinkedList<String>(),
        (buffer, element) -> {
          if (buffer.size() > 0 && (!buffer.get(0).equals(element))) {
            return Pair.create(
                new LinkedList<>(Collections.singletonList(element)),
                Collections.unmodifiableList(buffer));
          } else {
            buffer.add(element);
            return Pair.create(buffer, Collections.<String>emptyList());
          }
        },
        Optional::ofNullable)
    .filterNot(List::isEmpty)
    .runForeach(System.out::println, system);
// prints
// [A]
// [B, B]
// [C, C, C]
// [D]

In the third example, repeated incoming elements are only emitted once and then dropped.

Scala
sourceSource("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil)
  .statefulMap(() => Option.empty[String])(
    (lastElement, elem) =>
      lastElement match {
        case Some(head) if head == elem => (Some(elem), None)
        case _                          => (Some(elem), Some(elem))
      },
    _ => None)
  .collect { case Some(elem) => elem }
  .runForeach(println)
// prints
// A
// B
// C
// D
Java
sourceSource.from(Arrays.asList("A", "B", "B", "C", "C", "C", "D"))
    .statefulMap(
        Optional::<String>empty,
        (lastElement, element) -> {
          if (lastElement.isPresent() && lastElement.get().equals(element)) {
            return Pair.create(lastElement, Optional.<String>empty());
          } else {
            return Pair.create(Optional.of(element), Optional.of(element));
          }
        },
        listOnComplete -> Optional.empty())
    .via(Flow.flattenOptional())
    .runForeach(System.out::println, system);
// prints
// A
// B
// C
// D

In the fourth example, we combine the statefulMap and mapConcat to implement a statefulMapConcat like behavior.

Scala
sourceSource(1 to 10)
  .statefulMap(() => List.empty[Int])(
    (state, elem) => {
      // grouped 3 elements into a list
      val newState = elem :: state
      if (newState.size == 3)
        (Nil, newState.reverse)
      else
        (newState, Nil)
    },
    state => Some(state.reverse))
  .mapConcat(identity)
  .runForeach(println)
// prints
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// 8
// 9
// 10
Java
sourceSource.fromJavaStream(() -> IntStream.rangeClosed(1, 10))
    .statefulMap(
        () -> new ArrayList<Integer>(3),
        (list, element) -> {
          list.add(element);
          if (list.size() == 3) {
            return Pair.create(new ArrayList<Integer>(3), Collections.unmodifiableList(list));
          } else {
            return Pair.create(list, Collections.<Integer>emptyList());
          }
        },
        Optional::ofNullable)
    .mapConcat(list -> list)
    .runForeach(System.out::println, system);
// prints
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// 8
// 9
// 10

Reactive Streams semantics

emits the mapping function returns an element and downstream is ready to consume it

backpressures downstream backpressures

completes upstream completes

cancels downstream cancels