Partition
Fan-out the stream to several streams.
Signature
Description
Fan-out the stream to several streams. Each upstream element is emitted to one downstream consumer according to the partitioner function applied to the element.
Example
Here is an example of using Partition to split a Source of integers to one Sink for the even numbers and another Sink for the odd numbers.
- Scala
-
source
import org.apache.pekko import pekko.NotUsed import pekko.stream.Attributes import pekko.stream.Attributes.LogLevels import pekko.stream.ClosedShape import pekko.stream.scaladsl.Flow import pekko.stream.scaladsl.GraphDSL import pekko.stream.scaladsl.Partition import pekko.stream.scaladsl.RunnableGraph import pekko.stream.scaladsl.Sink import pekko.stream.scaladsl.Source val source: Source[Int, NotUsed] = Source(1 to 10) val even: Sink[Int, NotUsed] = Flow[Int].log("even").withAttributes(Attributes.logLevels(onElement = LogLevels.Info)).to(Sink.ignore) val odd: Sink[Int, NotUsed] = Flow[Int].log("odd").withAttributes(Attributes.logLevels(onElement = LogLevels.Info)).to(Sink.ignore) RunnableGraph .fromGraph(GraphDSL.create() { implicit builder => import GraphDSL.Implicits._ val partition = builder.add(Partition[Int](2, element => if (element % 2 == 0) 0 else 1)) source ~> partition.in partition.out(0) ~> even partition.out(1) ~> odd ClosedShape }) .run() - Java
-
source
import org.apache.pekko.NotUsed; import org.apache.pekko.stream.Attributes; import org.apache.pekko.stream.ClosedShape; import org.apache.pekko.stream.UniformFanOutShape; import org.apache.pekko.stream.javadsl.Flow; import org.apache.pekko.stream.javadsl.GraphDSL; import org.apache.pekko.stream.javadsl.Partition; import org.apache.pekko.stream.javadsl.RunnableGraph; import org.apache.pekko.stream.javadsl.Sink; import org.apache.pekko.stream.javadsl.Source; Source<Integer, NotUsed> source = Source.range(1, 10); Sink<Integer, NotUsed> even = Flow.of(Integer.class) .log("even") .withAttributes(Attributes.createLogLevels(Attributes.logLevelInfo())) .to(Sink.ignore()); Sink<Integer, NotUsed> odd = Flow.of(Integer.class) .log("odd") .withAttributes(Attributes.createLogLevels(Attributes.logLevelInfo())) .to(Sink.ignore()); RunnableGraph.fromGraph( GraphDSL.create( builder -> { UniformFanOutShape<Integer, Integer> partition = builder.add( Partition.create( Integer.class, 2, element -> (element % 2 == 0) ? 0 : 1)); builder.from(builder.add(source)).viaFanOut(partition); builder.from(partition.out(0)).to(builder.add(even)); builder.from(partition.out(1)).to(builder.add(odd)); return ClosedShape.getInstance(); })) .run(system);
Reactive Streams semantics
emits when the chosen output stops backpressuring and there is an input element available
backpressures when the chosen output backpressures
completes when upstream completes and no output is pending
cancels depends on the eagerCancel flag. If it is true, when any downstream cancels, if false, when all downstreams cancel.
1.0.0