Broadcast
Emit each incoming element to each of n
outputs.
Signature¶
Description¶
Emit each incoming element to each of n
outputs.
Example¶
Here is an example that uses Broadcast
to compute several aggregates (count, minimum and maximum) from a single Source
of integers.
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.ClosedShape
import pekko.stream.scaladsl.Broadcast
import pekko.stream.scaladsl.GraphDSL
import pekko.stream.scaladsl.RunnableGraph
import pekko.stream.scaladsl.Sink
import pekko.stream.scaladsl.Source

import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.Future

// 100 pseudo-random integers, each drawn from the range [0, 100).
val source: Source[Int, NotUsed] =
  Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(100))).take(100)

// Counts the elements received; the element value itself is irrelevant.
val countSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, _) => acc + 1)

// The fold seeds must be the identity of min/max. Seeding min with 0 would
// always yield 0 here, because every element is >= 0.
// Note: for an empty stream these sinks complete with the seed value itself.
val minSink: Sink[Int, Future[Int]] = Sink.fold(Int.MaxValue)((acc, elem) => math.min(acc, elem))
val maxSink: Sink[Int, Future[Int]] = Sink.fold(Int.MinValue)((acc, elem) => math.max(acc, elem))

// The three sinks are passed to createGraph so that their materialized values
// are combined (via Tuple3.apply) into the materialized value of the graph.
// run() needs an implicit Materializer (e.g. from an implicit ActorSystem) in scope.
val (count: Future[Int], min: Future[Int], max: Future[Int]) =
  RunnableGraph
    .fromGraph(GraphDSL.createGraph(countSink, minSink, maxSink)(Tuple3.apply) {
      implicit builder => (countS, minS, maxS) =>
        import GraphDSL.Implicits._

        // Fan-out stage with three outputs; every element is sent to all of them.
        val broadcast = builder.add(Broadcast[Int](3))

        source ~> broadcast
        broadcast.out(0) ~> countS
        broadcast.out(1) ~> minS
        broadcast.out(2) ~> maxS
        ClosedShape
    })
    .run()
sourceimport org.apache.pekko.NotUsed;
import org.apache.pekko.japi.tuple.Tuple3;
import org.apache.pekko.stream.ClosedShape;
import org.apache.pekko.stream.UniformFanOutShape;
import org.apache.pekko.stream.javadsl.Broadcast;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.GraphDSL;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.RunnableGraph;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import java.util.concurrent.CompletionStage;
Source<Integer, NotUsed> source = Source.range(1, 10);
Sink<Integer, CompletionStage<Integer>> countSink =
Flow.of(Integer.class).toMat(Sink.fold(0, (acc, elem) -> acc + 1), Keep.right());
Sink<Integer, CompletionStage<Integer>> minSink =
Flow.of(Integer.class).toMat(Sink.fold(0, Math::min), Keep.right());
Sink<Integer, CompletionStage<Integer>> maxSink =
Flow.of(Integer.class).toMat(Sink.fold(0, Math::max), Keep.right());
final Tuple3<CompletionStage<Integer>, CompletionStage<Integer>, CompletionStage<Integer>>
result =
RunnableGraph.fromGraph(
GraphDSL.create3(
countSink,
minSink,
maxSink,
Tuple3::create,
(builder, countS, minS, maxS) -> {
final UniformFanOutShape<Integer, Integer> broadcast =
builder.add(Broadcast.create(3));
builder.from(builder.add(source)).viaFanOut(broadcast);
builder.from(broadcast.out(0)).to(countS);
builder.from(broadcast.out(1)).to(minS);
builder.from(broadcast.out(2)).to(maxS);
return ClosedShape.getInstance();
}))
.run(system);
Note that an asynchronous boundary must be added explicitly to each output stream if you want them to run in parallel.
// Same graph as the previous example, but each sink is marked with `.async`,
// placing an asynchronous boundary on every output so that the three branches
// can run in parallel. The graph is only built here, not run.
RunnableGraph.fromGraph(GraphDSL.createGraph(countSink.async, minSink.async, maxSink.async)(Tuple3.apply) {
  implicit builder => (countS, minS, maxS) =>
    import GraphDSL.Implicits._

    // Fan-out stage with three outputs; every element is sent to all of them.
    val broadcast = builder.add(Broadcast[Int](3))

    source ~> broadcast
    broadcast.out(0) ~> countS
    broadcast.out(1) ~> minS
    broadcast.out(2) ~> maxS
    ClosedShape
})
// Same graph as the previous example, but each sink is marked with async(),
// placing an asynchronous boundary on every output so that the three branches
// can run in parallel. The graph is only built here, not run.
RunnableGraph.fromGraph(
    GraphDSL.create3(
        countSink.async(),
        minSink.async(),
        maxSink.async(),
        Tuple3::create,
        (builder, countS, minS, maxS) -> {
          // Fan-out stage with three outputs; every element is sent to all of them.
          final UniformFanOutShape<Integer, Integer> broadcast =
              builder.add(Broadcast.create(3));
          builder.from(builder.add(source)).viaFanOut(broadcast);
          builder.from(broadcast.out(0)).to(countS);
          builder.from(broadcast.out(1)).to(minS);
          builder.from(broadcast.out(2)).to(maxS);
          return ClosedShape.getInstance();
        }));
Reactive Streams semantics¶
emits when all of the outputs stop backpressuring and there is an input element available
backpressures when any of the outputs backpressures
completes when upstream completes
cancels depending on the eagerCancel
flag: if it is true, cancels when any downstream cancels; if it is false, cancels when all downstreams cancel.
1.1.3