MergeSequence
Merge a linear sequence partitioned across multiple sources.
Description¶
Merge a linear sequence partitioned across multiple sources. Each element from upstream must have a defined index, starting from 0. There must be no gaps in the sequence, nor may there be any duplicates. Each upstream source must be ordered by the sequence.
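To make the contract concrete, here is a hypothetical in-memory sketch of the merging rule (illustration only, not Pekko's implementation): each input is already ordered by index, the inputs together cover 0, 1, 2, … with no gaps or duplicates, and an element is emitted only when it carries the next expected index. The `MergeSequenceSketch` class and `mergeSequence` method are names invented for this example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.ToLongFunction;

// Illustration only: an in-memory analogue of the MergeSequence contract.
// Each input list is ordered by index, and together the inputs cover
// 0, 1, 2, ... with no gaps or duplicates.
class MergeSequenceSketch {
  static <T> List<T> mergeSequence(List<List<T>> inputs, ToLongFunction<T> extractIndex) {
    List<Iterator<T>> iterators = new ArrayList<>();
    List<T> heads = new ArrayList<>(); // buffered head element per input, or null when drained
    int remaining = 0;
    for (List<T> input : inputs) {
      Iterator<T> it = input.iterator();
      iterators.add(it);
      heads.add(it.hasNext() ? it.next() : null);
      remaining += input.size();
    }
    List<T> out = new ArrayList<>();
    long next = 0; // the next expected index in the sequence
    while (remaining > 0) {
      boolean emitted = false;
      for (int i = 0; i < heads.size(); i++) {
        T head = heads.get(i);
        // Emit only from the input whose head carries the next expected index
        if (head != null && extractIndex.applyAsLong(head) == next) {
          out.add(head);
          heads.set(i, iterators.get(i).hasNext() ? iterators.get(i).next() : null);
          next++;
          remaining--;
          emitted = true;
          break;
        }
      }
      if (!emitted) {
        throw new IllegalStateException("Gap or duplicate in sequence at index " + next);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Indices 0, 2, 4 arrive on one input and 1, 3 on another;
    // merging restores the original 0..4 order.
    List<Long> merged =
        mergeSequence(List.of(List.of(0L, 2L, 4L), List.of(1L, 3L)), x -> x);
    System.out.println(merged); // [0, 1, 2, 3, 4]
  }
}
```

Note that the sketch throws when no input can supply the next index, mirroring the requirement that the sequence contain no gaps.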
Example¶
MergeSequence is most useful in combination with Partition, to merge the partitioned streams back into a single stream while maintaining the order of the original elements. zipWithIndex can be used before partitioning the stream to generate the index.
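Conceptually, zipWithIndex pairs each element with its zero-based position, producing the index that MergeSequence later merges by. A plain-Java sketch of that pairing (the `ZipWithIndexSketch` class is invented for illustration; the real stream operator emits pairs lazily as elements flow through):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class ZipWithIndexSketch {
  // Pair each element with its zero-based position, the way the
  // zipWithIndex stream operator does: Long indices, starting at 0, no gaps.
  static <T> List<Map.Entry<T, Long>> zipWithIndex(List<T> elements) {
    List<Map.Entry<T, Long>> indexed = new ArrayList<>();
    for (int i = 0; i < elements.size(); i++) {
      indexed.add(Map.entry(elements.get(i), (long) i));
    }
    return indexed;
  }

  public static void main(String[] args) {
    System.out.println(zipWithIndex(List.of("a", "b", "c"))); // [a=0, b=1, c=2]
  }
}
```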
The example below partitions a stream of messages into one stream for elements that must be processed by a given processing flow and another for elements that require no processing, then merges them back together so that the messages can be acknowledged in order.
Scala
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.ClosedShape
import pekko.stream.scaladsl.{ Flow, GraphDSL, MergeSequence, Partition, RunnableGraph, Sink, Source }

val subscription: Source[Message, NotUsed] = createSubscription()
val messageProcessor: Flow[(Message, Long), (Message, Long), NotUsed] =
  createMessageProcessor()
val messageAcknowledger: Sink[Message, NotUsed] = createMessageAcknowledger()

RunnableGraph
  .fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    // Partitions the stream into messages that should or should not be processed
    val partition = builder.add(
      Partition[(Message, Long)](
        2,
        {
          case (message, _) if shouldProcess(message) => 0
          case _                                      => 1
        }))
    // Merges the streams by the index produced by zipWithIndex
    val merge = builder.add(MergeSequence[(Message, Long)](2)(_._2))

    subscription.zipWithIndex ~> partition.in
    // The first partition goes through the message processor
    partition.out(0) ~> messageProcessor ~> merge
    // The second partition bypasses the message processor
    partition.out(1) ~> merge
    // Unwrap the message/index pairs and send to the acknowledger in order
    merge.out.map(_._1) ~> messageAcknowledger
    ClosedShape
  })
  .run()
Java
import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.ClosedShape;
import org.apache.pekko.stream.UniformFanInShape;
import org.apache.pekko.stream.UniformFanOutShape;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.GraphDSL;
import org.apache.pekko.stream.javadsl.MergeSequence;
import org.apache.pekko.stream.javadsl.Partition;
import org.apache.pekko.stream.javadsl.RunnableGraph;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

Source<Message, NotUsed> subscription = createSubscription();
Flow<Pair<Message, Long>, Pair<Message, Long>, NotUsed> messageProcessor =
    createMessageProcessor();
Sink<Message, NotUsed> messageAcknowledger = createMessageAcknowledger();

RunnableGraph.fromGraph(
        GraphDSL.create(
            builder -> {
              // Partitions the stream into messages that should or should not be processed
              UniformFanOutShape<Pair<Message, Long>, Pair<Message, Long>> partition =
                  builder.add(
                      Partition.create(2, element -> shouldProcess(element.first()) ? 0 : 1));
              // Merges the streams by the index produced by zipWithIndex
              UniformFanInShape<Pair<Message, Long>, Pair<Message, Long>> merge =
                  builder.add(MergeSequence.create(2, Pair::second));
              builder.from(builder.add(subscription.zipWithIndex())).viaFanOut(partition);
              // The first partition goes through the message processor
              builder.from(partition.out(0)).via(builder.add(messageProcessor)).viaFanIn(merge);
              // The second partition bypasses the message processor
              builder.from(partition.out(1)).viaFanIn(merge);
              // Unwrap the message/index pairs and send to the acknowledger
              builder
                  .from(merge.out())
                  .to(
                      builder.add(
                          Flow.<Pair<Message, Long>>create()
                              .map(Pair::first)
                              .to(messageAcknowledger)));
              return ClosedShape.getInstance();
            }))
    .run(system);
Reactive Streams semantics¶
emits when one of the upstreams has the next expected element in the sequence available

backpressures when downstream backpressures

completes when all upstreams complete

cancels when downstream cancels