Partition
Fan-out the stream to several streams.
Signature¶
Description¶
Fan-out the stream to several streams. Each upstream element is emitted to one downstream consumer according to the partitioner function applied to the element.
Example¶
Here is an example of using Partition
to split a Source
of integers to one Sink
for the even numbers and another Sink
for the odd numbers.
sourceimport org.apache.pekko
import pekko.NotUsed
import pekko.stream.Attributes
import pekko.stream.Attributes.LogLevels
import pekko.stream.ClosedShape
import pekko.stream.scaladsl.Flow
import pekko.stream.scaladsl.GraphDSL
import pekko.stream.scaladsl.Partition
import pekko.stream.scaladsl.RunnableGraph
import pekko.stream.scaladsl.Sink
import pekko.stream.scaladsl.Source
val source: Source[Int, NotUsed] = Source(1 to 10)
val even: Sink[Int, NotUsed] =
Flow[Int].log("even").withAttributes(Attributes.logLevels(onElement = LogLevels.Info)).to(Sink.ignore)
val odd: Sink[Int, NotUsed] =
Flow[Int].log("odd").withAttributes(Attributes.logLevels(onElement = LogLevels.Info)).to(Sink.ignore)
RunnableGraph
.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val partition = builder.add(Partition[Int](2, element => if (element % 2 == 0) 0 else 1))
source ~> partition.in
partition.out(0) ~> even
partition.out(1) ~> odd
ClosedShape
})
.run()
sourceimport org.apache.pekko.NotUsed;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.ClosedShape;
import org.apache.pekko.stream.UniformFanOutShape;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.GraphDSL;
import org.apache.pekko.stream.javadsl.Partition;
import org.apache.pekko.stream.javadsl.RunnableGraph;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
Source<Integer, NotUsed> source = Source.range(1, 10);
Sink<Integer, NotUsed> even =
Flow.of(Integer.class)
.log("even")
.withAttributes(Attributes.createLogLevels(Attributes.logLevelInfo()))
.to(Sink.ignore());
Sink<Integer, NotUsed> odd =
Flow.of(Integer.class)
.log("odd")
.withAttributes(Attributes.createLogLevels(Attributes.logLevelInfo()))
.to(Sink.ignore());
RunnableGraph.fromGraph(
GraphDSL.create(
builder -> {
UniformFanOutShape<Integer, Integer> partition =
builder.add(
Partition.create(
Integer.class, 2, element -> (element % 2 == 0) ? 0 : 1));
builder.from(builder.add(source)).viaFanOut(partition);
builder.from(partition.out(0)).to(builder.add(even));
builder.from(partition.out(1)).to(builder.add(odd));
return ClosedShape.getInstance();
}))
.run(system);
Reactive Streams semantics¶
emits when the chosen output stops backpressuring and there is an input element available
backpressures when the chosen output backpressures
completes when upstream completes and no output is pending
cancels depends on the eagerCancel
flag. If it is true, when any downstream cancels, if false, when all downstreams cancel.
1.1.3