limitWeighted
Limit the total weight of incoming elements
Signature¶
Description¶
A weight function returns the weight of each element, then the total accumulated weight is compared to a max and if it has passed the max the stream is failed with a StreamLimitReachedException
.
See also limit which puts a limit on the number of elements instead (the same as always returning 1
from the weight function).
Examples¶
limitWeighted
can protect a stream coming from an untrusted source into an in-memory aggregate that grows with the number of elements from filling the heap and causing an out-of-memory error. In this sample we use the number of bytes in each ByteString
element as weight and accept at most a total of 10 000 bytes from the untrusted source elements into the aggregated ByteString
of all bytes, if the untrusted source emits more elements the stream and the materialized Future[ByteString]
will be failed:
sourceval untrustedSource: Source[ByteString, NotUsed] = Source.repeat(ByteString("element"))
val allBytes: Future[ByteString] =
untrustedSource.limitWeighted(max = 10000)(_.length).runReduce(_ ++ _)
sourceSource<ByteString, NotUsed> untrustedSource = Source.repeat(ByteString.fromString("element"));
CompletionStage<ByteString> allBytes =
untrustedSource
.limitWeighted(
10000, // max bytes
bytes -> (long) bytes.length() // bytes of each chunk
)
.runReduce(ByteString::concat, system);
Reactive Streams semantics¶
emits when upstream emits and the number of emitted elements has not reached max
backpressures when downstream backpressures
completes when upstream completes and the number of emitted elements has not reached max