MergeSequence
Merge a linear sequence partitioned across multiple sources.
Signature
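The signature block did not survive extraction. As a rough guide, inferred from how the operator is constructed in the examples below (parameter names here are assumptions; treat the Pekko API docs as authoritative):

```scala
// Scala DSL: the sequence extractor is passed in a second parameter list
MergeSequence[T](inputPorts: Int = 2)(extractSequence: T => Long)

// Java DSL: ports and extractor as plain arguments
MergeSequence.create(inputPorts, extractSequence)
```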
Description
Merge a linear sequence partitioned across multiple sources. Each element from upstream must have a defined index, starting from 0. There must be no gaps in the sequence, nor may there be any duplicates. Each upstream source must be ordered by the sequence.
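The rule can be illustrated outside of streams: given ordered, indexed partitions, the operator always pulls the element bearing the next expected index. A minimal plain-Scala sketch, with no Pekko dependency (names are illustrative, not the operator's implementation):

```scala
object MergeSequenceSketch {
  // Merge partitions of an indexed sequence by repeatedly emitting the
  // element whose index equals the next expected position.
  def mergeSequence[T](sources: Seq[Seq[(T, Long)]]): Seq[T] = {
    val iters = sources.map(_.iterator.buffered)
    val out   = Seq.newBuilder[T]
    var next  = 0L
    while (iters.exists(_.hasNext)) {
      iters.find(it => it.hasNext && it.head._2 == next) match {
        case Some(it) =>
          out += it.next()._1 // emit the next expected element
          next += 1
        case None =>
          // a gap or an unordered source would stall the real operator
          throw new IllegalStateException(s"gap in sequence at index $next")
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    // Two ordered partitions of one indexed sequence, merged back in order
    val processed = Seq(("a", 0L), ("c", 2L))
    val bypassed  = Seq(("b", 1L), ("d", 3L))
    println(mergeSequence(Seq(processed, bypassed)).mkString(",")) // a,b,c,d
  }
}
```

Note that the sketch, like the operator, never reorders: it only waits on the source that holds the next index, which is why each upstream must itself be ordered.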
Example
MergeSequence is most useful when used in combination with Partition, to merge the partitioned stream back into a single stream while maintaining the order of the original elements. zipWithIndex can be used before partitioning the stream to generate the index.
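For intuition, plain-collections zipWithIndex pairs each element with its position the same way the streams operator pairs elements with a Long index (the message values below are illustrative):

```scala
object ZipWithIndexSketch {
  // Each message is paired with its position; MergeSequence later uses
  // this index to restore the original order after partitioning.
  val indexed: List[(String, Long)] =
    List("m1", "m2", "m3").zipWithIndex.map { case (m, i) => (m, i.toLong) }

  def main(args: Array[String]): Unit =
    println(indexed) // List((m1,0), (m2,1), (m3,2))
}
```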
The example below partitions a stream of messages into two streams: one for elements that must be processed by a given processing flow, and another for elements that require no processing. The two streams are then merged back together so that the messages can be acknowledged in order.
Scala

```scala
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.ClosedShape
import pekko.stream.scaladsl.{ Flow, GraphDSL, MergeSequence, Partition, RunnableGraph, Sink, Source }

val subscription: Source[Message, NotUsed] = createSubscription()
val messageProcessor: Flow[(Message, Long), (Message, Long), NotUsed] = createMessageProcessor()
val messageAcknowledger: Sink[Message, NotUsed] = createMessageAcknowledger()

RunnableGraph
  .fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    // Partitions stream into messages that should or should not be processed
    val partition = builder.add(
      Partition[(Message, Long)](2,
        {
          case (message, _) if shouldProcess(message) => 0
          case _                                      => 1
        }))
    // Merges stream by the index produced by zipWithIndex
    val merge = builder.add(MergeSequence[(Message, Long)](2)(_._2))

    subscription.zipWithIndex ~> partition.in

    // First goes through message processor
    partition.out(0) ~> messageProcessor ~> merge
    // Second partition bypasses message processor
    partition.out(1) ~> merge

    merge.out.map(_._1) ~> messageAcknowledger
    ClosedShape
  })
  .run()
```
Java

```java
import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.ClosedShape;
import org.apache.pekko.stream.UniformFanInShape;
import org.apache.pekko.stream.UniformFanOutShape;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.GraphDSL;
import org.apache.pekko.stream.javadsl.MergeSequence;
import org.apache.pekko.stream.javadsl.Partition;
import org.apache.pekko.stream.javadsl.RunnableGraph;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

Source<Message, NotUsed> subscription = createSubscription();
Flow<Pair<Message, Long>, Pair<Message, Long>, NotUsed> messageProcessor = createMessageProcessor();
Sink<Message, NotUsed> messageAcknowledger = createMessageAcknowledger();

RunnableGraph.fromGraph(
        GraphDSL.create(
            builder -> {
              // Partitions stream into messages that should or should not be processed
              UniformFanOutShape<Pair<Message, Long>, Pair<Message, Long>> partition =
                  builder.add(
                      Partition.create(2, element -> shouldProcess(element.first()) ? 0 : 1));
              // Merges stream by the index produced by zipWithIndex
              UniformFanInShape<Pair<Message, Long>, Pair<Message, Long>> merge =
                  builder.add(MergeSequence.create(2, Pair::second));

              builder.from(builder.add(subscription.zipWithIndex())).viaFanOut(partition);
              // First goes through message processor
              builder.from(partition.out(0)).via(builder.add(messageProcessor)).viaFanIn(merge);
              // Second partition bypasses message processor
              builder.from(partition.out(1)).viaFanIn(merge);
              // Unwrap message index pairs and send to acknowledger
              builder
                  .from(merge.out())
                  .to(
                      builder.add(
                          Flow.<Pair<Message, Long>>create()
                              .map(Pair::first)
                              .to(messageAcknowledger)));
              return ClosedShape.getInstance();
            }))
    .run(system);
```
Reactive Streams semantics
emits when one of the upstreams has the next expected element in the sequence available
backpressures when downstream backpressures
completes when all upstreams complete
cancels when downstream cancels