MergeSequence

Merge a linear sequence partitioned across multiple sources.

Fan-in operators

Signature

MergeSequence

Description

Merge a linear sequence partitioned across multiple sources. Each upstream element must have a defined index, starting from 0. The sequence must contain no gaps and no duplicates, and each upstream source must be ordered by its index.
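
Before the full example below, here is a minimal, self-contained sketch of the contract just described. The element values, source names and ActorSystem name are made up for illustration: two sources each hold part of an already-indexed sequence, and MergeSequence emits the elements back in index order.

Scala
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.scaladsl.{ MergeSequence, Sink, Source }

implicit val system: ActorSystem = ActorSystem("merge-sequence-sketch")

// Two partitions of the same indexed sequence: indices start at 0, have no
// gaps or duplicates overall, and each source is ordered by its index.
val evens = Source(List(("a", 0L), ("c", 2L), ("e", 4L)))
val odds = Source(List(("b", 1L), ("d", 3L), ("f", 5L)))

// MergeSequence pulls from whichever upstream has the next expected index,
// so the merged stream comes out strictly in index order.
Source
  .combine(evens, odds)(inputs => MergeSequence[(String, Long)](inputs)(_._2))
  .runWith(Sink.foreach(println))
// Prints (a,0), (b,1), (c,2), (d,3), (e,4), (f,5)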

Example

MergeSequence is most useful in combination with Partition, to merge a partitioned stream back into a single stream while maintaining the order of the original elements. zipWithIndex can be used before partitioning the stream to generate the index.

The example below partitions a stream of messages into one stream for elements that must be processed by a given processing flow and another stream for elements that need no processing, then merges them back together so that the messages can be acknowledged in their original order.

Scala
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.ClosedShape
import pekko.stream.scaladsl.{ Flow, GraphDSL, MergeSequence, Partition, RunnableGraph, Sink, Source }

val subscription: Source[Message, NotUsed] = createSubscription()
val messageProcessor: Flow[(Message, Long), (Message, Long), NotUsed] =
  createMessageProcessor()
val messageAcknowledger: Sink[Message, NotUsed] = createMessageAcknowledger()

RunnableGraph
  .fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    // Partitions stream into messages that should or should not be processed
    val partition = builder.add(Partition[(Message, Long)](2,
      {
        case (message, _) if shouldProcess(message) => 0
        case _                                      => 1
      }))
    // Merges stream by the index produced by zipWithIndex
    val merge = builder.add(MergeSequence[(Message, Long)](2)(_._2))

    subscription.zipWithIndex ~> partition.in
    // First goes through message processor
    partition.out(0) ~> messageProcessor ~> merge
    // Second partition bypasses message processor
    partition.out(1)    ~> merge
    merge.out.map(_._1) ~> messageAcknowledger
    ClosedShape
  })
  .run()
Java
import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.ClosedShape;
import org.apache.pekko.stream.UniformFanInShape;
import org.apache.pekko.stream.UniformFanOutShape;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.GraphDSL;
import org.apache.pekko.stream.javadsl.MergeSequence;
import org.apache.pekko.stream.javadsl.Partition;
import org.apache.pekko.stream.javadsl.RunnableGraph;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

Source<Message, NotUsed> subscription = createSubscription();
Flow<Pair<Message, Long>, Pair<Message, Long>, NotUsed> messageProcessor =
    createMessageProcessor();
Sink<Message, NotUsed> messageAcknowledger = createMessageAcknowledger();

RunnableGraph.fromGraph(
        GraphDSL.create(
            builder -> {
              // Partitions stream into messages that should or should not be processed
              UniformFanOutShape<Pair<Message, Long>, Pair<Message, Long>> partition =
                  builder.add(
                      Partition.create(2, element -> shouldProcess(element.first()) ? 0 : 1));
              // Merges stream by the index produced by zipWithIndex
              UniformFanInShape<Pair<Message, Long>, Pair<Message, Long>> merge =
                  builder.add(MergeSequence.create(2, Pair::second));

              builder.from(builder.add(subscription.zipWithIndex())).viaFanOut(partition);
              // First goes through message processor
              builder.from(partition.out(0)).via(builder.add(messageProcessor)).viaFanIn(merge);
              // Second partition bypasses message processor
              builder.from(partition.out(1)).viaFanIn(merge);

              // Unwrap message index pairs and send to acknowledger
              builder
                  .from(merge.out())
                  .to(
                      builder.add(
                          Flow.<Pair<Message, Long>>create()
                              .map(Pair::first)
                              .to(messageAcknowledger)));

              return ClosedShape.getInstance();
            }))
    .run(system);

Reactive Streams semantics

emits when one of the upstreams has the next expected element in the sequence available

backpressures when downstream backpressures

completes when all upstreams complete

cancels when downstream cancels