Broadcast

Emit each incoming element to each of n outputs.

Fan-out operators

Signature

Broadcast

Description

Emit each incoming element to each of n outputs.

Example

Here is an example that uses Broadcast to aggregate different values from a Source of integers.

Scala
import java.util.concurrent.ThreadLocalRandom

import scala.concurrent.Future

import org.apache.pekko
import pekko.NotUsed
import pekko.stream.ClosedShape
import pekko.stream.scaladsl.Broadcast
import pekko.stream.scaladsl.GraphDSL
import pekko.stream.scaladsl.RunnableGraph
import pekko.stream.scaladsl.Sink
import pekko.stream.scaladsl.Source

// 100 random integers, each drawn from the range [0, 100).
val source: Source[Int, NotUsed] =
  Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(100))).take(100)

// Counts the elements it receives; the element value itself is irrelevant.
val countSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, _) => acc + 1)
// Seeded with Int.MaxValue so that the first element always replaces the seed. A seed of 0
// would pin the result to 0 for any stream of non-negative integers.
val minSink: Sink[Int, Future[Int]] = Sink.fold(Int.MaxValue)((acc, elem) => math.min(acc, elem))
// Seeded with Int.MinValue so that the result is also correct if every element is negative.
val maxSink: Sink[Int, Future[Int]] = Sink.fold(Int.MinValue)((acc, elem) => math.max(acc, elem))

// The three sinks are passed to createGraph so that their materialized values (the futures)
// are combined with Tuple3.apply and returned from run().
// NOTE: run() needs an implicit Materializer (for example an implicit ActorSystem) in scope;
// it is not part of this snippet.
val (count: Future[Int], min: Future[Int], max: Future[Int]) =
  RunnableGraph
    .fromGraph(GraphDSL.createGraph(countSink, minSink, maxSink)(Tuple3.apply) {
      implicit builder => (countS, minS, maxS) =>
        import GraphDSL.Implicits._
        // One inlet, three outlets: every element from `source` is emitted on all three outlets.
        val broadcast = builder.add(Broadcast[Int](3))
        source           ~> broadcast
        broadcast.out(0) ~> countS
        broadcast.out(1) ~> minS
        broadcast.out(2) ~> maxS
        ClosedShape
    })
    .run()
Java
sourceimport org.apache.pekko.NotUsed;
import org.apache.pekko.japi.tuple.Tuple3;
import org.apache.pekko.stream.ClosedShape;
import org.apache.pekko.stream.UniformFanOutShape;
import org.apache.pekko.stream.javadsl.Broadcast;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.GraphDSL;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.RunnableGraph;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import java.util.concurrent.CompletionStage;

Source<Integer, NotUsed> source = Source.range(1, 10);

Sink<Integer, CompletionStage<Integer>> countSink =
    Flow.of(Integer.class).toMat(Sink.fold(0, (acc, elem) -> acc + 1), Keep.right());
Sink<Integer, CompletionStage<Integer>> minSink =
    Flow.of(Integer.class).toMat(Sink.fold(0, Math::min), Keep.right());
Sink<Integer, CompletionStage<Integer>> maxSink =
    Flow.of(Integer.class).toMat(Sink.fold(0, Math::max), Keep.right());

final Tuple3<CompletionStage<Integer>, CompletionStage<Integer>, CompletionStage<Integer>>
    result =
        RunnableGraph.fromGraph(
                GraphDSL.create3(
                    countSink,
                    minSink,
                    maxSink,
                    Tuple3::create,
                    (builder, countS, minS, maxS) -> {
                      final UniformFanOutShape<Integer, Integer> broadcast =
                          builder.add(Broadcast.create(3));
                      builder.from(builder.add(source)).viaFanOut(broadcast);
                      builder.from(broadcast.out(0)).to(countS);
                      builder.from(broadcast.out(1)).to(minS);
                      builder.from(broadcast.out(2)).to(maxS);
                      return ClosedShape.getInstance();
                    }))
            .run(system);

Note that an asynchronous boundary for the output streams must be added explicitly if it’s desired to run them in parallel.

Scala
// Same graph as in the previous example, but each sink is marked with `.async` so that an
// asynchronous boundary is placed around it and the three outputs can run in parallel.
RunnableGraph.fromGraph(GraphDSL.createGraph(countSink.async, minSink.async, maxSink.async)(Tuple3.apply) {
  implicit builder => (countS, minS, maxS) =>
    import GraphDSL.Implicits._
    // One inlet, three outlets: every element from `source` is emitted on all three outlets.
    val broadcast = builder.add(Broadcast[Int](3))
    source           ~> broadcast
    broadcast.out(0) ~> countS
    broadcast.out(1) ~> minS
    broadcast.out(2) ~> maxS
    ClosedShape
})
Java
// Same graph as in the previous example, but each sink is marked with async() so that an
// asynchronous boundary is placed around it and the three outputs can run in parallel.
RunnableGraph.fromGraph(
    GraphDSL.create3(
        countSink.async(),
        minSink.async(),
        maxSink.async(),
        Tuple3::create,
        (builder, countS, minS, maxS) -> {
          // One inlet, three outlets: every element from `source` is emitted on all
          // three outlets.
          final UniformFanOutShape<Integer, Integer> broadcast =
              builder.add(Broadcast.create(3));
          builder.from(builder.add(source)).viaFanOut(broadcast);

          builder.from(broadcast.out(0)).to(countS);
          builder.from(broadcast.out(1)).to(minS);
          builder.from(broadcast.out(2)).to(maxS);
          return ClosedShape.getInstance();
        }));

Reactive Streams semantics

emits when all of the outputs stop backpressuring and there is an input element available

backpressures when any of the outputs backpressures

completes when upstream completes

cancels depends on the eagerCancel flag: if it is true, when any downstream cancels; if it is false, when all downstreams cancel.