Broadcast
Emit each incoming element each of n
outputs.
Signature
Description
Emit each incoming element each of n
outputs.
Example
Here is an example that is using Broadcast
to aggregate different values from a Source
of integers.
- Scala
-
source
import org.apache.pekko import pekko.NotUsed import pekko.stream.ClosedShape import pekko.stream.scaladsl.GraphDSL import pekko.stream.scaladsl.RunnableGraph import pekko.stream.scaladsl.Sink import pekko.stream.scaladsl.Source val source: Source[Int, NotUsed] = Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(100))).take(100) val countSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, elem) => acc + 1) val minSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, elem) => math.min(acc, elem)) val maxSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, elem) => math.max(acc, elem)) val (count: Future[Int], min: Future[Int], max: Future[Int]) = RunnableGraph .fromGraph(GraphDSL.createGraph(countSink, minSink, maxSink)(Tuple3.apply) { implicit builder => (countS, minS, maxS) => import GraphDSL.Implicits._ val broadcast = builder.add(Broadcast[Int](3)) source ~> broadcast broadcast.out(0) ~> countS broadcast.out(1) ~> minS broadcast.out(2) ~> maxS ClosedShape }) .run()
- Java
-
source
import org.apache.pekko.NotUsed; import org.apache.pekko.japi.tuple.Tuple3; import org.apache.pekko.stream.ClosedShape; import org.apache.pekko.stream.UniformFanOutShape; import org.apache.pekko.stream.javadsl.Broadcast; import org.apache.pekko.stream.javadsl.Flow; import org.apache.pekko.stream.javadsl.GraphDSL; import org.apache.pekko.stream.javadsl.Keep; import org.apache.pekko.stream.javadsl.RunnableGraph; import org.apache.pekko.stream.javadsl.Sink; import org.apache.pekko.stream.javadsl.Source; import java.util.concurrent.CompletionStage; Source<Integer, NotUsed> source = Source.range(1, 10); Sink<Integer, CompletionStage<Integer>> countSink = Flow.of(Integer.class).toMat(Sink.fold(0, (acc, elem) -> acc + 1), Keep.right()); Sink<Integer, CompletionStage<Integer>> minSink = Flow.of(Integer.class).toMat(Sink.fold(0, Math::min), Keep.right()); Sink<Integer, CompletionStage<Integer>> maxSink = Flow.of(Integer.class).toMat(Sink.fold(0, Math::max), Keep.right()); final Tuple3<CompletionStage<Integer>, CompletionStage<Integer>, CompletionStage<Integer>> result = RunnableGraph.fromGraph( GraphDSL.create3( countSink, minSink, maxSink, Tuple3::create, (builder, countS, minS, maxS) -> { final UniformFanOutShape<Integer, Integer> broadcast = builder.add(Broadcast.create(3)); builder.from(builder.add(source)).viaFanOut(broadcast); builder.from(broadcast.out(0)).to(countS); builder.from(broadcast.out(1)).to(minS); builder.from(broadcast.out(2)).to(maxS); return ClosedShape.getInstance(); })) .run(system);
Note that asynchronous boundary for the output streams must be added explicitly if it’s desired to run them in parallel.
- Scala
-
source
RunnableGraph.fromGraph(GraphDSL.createGraph(countSink.async, minSink.async, maxSink.async)(Tuple3.apply) { implicit builder => (countS, minS, maxS) => import GraphDSL.Implicits._ val broadcast = builder.add(Broadcast[Int](3)) source ~> broadcast broadcast.out(0) ~> countS broadcast.out(1) ~> minS broadcast.out(2) ~> maxS ClosedShape })
- Java
-
source
RunnableGraph.fromGraph( GraphDSL.create3( countSink.async(), minSink.async(), maxSink.async(), Tuple3::create, (builder, countS, minS, maxS) -> { final UniformFanOutShape<Integer, Integer> broadcast = builder.add(Broadcast.create(3)); builder.from(builder.add(source)).viaFanOut(broadcast); builder.from(broadcast.out(0)).to(countS); builder.from(broadcast.out(1)).to(minS); builder.from(broadcast.out(2)).to(maxS); return ClosedShape.getInstance(); }));
Reactive Streams semantics
emits when all of the outputs stops backpressuring and there is an input element available
backpressures when any of the outputs backpressures
completes when upstream completes
cancels depends on the eagerCancel
flag. If it is true, when any downstream cancels, if false, when all downstreams cancel.
1.0.3