Dynamic stream handling
Dependency
To use Pekko Streams, add the module to your project:
- sbt
val PekkoVersion = "1.1.2"
libraryDependencies += "org.apache.pekko" %% "pekko-stream" % PekkoVersion
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.1.2</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
- Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2")

  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}"
}
Introduction
Controlling stream completion with KillSwitch
A KillSwitch allows the completion of operators of FlowShape from the outside. It consists of a flow element that can be linked to an operator of FlowShape needing completion control. The KillSwitch trait allows you to:
- complete the stream(s) via shutdown()
- fail the stream(s) via abort(Throwable error)
- Scala
-
source
trait KillSwitch {

  /**
   * After calling [[KillSwitch#shutdown]] the linked [[Graph]]s of [[FlowShape]] are completed normally.
   */
  def shutdown(): Unit

  /**
   * After calling [[KillSwitch#abort]] the linked [[Graph]]s of [[FlowShape]] are failed.
   */
  def abort(ex: Throwable): Unit
}
After the first call to either shutdown or abort, all subsequent calls to either method are ignored. Stream completion is performed by both
- cancelling its upstream, and
- completing (in case of shutdown) or failing (in case of abort) its downstream.
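The "first call wins" rule can be pictured with a small stand-alone model. This is only a sketch in plain Java and not how Pekko implements its kill switches; the class name `OneShotSwitch` and its `outcome()` method are hypothetical and exist purely for illustration.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Toy model of the "first call wins" rule. NOT Pekko's implementation.
final class OneShotSwitch {
    // null            -> still running
    // Optional.empty  -> completed normally (shutdown won)
    // Optional.of(ex) -> failed with ex (abort won)
    private final AtomicReference<Optional<Throwable>> state = new AtomicReference<>();

    void shutdown() {
        // Takes effect only if no terminal signal has been recorded yet.
        state.compareAndSet(null, Optional.empty());
    }

    void abort(Throwable error) {
        // Takes effect only if no terminal signal has been recorded yet.
        state.compareAndSet(null, Optional.of(error));
    }

    String outcome() {
        Optional<Throwable> s = state.get();
        if (s == null) {
            return "running";
        }
        return s.map(t -> "failed: " + t.getMessage()).orElse("completed");
    }

    public static void main(String[] args) {
        OneShotSwitch sw = new OneShotSwitch();
        sw.shutdown();
        sw.abort(new RuntimeException("boom!")); // ignored, shutdown came first
        System.out.println(sw.outcome()); // prints "completed"
    }
}
```

The compare-and-set keeps the model safe when `shutdown` and `abort` race from different threads: exactly one of them records the terminal signal.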
A KillSwitch can control the completion of one or multiple streams, and therefore comes in two different flavours.
UniqueKillSwitch
UniqueKillSwitch allows you to control the completion of one materialized Graph of FlowShape. See the usage examples below.
- Shutdown
- Scala
-
source
val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]

val (killSwitch, last) = countingSrc
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(lastSnk)(Keep.both)
  .run()

doSomethingElse()

killSwitch.shutdown()

Await.result(last, 1.second) shouldBe 2
- Java
-
source
final Source<Integer, NotUsed> countingSrc =
    Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
        .delay(Duration.ofSeconds(1), DelayOverflowStrategy.backpressure());
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();

final Pair<UniqueKillSwitch, CompletionStage<Integer>> stream =
    countingSrc
        .viaMat(KillSwitches.single(), Keep.right())
        .toMat(lastSnk, Keep.both())
        .run(system);

final UniqueKillSwitch killSwitch = stream.first();
final CompletionStage<Integer> completionStage = stream.second();

doSomethingElse();
killSwitch.shutdown();

final int finalCount = completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertEquals(2, finalCount);
- Abort
- Scala
-
source
val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]

val (killSwitch, last) = countingSrc
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(lastSnk)(Keep.both)
  .run()

val error = new RuntimeException("boom!")
killSwitch.abort(error)

Await.result(last.failed, 1.second) shouldBe error
- Java
-
source
final Source<Integer, NotUsed> countingSrc =
    Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
        .delay(Duration.ofSeconds(1), DelayOverflowStrategy.backpressure());
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();

final Pair<UniqueKillSwitch, CompletionStage<Integer>> stream =
    countingSrc
        .viaMat(KillSwitches.single(), Keep.right())
        .toMat(lastSnk, Keep.both())
        .run(system);

final UniqueKillSwitch killSwitch = stream.first();
final CompletionStage<Integer> completionStage = stream.second();

final Exception error = new Exception("boom!");
killSwitch.abort(error);

final int result =
    completionStage.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);
assertEquals(-1, result);
SharedKillSwitch
A SharedKillSwitch allows you to control the completion of an arbitrary number of operators of FlowShape. It can be materialized multiple times via its flow method, and all materialized operators linked to it are controlled by the switch. See the usage examples below.
- Shutdown
- Scala
-
source
val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]
val sharedKillSwitch = KillSwitches.shared("my-kill-switch")

val last = countingSrc
  .via(sharedKillSwitch.flow)
  .runWith(lastSnk)

val delayedLast = countingSrc
  .delay(1.second, DelayOverflowStrategy.backpressure)
  .via(sharedKillSwitch.flow)
  .runWith(lastSnk)

doSomethingElse()

sharedKillSwitch.shutdown()

Await.result(last, 1.second) shouldBe 2
Await.result(delayedLast, 1.second) shouldBe 1
- Java
-
source
final Source<Integer, NotUsed> countingSrc =
    Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
        .delay(Duration.ofSeconds(1), DelayOverflowStrategy.backpressure());
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch");

final CompletionStage<Integer> completionStage =
    countingSrc
        .viaMat(killSwitch.flow(), Keep.right())
        .toMat(lastSnk, Keep.right())
        .run(system);
final CompletionStage<Integer> completionStageDelayed =
    countingSrc
        .delay(Duration.ofSeconds(1), DelayOverflowStrategy.backpressure())
        .viaMat(killSwitch.flow(), Keep.right())
        .toMat(lastSnk, Keep.right())
        .run(system);

doSomethingElse();
killSwitch.shutdown();

final int finalCount = completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS);
final int finalCountDelayed =
    completionStageDelayed.toCompletableFuture().get(1, TimeUnit.SECONDS);

assertEquals(2, finalCount);
assertEquals(1, finalCountDelayed);
- Abort
- Scala
-
source
val countingSrc = Source(Stream.from(1)).delay(1.second)
val lastSnk = Sink.last[Int]
val sharedKillSwitch = KillSwitches.shared("my-kill-switch")

val last1 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk)
val last2 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk)

val error = new RuntimeException("boom!")
sharedKillSwitch.abort(error)

Await.result(last1.failed, 1.second) shouldBe error
Await.result(last2.failed, 1.second) shouldBe error
- Java
-
source
final Source<Integer, NotUsed> countingSrc =
    Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
        .delay(Duration.ofSeconds(1), DelayOverflowStrategy.backpressure());
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch");

final CompletionStage<Integer> completionStage1 =
    countingSrc
        .viaMat(killSwitch.flow(), Keep.right())
        .toMat(lastSnk, Keep.right())
        .run(system);
final CompletionStage<Integer> completionStage2 =
    countingSrc
        .viaMat(killSwitch.flow(), Keep.right())
        .toMat(lastSnk, Keep.right())
        .run(system);

final Exception error = new Exception("boom!");
killSwitch.abort(error);

final int result1 =
    completionStage1.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);
final int result2 =
    completionStage2.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);

assertEquals(-1, result1);
assertEquals(-1, result2);
A UniqueKillSwitch is always the result of a materialization, whilst a SharedKillSwitch needs to be constructed before any materialization takes place.
Dynamic fan-in and fan-out with MergeHub, BroadcastHub and PartitionHub
There are many cases when consumers or producers of a certain service (represented as a Sink, Source, or possibly Flow) are dynamic and not known in advance. The Graph DSL cannot represent this: all connections of the graph must be known in advance and must be connected upfront. To allow dynamic fan-in and fan-out streaming, the Hubs should be used. They provide means to construct Sink and Source pairs that are “attached” to each other, but one of them can be materialized multiple times to implement dynamic fan-in or fan-out.
Using the MergeHub
A MergeHub allows you to implement a dynamic fan-in junction point in a graph where elements coming from different producers are emitted in a First-Comes-First-Served fashion. If the consumer cannot keep up, then all of the producers are backpressured. The hub itself comes as a Source to which the single consumer can be attached. It is not possible to attach any producers until this Source has been materialized (started). This is ensured by the fact that we only get the corresponding Sink as a materialized value. Usage might look like this:
- Scala
-
source
// A simple consumer that will print to the console for now
val consumer = Sink.foreach(println)

// Attach a MergeHub Source to the consumer. This will materialize to a
// corresponding Sink.
val runnableGraph: RunnableGraph[Sink[String, NotUsed]] =
  MergeHub.source[String](perProducerBufferSize = 16).to(consumer)

// By running/materializing the consumer we get back a Sink, and hence
// now have access to feed elements into it. This Sink can be materialized
// any number of times, and every element that enters the Sink will
// be consumed by our consumer.
val toConsumer: Sink[String, NotUsed] = runnableGraph.run()

// Feeding two independent sources into the hub.
Source.single("Hello!").runWith(toConsumer)
Source.single("Hub!").runWith(toConsumer)
- Java
-
source
// A simple consumer that will print to the console for now
Sink<String, CompletionStage<Done>> consumer = Sink.foreach(System.out::println);

// Attach a MergeHub Source to the consumer. This will materialize to a
// corresponding Sink.
RunnableGraph<Sink<String, NotUsed>> runnableGraph = MergeHub.of(String.class, 16).to(consumer);

// By running/materializing the consumer we get back a Sink, and hence
// now have access to feed elements into it. This Sink can be materialized
// any number of times, and every element that enters the Sink will
// be consumed by our consumer.
Sink<String, NotUsed> toConsumer = runnableGraph.run(system);

// Feeding two independent sources into the hub.
Source.single("Hello!").runWith(toConsumer, system);
Source.single("Hub!").runWith(toConsumer, system);
This sequence, while it might look odd at first, ensures proper startup order. Once we get the Sink, we can use it as many times as wanted. Everything that is fed to it will be delivered to the consumer we attached previously until it cancels.
Using the BroadcastHub
A BroadcastHub can be used to consume elements from a common producer by a dynamic set of consumers. The rate of the producer will be automatically adapted to the slowest consumer. In this case, the hub is a Sink to which the single producer must be attached first. Consumers can only be attached once the Sink has been materialized (i.e. the producer has been started). One example of using the BroadcastHub:
- Scala
-
source
// A simple producer that publishes a new "message" every second
val producer = Source.tick(1.second, 1.second, "New message")

// Attach a BroadcastHub Sink to the producer. This will materialize to a
// corresponding Source.
// (We need to use toMat and Keep.right since by default the materialized
// value to the left is used)
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
  producer.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)

// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
val fromProducer: Source[String, NotUsed] = runnableGraph.run()

// Print out messages from the producer in two independent consumers
fromProducer.runForeach(msg => println("consumer1: " + msg))
fromProducer.runForeach(msg => println("consumer2: " + msg))
- Java
-
source
// A simple producer that publishes a new "message" every second
Source<String, Cancellable> producer =
    Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(1), "New message");

// Attach a BroadcastHub Sink to the producer. This will materialize to a
// corresponding Source.
// (We need to use toMat and Keep.right since by default the materialized
// value to the left is used)
RunnableGraph<Source<String, NotUsed>> runnableGraph =
    producer.toMat(BroadcastHub.of(String.class, 256), Keep.right());

// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
Source<String, NotUsed> fromProducer = runnableGraph.run(materializer);

// Print out messages from the producer in two independent consumers
fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer);
fromProducer.runForeach(msg -> System.out.println("consumer2: " + msg), materializer);
The resulting Source can be materialized any number of times, each materialization effectively attaching a new subscriber. If there are no subscribers attached to this hub then it will not drop any elements but instead backpressure the upstream producer until subscribers arrive. This behavior can be tweaked by using an operator, for example .buffer with a drop strategy, or by attaching a subscriber that drops all messages. If there are no other subscribers, this will ensure that the producer is kept drained (dropping all elements) and once a new subscriber arrives it will adaptively slow down, ensuring no more messages are dropped.
Combining dynamic operators to build a simple Publish-Subscribe service
The features provided by the Hub implementations are limited by default. This is by design, as various combinations can be used to express additional features like unsubscribing producers or consumers externally. We show here an example that builds a Flow representing a publish-subscribe channel. The input of the Flow is published to all subscribers while the output streams all the elements published.
First, we connect a MergeHub and a BroadcastHub together to form a publish-subscribe channel. Once we materialize this small stream, we get back a pair of Source and Sink that together define the publish and subscribe sides of our channel.
- Scala
-
source
// Obtain a Sink and Source which will publish and receive from the "bus" respectively.
val (sink, source) =
  MergeHub.source[String](perProducerBufferSize = 16).toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both).run()
- Java
-
source
// Obtain a Sink and Source which will publish and receive from the "bus" respectively.
Pair<Sink<String, NotUsed>, Source<String, NotUsed>> sinkAndSource =
    MergeHub.of(String.class, 16)
        .toMat(BroadcastHub.of(String.class, 256), Keep.both())
        .run(system);

Sink<String, NotUsed> sink = sinkAndSource.first();
Source<String, NotUsed> source = sinkAndSource.second();
We now use a few tricks to add more features. First of all, we attach a Sink.ignore at the broadcast side of the channel to keep it drained when there are no subscribers. If this behavior is not desired, this line can be dropped.
- Scala
-
source
// Ensure that the Broadcast output is dropped if there are no listening parties.
// If this dropping Sink is not attached, then the broadcast hub will not drop any
// elements itself when there are no subscribers, backpressuring the producer instead.
source.runWith(Sink.ignore)
- Java
-
source
// Ensure that the Broadcast output is dropped if there are no listening parties.
// If this dropping Sink is not attached, then the broadcast hub will not drop any
// elements itself when there are no subscribers, backpressuring the producer instead.
source.runWith(Sink.ignore(), system);
We now wrap the Sink and Source in a Flow using Flow.fromSinkAndSource. This bundles up the two sides of the channel into one and forces users of it to always define a publisher and subscriber side (even if the subscriber side is dropping). It also allows us to attach a KillSwitch as a BidiStage, which in turn makes it possible to close both the original Sink and Source at the same time. Finally, we add backpressureTimeout on the consumer side to ensure that subscribers that block the channel for more than 3 seconds are forcefully removed (and their stream failed).
- Scala
-
source
// We create now a Flow that represents a publish-subscribe channel using the above
// started stream as its "topic". We add two more features, external cancellation of
// the registration and automatic cleanup for very slow subscribers.
val busFlow: Flow[String, String, UniqueKillSwitch] =
  Flow
    .fromSinkAndSource(sink, source)
    .joinMat(KillSwitches.singleBidi[String, String])(Keep.right)
    .backpressureTimeout(3.seconds)
- Java
-
source
// We create now a Flow that represents a publish-subscribe channel using the above
// started stream as its "topic". We add two more features, external cancellation of
// the registration and automatic cleanup for very slow subscribers.
Flow<String, String, UniqueKillSwitch> busFlow =
    Flow.fromSinkAndSource(sink, source)
        .joinMat(KillSwitches.singleBidi(), Keep.right())
        // 3 seconds, matching the timeout described in the text and the Scala example
        .backpressureTimeout(Duration.ofSeconds(3));
The resulting Flow now has a type of Flow[String, String, UniqueKillSwitch] representing a publish-subscribe channel which can be used any number of times to attach new producers or consumers. In addition, it materializes to a UniqueKillSwitch (see UniqueKillSwitch) that can be used to deregister a single user externally:
- Scala
-
source
val switch: UniqueKillSwitch =
  Source.repeat("Hello world!").viaMat(busFlow)(Keep.right).to(Sink.foreach(println)).run()

// Shut down externally
switch.shutdown()
- Java
-
source
UniqueKillSwitch killSwitch =
    Source.repeat("Hello World!")
        .viaMat(busFlow, Keep.right())
        .to(Sink.foreach(System.out::println))
        .run(system);

// Shut down externally
killSwitch.shutdown();
Using the PartitionHub
This is a may change feature.
A PartitionHub can be used to route elements from a common producer to a dynamic set of consumers. The consumer is selected by a function. Each element can be routed to only one consumer.
The rate of the producer will be automatically adapted to the slowest consumer. In this case, the hub is a Sink to which the single producer must be attached first. Consumers can only be attached once the Sink has been materialized (i.e. the producer has been started). One example of using the PartitionHub:
- Scala
-
source
// A simple producer that publishes a new "message-n" every second
val producer = Source.tick(1.second, 1.second, "message").zipWith(Source(1 to 100))((a, b) => s"$a-$b")

// Attach a PartitionHub Sink to the producer. This will materialize to a
// corresponding Source.
// (We need to use toMat and Keep.right since by default the materialized
// value to the left is used)
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
  producer.toMat(
    PartitionHub.sink(
      (size, elem) => math.abs(elem.hashCode % size),
      startAfterNrOfConsumers = 2,
      bufferSize = 256))(Keep.right)

// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
val fromProducer: Source[String, NotUsed] = runnableGraph.run()

// Print out messages from the producer in two independent consumers
fromProducer.runForeach(msg => println("consumer1: " + msg))
fromProducer.runForeach(msg => println("consumer2: " + msg))
- Java
-
source
// A simple producer that publishes a new "message-n" every second
Source<String, Cancellable> producer =
    Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(1), "message")
        .zipWith(Source.range(0, 100), (a, b) -> a + "-" + b);

// Attach a PartitionHub Sink to the producer. This will materialize to a
// corresponding Source.
// (We need to use toMat and Keep.right since by default the materialized
// value to the left is used)
RunnableGraph<Source<String, NotUsed>> runnableGraph =
    producer.toMat(
        PartitionHub.of(String.class, (size, elem) -> Math.abs(elem.hashCode() % size), 2, 256),
        Keep.right());

// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
Source<String, NotUsed> fromProducer = runnableGraph.run(materializer);

// Print out messages from the producer in two independent consumers
fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer);
fromProducer.runForeach(msg -> System.out.println("consumer2: " + msg), materializer);
The partitioner function takes two parameters: the first is the number of active consumers and the second is the stream element. The function should return the index of the selected consumer for the given element, i.e. an int greater than or equal to 0 and less than the number of consumers.
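The range requirement on the returned index can be checked in isolation, without running a stream. The following is a minimal sketch in plain Java; the class `PartitionerSketch` and the method `byHash` are hypothetical names, and `Math.floorMod` is swapped in for the `Math.abs(hash % size)` expression used in the examples above. Both expressions stay within the required range, though they may pick different consumers for elements with a negative hash code.

```java
// Stand-alone check of the partitioner contract; no Pekko types involved.
final class PartitionerSketch {

    // Stateless partitioner: (number of active consumers, element) -> consumer index.
    // Math.floorMod never returns a negative value for a positive divisor,
    // so the result is always in the range [0, size).
    static int byHash(int size, String elem) {
        return Math.floorMod(elem.hashCode(), size);
    }

    public static void main(String[] args) {
        int consumers = 3;
        for (String elem : new String[] {"message-1", "message-2", "message-3"}) {
            int idx = byHash(consumers, elem);
            System.out.println(elem + " -> consumer index " + idx);
        }
    }
}
```

Because the function is pure, the same element is always routed to the same index as long as the number of active consumers does not change.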
The resulting Source can be materialized any number of times, each materialization effectively attaching a new consumer. If there are no consumers attached to this hub then it will not drop any elements but instead backpressure the upstream producer until consumers arrive. This behavior can be tweaked by using an operator, for example .buffer with a drop strategy, or by attaching a consumer that drops all messages. If there are no other consumers, this will ensure that the producer is kept drained (dropping all elements) and once a new consumer arrives and messages are routed to the new consumer it will adaptively slow down, ensuring no more messages are dropped.
It is possible to define how many initial consumers are required before the hub starts emitting any messages to the attached consumers. While not enough consumers have been attached, messages are buffered, and when the buffer is full the upstream producer is backpressured. No messages are dropped.
The above example illustrates a stateless partition function. For more advanced stateful routing, statefulSink (Scala) or ofStateful (Java) can be used. Here is an example of a stateful round-robin function:
- Scala
-
source
// A simple producer that publishes a new "message-n" every second
val producer = Source.tick(1.second, 1.second, "message").zipWith(Source(1 to 100))((a, b) => s"$a-$b")

// New instance of the partitioner function and its state is created
// for each materialization of the PartitionHub.
def roundRobin(): (PartitionHub.ConsumerInfo, String) => Long = {
  var i = -1L

  (info, elem) => {
    i += 1
    info.consumerIdByIdx((i % info.size).toInt)
  }
}

// Attach a PartitionHub Sink to the producer. This will materialize to a
// corresponding Source.
// (We need to use toMat and Keep.right since by default the materialized
// value to the left is used)
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
  producer.toMat(PartitionHub.statefulSink(() => roundRobin(), startAfterNrOfConsumers = 2, bufferSize = 256))(
    Keep.right)

// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
val fromProducer: Source[String, NotUsed] = runnableGraph.run()

// Print out messages from the producer in two independent consumers
fromProducer.runForeach(msg => println("consumer1: " + msg))
fromProducer.runForeach(msg => println("consumer2: " + msg))
- Java
-
source
// A simple producer that publishes a new "message-n" every second
Source<String, Cancellable> producer =
    Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(1), "message")
        .zipWith(Source.range(0, 100), (a, b) -> a + "-" + b);

// Attach a PartitionHub Sink to the producer. This will materialize to a
// corresponding Source.
// (We need to use toMat and Keep.right since by default the materialized
// value to the left is used)
RunnableGraph<Source<String, NotUsed>> runnableGraph =
    producer.toMat(
        PartitionHub.ofStateful(String.class, () -> new RoundRobin<String>(), 2, 256),
        Keep.right());

// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
Source<String, NotUsed> fromProducer = runnableGraph.run(materializer);

// Print out messages from the producer in two independent consumers
fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer);
fromProducer.runForeach(msg -> System.out.println("consumer2: " + msg), materializer);
Note that a factory of the function is passed, so that each materialization gets its own stateful variables. In the Java example the partitioner function is implemented as a class to be able to hold the mutable variable. A new instance of RoundRobin is created for each materialization of the hub.
source
// Using a class since variable must otherwise be final.
// New instance is created for each materialization of the PartitionHub.
static class RoundRobin<T> implements ToLongBiFunction<ConsumerInfo, T> {

  private long i = -1;

  @Override
  public long applyAsLong(ConsumerInfo info, T elem) {
    i++;
    return info.consumerIdByIdx((int) (i % info.size()));
  }
}
The function takes two parameters; the first is information about active consumers, including an array of consumer identifiers and the second is the stream element. The function should return the selected consumer identifier for the given element. The function will never be called when there are no active consumers, i.e. there is always at least one element in the array of identifiers.
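To see what such a function returns over successive elements, the round-robin logic can be exercised on its own. This is a sketch in plain Java under stated assumptions: the class `RoundRobinSketch` is hypothetical, and a plain `long[]` stands in for the array of consumer identifiers that ConsumerInfo exposes. The identifiers are deliberately not 0, 1, 2 to show that the function returns an identifier rather than an index.

```java
// Stand-alone round-robin selection; a long[] stands in for ConsumerInfo (assumption).
final class RoundRobinSketch {
    // Counter kept per instance, so each "materialization" would get its own state.
    private long i = -1;

    // Returns the identifier of the next consumer in rotation.
    // consumerIds must be non-empty, mirroring the guarantee that the function
    // is never called without active consumers.
    long select(long[] consumerIds) {
        i++;
        return consumerIds[(int) (i % consumerIds.length)];
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch();
        long[] ids = {10L, 11L, 12L};
        for (int n = 0; n < 5; n++) {
            System.out.println(rr.select(ids)); // prints 10, 11, 12, 10, 11 on separate lines
        }
    }
}
```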
Another interesting type of routing is to prefer routing to the fastest consumers. The ConsumerInfo has an accessor queueSize that is the approximate number of buffered elements for a consumer. A larger value than for other consumers could indicate that the consumer is slow. Note that this is a moving target since the elements are consumed concurrently. Here is an example of a hub that routes to the consumer with the fewest buffered elements:
- Scala
-
source
val producer = Source(0 until 100)

// ConsumerInfo.queueSize is the approximate number of buffered elements for a consumer.
// Note that this is a moving target since the elements are consumed concurrently.
val runnableGraph: RunnableGraph[Source[Int, NotUsed]] =
  producer.toMat(
    PartitionHub.statefulSink(
      () => (info, elem) => info.consumerIds.minBy(id => info.queueSize(id)),
      startAfterNrOfConsumers = 2,
      bufferSize = 16))(Keep.right)

val fromProducer: Source[Int, NotUsed] = runnableGraph.run()

fromProducer.runForeach(msg => println("consumer1: " + msg))
fromProducer.throttle(10, 100.millis).runForeach(msg => println("consumer2: " + msg))
- Java
-
source
Source<Integer, NotUsed> producer = Source.range(0, 100);

// ConsumerInfo.queueSize is the approximate number of buffered elements for a consumer.
// Note that this is a moving target since the elements are consumed concurrently.
RunnableGraph<Source<Integer, NotUsed>> runnableGraph =
    producer.toMat(
        PartitionHub.ofStateful(
            Integer.class,
            () ->
                (info, elem) -> {
                  final List<Object> ids = info.getConsumerIds();
                  // Look up queue sizes by consumer identifier, not by index,
                  // as the Scala example does.
                  long fastest = info.consumerIdByIdx(0);
                  int minValue = info.queueSize(fastest);
                  for (int i = 1; i < ids.size(); i++) {
                    long id = info.consumerIdByIdx(i);
                    int value = info.queueSize(id);
                    if (value < minValue) {
                      minValue = value;
                      fastest = id;
                    }
                  }
                  return fastest;
                },
            2,
            8),
        Keep.right());

Source<Integer, NotUsed> fromProducer = runnableGraph.run(materializer);

fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer);
fromProducer
    .throttle(10, Duration.ofMillis(100))
    .runForeach(msg -> System.out.println("consumer2: " + msg), materializer);
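The selection step itself, picking the consumer with the fewest buffered elements, can be tried out separately from the hub. The sketch below is plain Java under stated assumptions: `LeastBufferedSketch` and `fastest` are hypothetical names, a `long[]` stands in for the consumer identifiers, and a `LongToIntFunction` stands in for the queueSize accessor. It is not the hub's own code.

```java
import java.util.Map;
import java.util.function.LongToIntFunction;

// Stand-alone "least buffered" selection; ids and queueSize stand in for ConsumerInfo (assumption).
final class LeastBufferedSketch {

    // Returns the identifier with the smallest reported queue size.
    // On a tie the consumer that appears first in ids is kept.
    static long fastest(long[] ids, LongToIntFunction queueSize) {
        long best = ids[0];
        int min = queueSize.applyAsInt(best);
        for (int k = 1; k < ids.length; k++) {
            int size = queueSize.applyAsInt(ids[k]);
            if (size < min) {
                min = size;
                best = ids[k];
            }
        }
        return best;
    }

    public static void main(String[] args) {
        long[] ids = {4L, 7L, 9L};
        // Snapshot of buffered elements per consumer identifier (made-up numbers).
        Map<Long, Integer> buffered = Map.of(4L, 5, 7L, 2, 9L, 8);
        System.out.println(fastest(ids, id -> buffered.get(id))); // prints 7
    }
}
```

In a running hub the queue sizes change while the function executes, so the result is a best-effort choice rather than a guarantee.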