class SubSource[Out, Mat] extends AnyRef
A “stream of streams” sub-flow of data elements, e.g. produced by groupBy.
SubFlows cannot contribute to the super-flow’s materialized value since they
are materialized later, during the runtime of the flow graph processing.
- Source
- SubSource.scala
Instance Constructors
- new SubSource(delegate: scaladsl.SubFlow[Out, Mat, [+O]scaladsl.Source[O, Mat], scaladsl.RunnableGraph[Mat]])
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- def +(other: String): String
- def ->[B](y: B): (SubSource[Out, Mat], B)
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- def addAttributes(attr: Attributes): SubSource[Out, Mat]
Add the given attributes to this Source. Further calls to withAttributes will not remove these attributes. Note that this operation has no effect on an empty Flow (because the attributes apply only to the contained processing operators).
- def aggregateWithBoundary[Agg, Emit](allocate: Supplier[Agg])(aggregate: Function2[Agg, Out, Pair[Agg, Boolean]], harvest: Function[Agg, Emit], emitOnTimer: Pair[Predicate[Agg], Duration]): SubSource[Emit, Mat]
Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream when a custom condition is met, which can be triggered by the aggregate function or by a timer. It can be thought of as a more general groupedWeightedWithin.
Emits when the aggregation function decides the aggregate is complete or the timer function returns true
Backpressures when downstream backpressures and the aggregate is complete
Completes when upstream completes and the last aggregate has been emitted downstream
Cancels when downstream cancels
- allocate
allocate the initial data structure for aggregated elements
- aggregate
update the aggregated elements, return true if ready to emit after update.
- harvest
this is invoked before emit within the current stage/operator
- emitOnTimer
decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
- Annotations
- @ApiMayChange()
- def alsoTo(that: Graph[SinkShape[Out], _]): SubSource[Out, Mat]
Attaches the given Sink to this Source, meaning that elements that pass through will also be sent to the Sink.
It is similar to #wireTap but will backpressure instead of dropping elements when the given Sink is not ready.
Emits when element is available and demand exists both from the Sink and the downstream.
Backpressures when downstream or Sink backpressures
Completes when upstream completes
Cancels when downstream or Sink cancels
- def alsoToAll(those: Graph[SinkShape[Out], _]*): SubSource[Out, Mat]
Attaches the given Sinks to this Source, meaning that elements that pass through will also be sent to all those Sinks.
It is similar to #wireTap but will backpressure instead of dropping elements when the given Sinks are not ready.
Emits when element is available and demand exists both from the Sinks and the downstream.
Backpressures when downstream or any of the Sinks backpressures
Completes when upstream completes
Cancels when downstream or any of the Sinks cancels
- Annotations
- @varargs() @SafeVarargs()
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def asScala: scaladsl.SubFlow[Out, Mat, [+O]scaladsl.Source[O, Mat], scaladsl.RunnableGraph[Mat]]
Converts this Flow to its Scala DSL counterpart
- def async: SubSource[Out, Mat]
Put an asynchronous boundary around this SubSource
- def backpressureTimeout(timeout: Duration): SubSource[Out, Mat]
If the time between the emission of an element and the following downstream demand exceeds the provided timeout, the stream is failed with a java.util.concurrent.TimeoutException. The timeout is checked periodically, so the resolution of the check is one period (equal to the timeout value).
Emits when upstream emits an element
Backpressures when downstream backpressures
Completes when upstream completes or fails if timeout elapses between element emission and downstream demand.
Cancels when downstream cancels
- Annotations
- @nowarn()
- def batch[S](max: Long, seed: Function[Out, S], aggregate: Function2[S, Out, S]): SubSource[S, Mat]
Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches until the subscriber is ready to accept them. For example a batch step might store received elements in an array up to the allowed max limit if the upstream publisher is faster.
This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not duplicate elements.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when downstream stops backpressuring and there is an aggregated element available
Backpressures when there are max batched elements and 1 pending element and downstream backpressures
Completes when upstream completes and there is no batched/pending element waiting
Cancels when downstream cancels
See also SubSource.conflate, SubSource.batchWeighted
- max
maximum number of elements to batch before backpressuring upstream (must be positive non-zero)
- seed
Provides the first state for a batched value using the first unconsumed element as a start
- aggregate
Takes the currently batched value and the current pending element to produce a new aggregate
- def batchWeighted[S](max: Long, costFn: Function[Out, Long], seed: Function[Out, S], aggregate: Function2[S, Out, S]): SubSource[S, Mat]
Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches until the subscriber is ready to accept them. For example a batch step might concatenate ByteString elements up to the allowed max limit if the upstream publisher is faster.
This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not duplicate elements.
Batching will apply for all elements, even if a single element cost is greater than the total allowed limit. In this case, previous batched elements will be emitted, then the "heavy" element will be emitted (after being applied with the seed function) without batching further elements with it, and then the rest of the incoming elements are batched.
Emits when downstream stops backpressuring and there is a batched element available
Backpressures when there are max weighted batched elements + 1 pending element and downstream backpressures
Completes when upstream completes and there is no batched/pending element waiting
Cancels when downstream cancels
See also SubSource.conflate, SubSource.batch
- max
maximum weight of elements to batch before backpressuring upstream (must be positive non-zero)
- costFn
a function to compute a single element weight
- seed
Provides the first state for a batched value using the first unconsumed element as a start
- aggregate
Takes the currently batched value and the current pending element to produce a new batch
- def buffer(size: Int, overflowStrategy: OverflowStrategy): SubSource[Out, Mat]
Adds a fixed size buffer in the flow that stores elements from a faster upstream until it becomes full. Depending on the defined pekko.stream.OverflowStrategy it might drop elements or backpressure the upstream if there is no space available.
Emits when downstream stops backpressuring and there is a pending element in the buffer
Backpressures when downstream backpressures or depending on OverflowStrategy:
- Backpressure - backpressures when buffer is full
- DropHead, DropTail, DropBuffer - never backpressures
- Fail - fails the stream if buffer gets full
Completes when upstream completes and buffered elements have been drained
Cancels when downstream cancels
- size
The size of the buffer in element count
- overflowStrategy
Strategy that is used when incoming elements cannot fit inside the buffer
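The dropping strategies listed above differ only in which element is discarded when a full buffer receives one more element. The following is a minimal plain-Java sketch of that choice, not the Pekko implementation: the class `BufferOverflowSketch`, its `Strategy` enum and the `fill` method are hypothetical names invented here, and the sketch assumes downstream never pulls while the elements arrive. It follows the usual reading of the strategy names (DropHead discards the oldest buffered element, DropTail the youngest buffered element, DropBuffer all buffered elements).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Plain-Java model of a bounded buffer in front of a stalled downstream.
// Illustrative only; none of these names exist in Pekko.
class BufferOverflowSketch {
    enum Strategy { DROP_HEAD, DROP_TAIL, DROP_BUFFER }

    // Offers every input element to a buffer of the given size and
    // returns what the buffer holds afterwards, oldest element first.
    static List<Integer> fill(int size, Strategy strategy, List<Integer> input) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        Deque<Integer> buffer = new ArrayDeque<>();
        for (Integer element : input) {
            if (buffer.size() == size) {
                switch (strategy) {
                    case DROP_HEAD:   buffer.removeFirst(); break; // discard the oldest
                    case DROP_TAIL:   buffer.removeLast();  break; // discard the youngest buffered
                    case DROP_BUFFER: buffer.clear();       break; // discard everything buffered
                }
            }
            buffer.addLast(element);
        }
        return List.copyOf(buffer);
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5);
        for (Strategy strategy : Strategy.values()) {
            System.out.println(strategy + " -> " + fill(3, strategy, input));
        }
    }
}
```

With a buffer of size 3 and the input 1 to 5, the model leaves [3, 4, 5] for DROP_HEAD, [1, 2, 5] for DROP_TAIL and [4, 5] for DROP_BUFFER.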
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native() @HotSpotIntrinsicCandidate()
- def collect[T](pf: PartialFunction[Out, T]): SubSource[T, Mat]
Transform this stream by applying the given partial function to each of the elements on which the function is defined as they pass through this processing step. Non-matching elements are filtered out.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when the provided partial function is defined for the element
Backpressures when the partial function is defined for the element and downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- def collectType[T](clazz: Class[T]): SubSource[T, Mat]
Transform this stream by testing the type of each element as it passes through this processing step and passing on only the elements that are instances of the provided type. Non-matching elements are filtered out.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when the element is an instance of the provided type
Backpressures when the element is an instance of the provided type and downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
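The filtering behaviour of collectType can be illustrated on an ordinary list. This is a plain-Java sketch that does not use Pekko; `CollectTypeSketch` and its static `collectType` helper are hypothetical names chosen for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of "keep only instances of clazz, typed as clazz".
// Illustrative only; this is not the Pekko operator.
class CollectTypeSketch {
    static <T> List<T> collectType(List<?> elements, Class<T> clazz) {
        List<T> result = new ArrayList<>();
        for (Object element : elements) {
            if (clazz.isInstance(element)) {
                result.add(clazz.cast(element)); // matching element, passed on with the narrower type
            }
            // non-matching elements are silently filtered out
        }
        return result;
    }

    public static void main(String[] args) {
        List<Object> mixed = List.of(1, "a", 2.5, "b", 3);
        System.out.println(collectType(mixed, String.class));
        System.out.println(collectType(mixed, Integer.class));
    }
}
```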
- def completionTimeout(timeout: Duration): SubSource[Out, Mat]
If the stream does not complete within the provided timeout, the stream is failed with a java.util.concurrent.TimeoutException.
Emits when upstream emits an element
Backpressures when downstream backpressures
Completes when upstream completes or fails if timeout elapses before upstream completes
Cancels when downstream cancels
- Annotations
- @nowarn()
- def concat[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat]
Concatenate the given Source to this Flow, meaning that once this Flow’s input is exhausted and all result elements have been generated, the Source’s elements will be produced.
Note that the Source is materialized together with this Flow and is "detached", meaning it will in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start (so it cannot be combined with Source.lazy to defer materialization of that).
The second source is then kept from producing elements by asserting back-pressure until its time comes.
When needing a concat operator that is not detached use #concatLazyMat
Emits when element is available from current stream or from the given Source when current is completed
Backpressures when downstream backpressures
Completes when given Source completes
Cancels when downstream cancels
- def concatAllLazy(those: Graph[SourceShape[Out], _]*): SubSource[Out, Mat]
Concatenate the given Sources to this Source, meaning that once this Flow’s input is exhausted and all result elements have been generated, the Sources’ elements will be produced.
Note that the Sources are materialized together with this Flow. If lazy materialization is what is needed the operator can be combined with, for example, Source.lazySource to defer materialization of that until the time when this source completes.
The second source is then kept from producing elements by asserting back-pressure until its time comes.
For a concat operator that is detached, use #concat
If this Source gets upstream error - no elements from the given Sources will be pulled.
Emits when element is available from current stream or from the given Sources when current is completed
Backpressures when downstream backpressures
Completes when all the given Sources complete
Cancels when downstream cancels
- Annotations
- @varargs() @SafeVarargs()
- def concatLazy[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat]
Concatenate the given Source to this Flow, meaning that once this Flow’s input is exhausted and all result elements have been generated, the Source’s elements will be produced.
Note that the Source is materialized together with this Flow. If lazy materialization is what is needed the operator can be combined with, for example, Source.lazySource to defer materialization of that until the time when this source completes.
The second source is then kept from producing elements by asserting back-pressure until its time comes.
For a concat operator that is detached, use #concat
If this Flow gets upstream error - no elements from the given Source will be pulled.
Emits when element is available from current stream or from the given Source when current is completed
Backpressures when downstream backpressures
Completes when given Source completes
Cancels when downstream cancels
- def concatSubstreams(): Source[Out, Mat]
Flatten the sub-flows back into the super-source by concatenating them. This is usually a bad idea when combined with groupBy since it can easily lead to deadlock: the concatenation does not consume from the second substream until the first has finished and the groupBy operator will get back-pressure from the second stream.
This is identical in effect to mergeSubstreamsWithParallelism(1).
- def conflate(aggregate: Function2[Out, Out, Out]): SubSource[Out, Mat]
Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the upstream publisher is faster.
This version of conflate does not change the output type of the stream. See SubSource.conflateWithSeed for a more flexible version that can take a seed function and transform elements while rolling up.
This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not duplicate elements.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when downstream stops backpressuring and there is a conflated element available
Backpressures when never
Completes when upstream completes
Cancels when downstream cancels
See also SubSource.conflateWithSeed, SubSource.batch, SubSource.batchWeighted
- aggregate
Takes the currently aggregated value and the current pending element to produce a new aggregate
- def conflateWithSeed[S](seed: Function[Out, S], aggregate: Function2[S, Out, S]): SubSource[S, Mat]
Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the upstream publisher is faster.
This version of conflate derives a seed from the first element and allows the aggregated type to differ from the input type. See Flow.conflate for a simpler version that does not change types.
This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not duplicate elements.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when downstream stops backpressuring and there is a conflated element available
Backpressures when never
Completes when upstream completes
Cancels when downstream cancels
See also SubSource.conflate, SubSource.batch, SubSource.batchWeighted
- seed
Provides the first state for a conflated value using the first unconsumed element as a start
- aggregate
Takes the currently aggregated value and the current pending element to produce a new aggregate
- def delay(of: Duration, strategy: DelayOverflowStrategy): SubSource[Out, Mat]
Shifts elements emission in time by a specified amount. It stores elements in an internal buffer while waiting for the next element to be emitted. Depending on the defined pekko.stream.DelayOverflowStrategy it might drop elements or backpressure the upstream if there is no space available in the buffer.
Delay precision is 10ms to avoid unnecessary timer scheduling cycles.
Internal buffer has default capacity 16. You can set buffer size by calling withAttributes(inputBuffer)
Emits when there is a pending element in the buffer and configured time for this element elapsed
- EmitEarly - strategy does not wait to emit element if buffer is full
Backpressures when depending on OverflowStrategy
- Backpressure - backpressures when buffer is full
- DropHead, DropTail, DropBuffer - never backpressures
- Fail - fails the stream if buffer gets full
Completes when upstream completes and buffered elements have been drained
Cancels when downstream cancels
- of
time to shift all messages
- strategy
Strategy that is used when incoming elements cannot fit inside the buffer
- Annotations
- @nowarn()
- def delayWith(delayStrategySupplier: Supplier[DelayStrategy[Out]], overFlowStrategy: DelayOverflowStrategy): SubSource[Out, Mat]
Shifts elements emission in time by an amount individually determined through a delay strategy. It stores elements in an internal buffer while waiting for the next element to be emitted. Depending on the defined pekko.stream.DelayOverflowStrategy it might drop elements or backpressure the upstream if there is no space available in the buffer.
It determines the delay for each ongoing element by invoking DelayStrategy.nextDelay(elem: T): FiniteDuration.
Note that elements are not re-ordered: if an element is given a delay much shorter than its predecessor, it will still have to wait for the preceding element before being emitted. It is also important to notice that DelayStrategy can be stateful.
Delay precision is 10ms to avoid unnecessary timer scheduling cycles.
Internal buffer has default capacity 16. You can set buffer size by calling addAttributes(inputBuffer)
Emits when there is a pending element in the buffer and configured time for this element elapsed
- EmitEarly - strategy does not wait to emit element if buffer is full
Backpressures when depending on OverflowStrategy
- Backpressure - backpressures when buffer is full
- DropHead, DropTail, DropBuffer - never backpressures
- Fail - fails the stream if buffer gets full
Completes when upstream completes and buffered elements have been drained
Cancels when downstream cancels
- delayStrategySupplier
creates new DelayStrategy object for each materialization
- overFlowStrategy
Strategy that is used when incoming elements cannot fit inside the buffer
- def detach: SubSource[Out, Mat]
Detaches upstream demand from downstream demand without detaching the stream rates; in other words acts like a buffer of size 1.
Emits when upstream emits an element
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- def divertTo(that: Graph[SinkShape[Out], _], when: Predicate[Out]): SubSource[Out, Mat]
Attaches the given Sink to this Flow, meaning that elements will be sent to the Sink instead of being passed through if the predicate when returns true.
Emits when an element is available from the input and the chosen output has demand
Backpressures when the currently chosen output back-pressures
Completes when upstream completes and no output is pending
Cancels when any of the downstreams cancel
- def drop(n: Long): SubSource[Out, Mat]
Discard the given number of elements at the beginning of the stream. No elements will be dropped if n is zero or negative.
Emits when the specified number of elements has been dropped already
Backpressures when the specified number of elements has been dropped and downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
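The edge cases of drop (zero, negative, or more than the stream contains) can be shown on a plain list. This is a stdlib-only Java sketch, not Pekko code; `DropSketch` and its `drop` helper are hypothetical names used only for this illustration.

```java
import java.util.List;

// Plain-Java model of drop(n) applied to a finite sequence.
// Illustrative only; this is not the Pekko operator.
class DropSketch {
    static <T> List<T> drop(List<T> elements, long n) {
        if (n <= 0) {
            return List.copyOf(elements);      // zero or negative: nothing is dropped
        }
        if (n >= elements.size()) {
            return List.of();                  // everything is dropped
        }
        return List.copyOf(elements.subList((int) n, elements.size()));
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4);
        System.out.println(drop(input, 2));
        System.out.println(drop(input, -5));
        System.out.println(drop(input, 10));
    }
}
```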
- def dropWhile(p: Predicate[Out]): SubSource[Out, Mat]
Discard elements at the beginning of the stream while the predicate is true. Once the predicate returns false for the first time, all following elements are passed on.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when predicate returned false and for all following stream elements
Backpressures when predicate returned false and downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
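The point that is easy to miss is that the predicate is no longer consulted once it has returned false. The following plain-Java sketch models that on a list; it does not use Pekko, and `DropWhileSketch` with its `dropWhile` helper is a hypothetical name introduced for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of dropWhile on a finite sequence.
// Illustrative only; this is not the Pekko operator.
class DropWhileSketch {
    static <T> List<T> dropWhile(List<T> elements, Predicate<T> p) {
        List<T> result = new ArrayList<>();
        boolean dropping = true;
        for (T element : elements) {
            if (dropping && p.test(element)) {
                continue;            // still in the leading run of matching elements
            }
            dropping = false;        // first non-match ends the dropping phase for good
            result.add(element);
        }
        return result;
    }

    public static void main(String[] args) {
        // The trailing 1 and 2 also satisfy x < 3 but are kept.
        System.out.println(dropWhile(List.of(1, 2, 5, 1, 2), x -> x < 3));
    }
}
```

For the input 1, 2, 5, 1, 2 and the predicate x < 3, the model yields [5, 1, 2].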
- def dropWithin(duration: Duration): SubSource[Out, Mat]
Discard the elements received within the given duration at the beginning of the stream.
Emits when the specified time elapsed and a new upstream element arrives
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- Annotations
- @nowarn()
- def ensuring(cond: (SubSource[Out, Mat]) => Boolean, msg: => Any): SubSource[Out, Mat]
- def ensuring(cond: (SubSource[Out, Mat]) => Boolean): SubSource[Out, Mat]
- def ensuring(cond: Boolean, msg: => Any): SubSource[Out, Mat]
- def ensuring(cond: Boolean): SubSource[Out, Mat]
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def expand[U](expander: Function[Out, Iterator[U]]): SubSource[U, Mat]
Allows a faster downstream to progress independently of a slower upstream by extrapolating elements from an older element until a new element comes from the upstream. For example an expand step might repeat the last element for the subscriber until it receives an update from upstream.
This element will never "drop" upstream elements as all elements go through at least one extrapolation step. This means that if the upstream is actually faster than the downstream it will be backpressured by the downstream subscriber.
Expand does not support pekko.stream.Supervision#restart and pekko.stream.Supervision#resume. Exceptions from the expander function will complete the stream with failure.
See also #extrapolate for a version that always preserves the original element and allows for an initial "startup" element.
Emits when downstream stops backpressuring
Backpressures when downstream backpressures or iterator runs empty
Completes when upstream completes
Cancels when downstream cancels
- expander
Takes the current extrapolation state to produce an output element and the next extrapolation state.
- def extrapolate(extrapolator: Function[Out, Iterator[Out]], initial: Out): SubSource[Out, Mat]
Allows a faster downstream to progress independent of a slower upstream.
This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream signals demand.
Extrapolate does not support pekko.stream.Supervision#restart and pekko.stream.Supervision#resume. Exceptions from the extrapolate function will complete the stream with failure.
See also #expand for a version that can overwrite the original element.
Emits when downstream stops backpressuring, AND EITHER upstream emits OR initial element is present OR extrapolate is non-empty and applicable
Backpressures when downstream backpressures or current extrapolate runs empty
Completes when upstream completes and current extrapolate runs empty
Cancels when downstream cancels
- extrapolator
takes the current upstream element and provides a sequence of "extrapolated" elements based on the original, to be emitted in case downstream signals demand.
- initial
the initial element to be emitted, in case upstream is able to stall the entire stream.
- def extrapolate(extrapolator: Function[Out, Iterator[Out]]): SubSource[Out, Mat]
Allows a faster downstream to progress independent of a slower upstream.
This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream signals demand.
Extrapolate does not support pekko.stream.Supervision#restart and pekko.stream.Supervision#resume. Exceptions from the extrapolate function will complete the stream with failure.
See also #expand for a version that can overwrite the original element.
Emits when downstream stops backpressuring, AND EITHER upstream emits OR initial element is present OR extrapolate is non-empty and applicable
Backpressures when downstream backpressures or current extrapolate runs empty
Completes when upstream completes and current extrapolate runs empty
Cancels when downstream cancels
- extrapolator
takes the current upstream element and provides a sequence of "extrapolated" elements based on the original, to be emitted in case downstream signals demand.
- def filter(p: Predicate[Out]): SubSource[Out, Mat]
Only pass on those elements that satisfy the given predicate.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when the given predicate returns true for the element
Backpressures when the given predicate returns true for the element and downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- def filterNot(p: Predicate[Out]): SubSource[Out, Mat]
Only pass on those elements that do NOT satisfy the given predicate.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when the given predicate returns false for the element
Backpressures when the given predicate returns false for the element and downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- def flatMapConcat[T, M](f: Function[Out, _ <: Graph[SourceShape[T], M]]): SubSource[T, Mat]
Transform each input element into a Source of output elements that is then flattened into the output stream by concatenation, fully consuming one Source after the other.
Emits when a currently consumed substream has an element available
Backpressures when downstream backpressures
Completes when upstream completes and all consumed substreams complete
Cancels when downstream cancels
- def flatMapMerge[T, M](breadth: Int, f: Function[Out, _ <: Graph[SourceShape[T], M]]): SubSource[T, Mat]
Transform each input element into a Source of output elements that is then flattened into the output stream by merging, where at most breadth substreams are being consumed at any given time.
Emits when a currently consumed substream has an element available
Backpressures when downstream backpressures
Completes when upstream completes and all consumed substreams complete
Cancels when downstream cancels
- def flatMapPrefix[Out2, Mat2](n: Int, f: Function[Iterable[Out], Flow[Out, Out2, Mat2]]): SubSource[Out2, Mat]
Takes up to n elements from the stream (less than n only if the upstream completes before emitting n elements), then applies f on these elements in order to obtain a flow; this flow is then materialized and the rest of the input is processed by this flow (similar to via). This method returns a flow consuming the rest of the stream producing the materialized flow's output.
Emits when the materialized flow emits. Notice the first n elements are buffered internally before materializing the flow and connecting it to the rest of the upstream - producing elements at its own discretion (might 'swallow' or multiply elements).
Backpressures when downstream backpressures
Completes when the materialized flow completes. If upstream completes before producing n elements, f will be applied with the provided elements, the resulting flow will be materialized and signalled for upstream completion, it can then complete or continue to emit elements at its own discretion.
Cancels when the materialized flow cancels. Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow.
- n
the number of elements to accumulate before materializing the downstream flow.
- f
a function that produces the downstream flow based on the upstream's prefix.
- def fold[T](zero: T)(f: Function2[T, Out, T]): SubSource[T, Mat]
Similar to scan but only emits its result when the upstream completes, after which it also completes. Applies the given function f towards its current and next value, yielding the next current value.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
If the function f throws an exception and the supervision decision is pekko.stream.Supervision#restart, the current value starts at zero again and the stream will continue.
Note that the zero value must be immutable.
Emits when upstream completes
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
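The difference between fold and scan described above can be modelled over an in-memory list. This is a plain-Java sketch of the semantics only, not Pekko code: the class FoldSketch and its methods are hypothetical names, and supervision, backpressure and asynchrony are not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

final class FoldSketch {
    // scan: emits the zero value first, then every intermediate accumulator.
    static <T, A> List<A> scan(List<T> in, A zero, BiFunction<A, T, A> f) {
        List<A> out = new ArrayList<>();
        A acc = zero;
        out.add(acc);
        for (T elem : in) {
            acc = f.apply(acc, elem);
            out.add(acc);
        }
        return out;
    }

    // fold: emits a single element, the final accumulator, once the input is exhausted.
    static <T, A> List<A> fold(List<T> in, A zero, BiFunction<A, T, A> f) {
        A acc = zero;
        for (T elem : in) {
            acc = f.apply(acc, elem);
        }
        return List.of(acc);
    }

    public static void main(String[] args) {
        List<Integer> in = List.of(1, 2, 3, 4);
        System.out.println(scan(in, 0, Integer::sum)); // [0, 1, 3, 6, 10]
        System.out.println(fold(in, 0, Integer::sum)); // [10]
    }
}
```

Because the accumulator is handed from one step to the next, a mutable zero value would be shared across restarts, which is presumably why the documentation requires it to be immutable.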
- def foldAsync[T](zero: T)(f: Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat]
Similar to fold but with an asynchronous function. Applies the given function towards its current and next value, yielding the next current value.
If the function f returns a failure and the supervision decision is pekko.stream.Supervision.Restart, the current value starts at zero again and the stream will continue.
Note that the zero value must be immutable.
Emits when upstream completes
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- def grouped(n: Int): SubSource[List[Out], Mat]
Chunk up this stream into groups of the given size, with the last group possibly smaller than requested due to end-of-stream.
n must be positive, otherwise IllegalArgumentException is thrown.
Emits when the specified number of elements has been accumulated or upstream completed
Backpressures when a group has been assembled and downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
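The chunking rule can be illustrated on a finite list. The following is a minimal plain-Java sketch of the rule as described above, not the operator itself; GroupedSketch is a hypothetical name and streaming concerns (demand, completion signals) are left out.

```java
import java.util.ArrayList;
import java.util.List;

final class GroupedSketch {
    // Splits the input into consecutive groups of size n; the last group may be smaller.
    static <T> List<List<T>> grouped(List<T> in, int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < in.size(); i += n) {
            out.add(List.copyOf(in.subList(i, Math.min(i + n, in.size()))));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(grouped(List.of(1, 2, 3, 4, 5, 6, 7), 3)); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```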
- def groupedWeighted(minWeight: Long)(costFn: Function[Out, Long]): SubSource[List[Out], Mat]
Chunk up this stream into groups of elements that have a cumulative weight greater than or equal to the minWeight, with the last group possibly smaller than the requested minWeight due to end-of-stream.
minWeight must be positive, otherwise IllegalArgumentException is thrown. costFn must return a non-negative result for all inputs, otherwise the stage will fail with an IllegalArgumentException.
Emits when the cumulative weight of elements is greater than or equal to the minWeight or upstream completed
Backpressures when a buffered group weighs more than minWeight and downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
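A group is closed as soon as its cumulative weight reaches minWeight. The sketch below models that rule in plain Java over a finite list, assuming the behavior is exactly as documented above; GroupedWeightedSketch is a hypothetical name and java.util.function.ToLongFunction stands in for the operator's cost function type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

final class GroupedWeightedSketch {
    static <T> List<List<T>> groupedWeighted(List<T> in, long minWeight, ToLongFunction<T> costFn) {
        if (minWeight <= 0) {
            throw new IllegalArgumentException("minWeight must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        List<T> current = new ArrayList<>();
        long weight = 0;
        for (T elem : in) {
            long cost = costFn.applyAsLong(elem);
            if (cost < 0) {
                throw new IllegalArgumentException("costFn must be non-negative");
            }
            current.add(elem);
            weight += cost;
            // Emit the group once its cumulative weight reaches the threshold.
            if (weight >= minWeight) {
                out.add(List.copyOf(current));
                current.clear();
                weight = 0;
            }
        }
        // End of input: the last group may weigh less than minWeight.
        if (!current.isEmpty()) {
            out.add(List.copyOf(current));
        }
        return out;
    }

    public static void main(String[] args) {
        // Cost of each string is its length.
        System.out.println(groupedWeighted(List.of("a", "bb", "ccc", "d"), 3, String::length));
        // [[a, bb], [ccc], [d]]
    }
}
```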
- def groupedWeightedWithin(maxWeight: Long, maxNumber: Int, costFn: Function[Out, Long], duration: Duration): SubSource[List[Out], Mat]
Chunk up this stream into groups of elements received within a time window, or limited by the weight and number of the elements, whatever happens first. Empty groups will not be emitted if no elements are received from upstream. The last group before end-of-stream will contain the buffered elements since the previously emitted group.
Emits when the configured time elapses since the last group has been emitted or weight limit reached
Backpressures when downstream backpressures, and buffered group (+ pending element) weighs more than maxWeight or has more than maxNumber elements
Completes when upstream completes (emits last group)
Cancels when downstream completes
maxWeight must be positive, maxNumber must be positive, and duration must be greater than 0 seconds, otherwise IllegalArgumentException is thrown.
- def groupedWeightedWithin(maxWeight: Long, costFn: Function[Out, Long], duration: Duration): SubSource[List[Out], Mat]
Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements, whatever happens first. Empty groups will not be emitted if no elements are received from upstream. The last group before end-of-stream will contain the buffered elements since the previously emitted group.
Emits when the configured time elapses since the last group has been emitted or weight limit reached
Backpressures when downstream backpressures, and buffered group (+ pending element) weighs more than maxWeight
Completes when upstream completes (emits last group)
Cancels when downstream completes
maxWeight must be positive, and duration must be greater than 0 seconds, otherwise IllegalArgumentException is thrown.
- Annotations
- @nowarn()
- def groupedWithin(maxNumber: Int, duration: Duration): SubSource[List[Out], Mat]
Chunk up this stream into groups of elements received within a time window, or limited by the given number of elements, whatever happens first. Empty groups will not be emitted if no elements are received from upstream. The last group before end-of-stream will contain the buffered elements since the previously emitted group.
Emits when the configured time elapses since the last group has been emitted or maxNumber elements are buffered
Backpressures when downstream backpressures, and there are maxNumber + 1 buffered elements
Completes when upstream completes (emits last group)
Cancels when downstream completes
maxNumber must be positive, and duration must be greater than 0 seconds, otherwise IllegalArgumentException is thrown.
- Annotations
- @nowarn()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- def idleTimeout(timeout: Duration): SubSource[Out, Mat]
If the time between two processed elements exceeds the provided timeout, the stream is failed with a java.util.concurrent.TimeoutException. The timeout is checked periodically, so the resolution of the check is one period (equal to the timeout value).
Emits when upstream emits an element
Backpressures when downstream backpressures
Completes when upstream completes or fails if timeout elapses between two emitted elements
Cancels when downstream cancels
- Annotations
- @nowarn()
- def initialDelay(delay: Duration): SubSource[Out, Mat]
Delays the initial element by the specified duration.
Emits when upstream emits an element if the initial delay is already elapsed
Backpressures when downstream backpressures or initial delay is not yet elapsed
Completes when upstream completes
Cancels when downstream cancels
- Annotations
- @nowarn()
- def initialTimeout(timeout: Duration): SubSource[Out, Mat]
If the first element has not passed through this operator before the provided timeout, the stream is failed with a java.util.concurrent.TimeoutException.
Emits when upstream emits an element
Backpressures when downstream backpressures
Completes when upstream completes or fails if timeout elapses before first element arrives
Cancels when downstream cancels
- Annotations
- @nowarn()
- def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): SubSource[Out, Mat]
Interleave is a deterministic merge of the given Source with elements of this Source. It first emits segmentSize number of elements from this flow to downstream, then the same amount from that source, then repeats the process.
Example:
Source.from(Arrays.asList(1, 2, 3)).interleave(Source.from(Arrays.asList(4, 5, 6, 7)), 2) // 1, 2, 4, 5, 3, 6, 7
After one of the sources completes, all remaining elements are emitted from the other one.
If one of the sources gets an upstream error, the stream completes with failure.
Emits when element is available from the currently consumed upstream
Backpressures when downstream backpressures. Signal to current upstream, switch to next upstream when received segmentSize elements
Completes when this Source and given one completes
Cancels when downstream cancels
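The alternation rule can be checked against the documented example with a small model. This is a plain-Java sketch over finite lists, not the Pekko operator; InterleaveSketch is a hypothetical name, and failure handling and backpressure are not modelled.

```java
import java.util.ArrayList;
import java.util.List;

final class InterleaveSketch {
    // Alternates between the two inputs, taking up to segmentSize elements per turn.
    // Once one input is exhausted, its turns contribute nothing and the other drains.
    static <T> List<T> interleave(List<T> left, List<T> right, int segmentSize) {
        if (segmentSize <= 0) {
            throw new IllegalArgumentException("segmentSize must be positive");
        }
        List<T> out = new ArrayList<>();
        int l = 0;
        int r = 0;
        boolean takeLeft = true;
        while (l < left.size() || r < right.size()) {
            if (takeLeft) {
                int end = Math.min(l + segmentSize, left.size());
                out.addAll(left.subList(l, end));
                l = end;
            } else {
                int end = Math.min(r + segmentSize, right.size());
                out.addAll(right.subList(r, end));
                r = end;
            }
            takeLeft = !takeLeft;
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(interleave(List.of(1, 2, 3), List.of(4, 5, 6, 7), 2));
        // [1, 2, 4, 5, 3, 6, 7]
    }
}
```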
- def interleaveAll(those: List[_ <: Graph[SourceShape[Out], _]], segmentSize: Int, eagerClose: Boolean): SubSource[Out, Mat]
Interleave is a deterministic merge of the given Sources with elements of this Flow. It first emits segmentSize number of elements from this flow to downstream, then the same amount from each of those sources, then repeats the process.
If eagerClose is false and one of the upstreams completes, the elements from the other upstreams will continue passing through the interleave operator. If eagerClose is true and one of the upstreams completes, interleave will cancel the other upstreams and complete itself.
If this Flow or Source gets an upstream error, the stream completes with failure.
Emits when element is available from the currently consumed upstream
Backpressures when downstream backpressures. Signal to current upstream, switch to next upstream when received segmentSize elements
Completes when the Flow and given Source completes
Cancels when downstream cancels
- def intersperse(inject: Out): SubSource[Out, Mat]
Intersperses stream with provided element, similar to how scala.collection.immutable.List.mkString injects a separator between a List's elements.
Additionally can inject start and end marker elements to stream.
Examples:
Source<String, ?> nums = Source.from(Arrays.asList(0, 1, 2, 3)).map(Object::toString);
nums.intersperse(","); // 0 , 1 , 2 , 3
nums.intersperse("[", ",", "]"); // [ 0 , 1 , 2 , 3 ]
Emits when upstream emits (or before with the start element if provided)
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- def intersperse(start: Out, inject: Out, end: Out): SubSource[Out, Mat]
Intersperses stream with provided element, similar to how scala.collection.immutable.List.mkString injects a separator between a List's elements.
Additionally can inject start and end marker elements to stream.
Examples:
Source<String, ?> nums = Source.from(Arrays.asList(0, 1, 2, 3)).map(Object::toString);
nums.intersperse(","); // 0 , 1 , 2 , 3
nums.intersperse("[", ",", "]"); // [ 0 , 1 , 2 , 3 ]
In case you want to only prepend or only append an element (yet still use the intersperse feature to inject a separator between elements), you may want to use the following pattern instead of the 3-argument version of intersperse (see Source.concat for semantics details):
Source.single(">> ").concat(flow.intersperse(","))
flow.intersperse(",").concat(Source.single("END"))
Emits when upstream emits (or before with the start element if provided)
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
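Both intersperse variants can be modelled on a non-empty finite list. The sketch below is plain Java, not Pekko code; IntersperseSketch is a hypothetical name, and the behavior for an empty input is deliberately not asserted here.

```java
import java.util.ArrayList;
import java.util.List;

final class IntersperseSketch {
    // Places the separator between consecutive elements, never before the first or after the last.
    static <T> List<T> intersperse(List<T> in, T inject) {
        List<T> out = new ArrayList<>();
        for (int i = 0; i < in.size(); i++) {
            if (i > 0) {
                out.add(inject);
            }
            out.add(in.get(i));
        }
        return out;
    }

    // Same as above, wrapped in a start and an end marker.
    static <T> List<T> intersperse(List<T> in, T start, T inject, T end) {
        List<T> out = new ArrayList<>();
        out.add(start);
        out.addAll(intersperse(in, inject));
        out.add(end);
        return out;
    }

    public static void main(String[] args) {
        List<String> nums = List.of("0", "1", "2", "3");
        System.out.println(String.join("", intersperse(nums, ",")));           // 0,1,2,3
        System.out.println(String.join("", intersperse(nums, "[", ",", "]"))); // [0,1,2,3]
    }
}
```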
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- def keepAlive(maxIdle: Duration, injectedElem: Creator[Out]): SubSource[Out, Mat]
Injects additional elements if upstream does not emit for a configured amount of time. In other words, this operator attempts to maintain a base rate of emitted elements towards the downstream.
If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements do not accumulate during this period.
Upstream elements are always preferred over injected elements.
Emits when upstream emits an element or if the upstream was idle for the configured period
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- Annotations
- @nowarn()
- def limit(n: Int): SubSource[Out, Mat]
Ensure stream boundedness by limiting the number of elements from upstream. If the number of incoming elements exceeds n, it will signal upstream failure StreamLimitException downstream.
Due to input buffering some elements may have been requested from upstream publishers that will then not be processed downstream of this step.
The stream will be completed without producing any elements if n is zero or negative.
Emits when the specified number of elements to take has not yet been reached
Backpressures when downstream backpressures
Completes when the defined number of elements has been taken or upstream completes
Cancels when the defined number of elements has been taken or downstream cancels
See also Flow.take, Flow.takeWithin, Flow.takeWhile
- def limitWeighted(n: Long)(costFn: Function[Out, Long]): SubSource[Out, Mat]
Ensure stream boundedness by evaluating the cost of incoming elements using a cost function. Exactly how many elements will be allowed to travel downstream depends on the evaluated cost of each element. If the accumulated cost exceeds n, it will signal upstream failure StreamLimitException downstream.
Due to input buffering some elements may have been requested from upstream publishers that will then not be processed downstream of this step.
The stream will be completed without producing any elements if n is zero or negative.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when the specified number of elements to take has not yet been reached
Backpressures when downstream backpressures
Completes when the defined number of elements has been taken or upstream completes
Cancels when the defined number of elements has been taken or downstream cancels
See also Flow.take, Flow.takeWithin, Flow.takeWhile
- def log(name: String): SubSource[Out, Mat]
Logs elements flowing through the stream as well as completion and erroring.
By default element and completion signals are logged on debug level, and errors are logged on Error level. This can be adjusted according to your needs by providing a custom Attributes.LogLevels attribute on the given Flow.
Uses an internally created LoggingAdapter which uses org.apache.pekko.stream.Log as its source (use this class to configure slf4j loggers).
Emits when the mapping function returns an element
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- def log(name: String, log: LoggingAdapter): SubSource[Out, Mat]
Logs elements flowing through the stream as well as completion and erroring.
By default element and completion signals are logged on debug level, and errors are logged on Error level. This can be adjusted according to your needs by providing a custom Attributes.LogLevels attribute on the given Flow.
Uses the given LoggingAdapter for logging.
Emits when the mapping function returns an element
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- def log(name: String, extract: Function[Out, Any]): SubSource[Out, Mat]
Logs elements flowing through the stream as well as completion and erroring.
By default element and completion signals are logged on debug level, and errors are logged on Error level. This can be adjusted according to your needs by providing a custom Attributes.LogLevels attribute on the given Flow.
The extract function will be applied to each element before logging, so it is possible to log only those fields of a complex object flowing through this element.
Uses an internally created LoggingAdapter which uses org.apache.pekko.stream.Log as its source (use this class to configure slf4j loggers).
Emits when the mapping function returns an element
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- def log(name: String, extract: Function[Out, Any], log: LoggingAdapter): SubSource[Out, Mat]
Logs elements flowing through the stream as well as completion and erroring.
By default element and completion signals are logged on debug level, and errors are logged on Error level. This can be adjusted according to your needs by providing a custom Attributes.LogLevels attribute on the given Flow.
The extract function will be applied to each element before logging, so it is possible to log only those fields of a complex object flowing through this element.
Uses the given LoggingAdapter for logging.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when the mapping function returns an element
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- def logWithMarker(name: String, marker: Function[Out, LogMarker]): SubSource[Out, Mat]
Logs elements flowing through the stream as well as completion and erroring.
By default element and completion signals are logged on debug level, and errors are logged on Error level. This can be adjusted according to your needs by providing a custom Attributes.LogLevels attribute on the given Flow.
Uses an internally created MarkerLoggingAdapter which uses org.apache.pekko.stream.Log as its source (use this class to configure slf4j loggers).
Emits when the mapping function returns an element
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- def logWithMarker(name: String, marker: Function[Out, LogMarker], log: MarkerLoggingAdapter): SubSource[Out, Mat]
Logs elements flowing through the stream as well as completion and erroring.
By default element and completion signals are logged on debug level, and errors are logged on Error level. This can be adjusted according to your needs by providing a custom Attributes.LogLevels attribute on the given Flow.
Uses the given MarkerLoggingAdapter for logging.
Emits when the mapping function returns an element
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- def logWithMarker(name: String, marker: Function[Out, LogMarker], extract: Function[Out, Any]): SubSource[Out, Mat]
Logs elements flowing through the stream as well as completion and erroring.
By default element and completion signals are logged on debug level, and errors are logged on Error level. This can be adjusted according to your needs by providing a custom Attributes.LogLevels attribute on the given Flow.
The extract function will be applied to each element before logging, so it is possible to log only those fields of a complex object flowing through this element.
Uses an internally created MarkerLoggingAdapter which uses org.apache.pekko.stream.Log as its source (use this class to configure slf4j loggers).
Emits when the mapping function returns an element
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- def logWithMarker(name: String, marker: Function[Out, LogMarker], extract: Function[Out, Any], log: MarkerLoggingAdapter): SubSource[Out, Mat]
Logs elements flowing through the stream as well as completion and erroring.
By default element and completion signals are logged on debug level, and errors are logged on Error level. This can be adjusted according to your needs by providing a custom Attributes.LogLevels attribute on the given Flow.
The extract function will be applied to each element before logging, so it is possible to log only those fields of a complex object flowing through this element.
Uses the given MarkerLoggingAdapter for logging.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when the mapping function returns an element
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- def map[T](f: Function[Out, T]): SubSource[T, Mat]
Transform this stream by applying the given function to each of the elements as they pass through this processing step.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when the mapping function returns an element
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- def mapAsync[T](parallelism: Int, f: Function[Out, CompletionStage[T]]): SubSource[T, Mat]
Transform this stream by applying the given function to each of the elements as they pass through this processing step. The function returns a CompletionStage and the value of that future will be emitted downstream. The number of CompletionStages that shall run in parallel is given as the first argument to mapAsync. These CompletionStages may complete in any order, but the elements that are emitted downstream are in the same order as received from upstream.
If the function f throws an exception or if the CompletionStage is completed with failure and the supervision decision is pekko.stream.Supervision#stop the stream will be completed with failure.
If the function f throws an exception or if the CompletionStage is completed with failure and the supervision decision is pekko.stream.Supervision#resume or pekko.stream.Supervision#restart the element is dropped and the stream continues.
The function f is always invoked on the elements in the order they arrive.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when the CompletionStage returned by the provided function finishes for the next element in sequence
Backpressures when the number of CompletionStages reaches the configured parallelism and the downstream backpressures or the first CompletionStage is not completed
Completes when upstream completes and all CompletionStages have been completed and all elements have been emitted
Cancels when downstream cancels
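The ordering guarantee of mapAsync (stages may finish in any order, results are emitted in arrival order) can be illustrated with CompletableFuture alone. This is a plain-Java sketch, not Pekko code: MapAsyncOrderSketch is a hypothetical name, and it starts all stages at once, so the parallelism limit, backpressure and supervision are not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

final class MapAsyncOrderSketch {
    // Invokes f on the elements in arrival order, then collects the results in that
    // same order, regardless of which stage happens to finish first.
    static <T, R> List<R> mapAsyncOrdered(List<T> in, Function<T, CompletionStage<R>> f) {
        List<CompletionStage<R>> stages = new ArrayList<>();
        for (T elem : in) {
            stages.add(f.apply(elem));
        }
        List<R> out = new ArrayList<>();
        for (CompletionStage<R> stage : stages) {
            out.add(stage.toCompletableFuture().join());
        }
        return out;
    }

    public static void main(String[] args) {
        // Earlier elements sleep longer, so later stages tend to complete first.
        List<String> out = mapAsyncOrdered(List.of(3, 2, 1), n -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(n * 50L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "done-" + n;
        }));
        System.out.println(out); // [done-3, done-2, done-1]
    }
}
```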
- def mapAsyncUnordered[T](parallelism: Int, f: Function[Out, CompletionStage[T]]): SubSource[T, Mat]
Transform this stream by applying the given function to each of the elements as they pass through this processing step. The function returns a CompletionStage and the value of that future will be emitted downstream. The number of CompletionStages that shall run in parallel is given as the first argument to mapAsyncUnordered. Each processed element will be emitted downstream as soon as it is ready, i.e. it is possible that the elements are not emitted downstream in the same order as received from upstream.
If the function f throws an exception or if the CompletionStage is completed with failure and the supervision decision is pekko.stream.Supervision#stop the stream will be completed with failure.
If the function f throws an exception or if the CompletionStage is completed with failure and the supervision decision is pekko.stream.Supervision#resume or pekko.stream.Supervision#restart the element is dropped and the stream continues.
The function f is always invoked on the elements in the order they arrive (even though the result of the futures returned by f might be emitted in a different order).
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when any of the CompletionStages returned by the provided function completes
Backpressures when the number of CompletionStages reaches the configured parallelism and the downstream backpressures
Completes when upstream completes and all CompletionStages have been completed and all elements have been emitted
Cancels when downstream cancels
- def mapConcat[T](f: Function[Out, Iterable[T]]): SubSource[T, Mat]
Transform each input element into an Iterable of output elements that is then flattened into the output stream.
Make sure that the Iterable is immutable or at least not modified after being used as an output sequence. Otherwise the stream may fail with ConcurrentModificationException or other more subtle errors may occur.
The returned Iterable MUST NOT contain null values, as they are illegal as stream elements - according to the Reactive Streams specification.
Emits when the mapping function returns an element or there are still remaining elements from the previously calculated collection
Backpressures when downstream backpressures or there are still remaining elements from the previously calculated collection
Completes when upstream completes and all remaining elements have been emitted
Cancels when downstream cancels
- def mapError[E <: Throwable](clazz: Class[E], f: Function[E, Throwable]): SubSource[Out, Mat]
While similar to recover, this operator can be used to transform an error signal to a different one *without* logging it as an error in the process. So in that sense it is NOT exactly equivalent to recover(t => throw t2) since recover would log the t2 error.
Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. This operator can recover the failure signal, but not the skipped elements, which will be dropped.
Similarly to recover, throwing an exception inside mapError _will_ be logged.
Emits when element is available from the upstream or upstream is failed and pf returns an element
Backpressures when downstream backpressures
Completes when upstream completes or upstream failed with exception pf can handle
Cancels when downstream cancels
- def mapError(pf: PartialFunction[Throwable, Throwable]): SubSource[Out, Mat]
While similar to recover, this operator can be used to transform an error signal to a different one *without* logging it as an error in the process. So in that sense it is NOT exactly equivalent to recover(t => throw t2) since recover would log the t2 error.
Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. This operator can recover the failure signal, but not the skipped elements, which will be dropped.
Similarly to recover, throwing an exception inside mapError _will_ be logged.
Emits when element is available from the upstream or upstream is failed and pf returns an element
Backpressures when downstream backpressures
Completes when upstream completes or upstream failed with exception pf can handle
Cancels when downstream cancels
- def merge(that: Graph[SourceShape[Out], _]): SubSource[Out, Mat]
Merge the given Source to this Flow, taking elements as they arrive from input streams, picking randomly when several elements are ready.
- def mergeAll(those: List[_ <: Graph[SourceShape[Out], _]], eagerComplete: Boolean): SubSource[Out, Mat]
Merge the given Sources to the current one, taking elements as they arrive from input streams, picking randomly when several elements are ready.
Emits when one of the inputs has an element available
Backpressures when downstream backpressures
Completes when all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is false
Cancels when downstream cancels
- def mergeLatest[M](that: Graph[SourceShape[Out], M], eagerComplete: Boolean): SubSource[List[Out], Mat]
MergeLatest joins elements from N input streams into a stream of lists of size N. The i-th element in each list is the latest emitted element from the i-th input stream. MergeLatest emits a list for each element emitted from some input stream, but only after each input stream has emitted at least one element.
Emits when an element is available from some input and each input has emitted at least one element since stream start
Completes when all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true)
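The "latest value per input" rule can be sketched by replaying a fixed arrival sequence. This is a plain-Java model under stated assumptions, not the Pekko operator: MergeLatestSketch is a hypothetical name, and each event is encoded as a two-element int array {inputIndex, value} purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class MergeLatestSketch {
    // Replays events in arrival order; emits a snapshot of the latest values after
    // every event, but only once every input has produced at least one element.
    static List<List<Integer>> mergeLatest(int inputs, List<int[]> events) {
        Integer[] latest = new Integer[inputs];
        int seen = 0;
        List<List<Integer>> out = new ArrayList<>();
        for (int[] event : events) {
            int index = event[0];
            if (latest[index] == null) {
                seen++;
            }
            latest[index] = event[1];
            if (seen == inputs) {
                out.add(List.copyOf(Arrays.asList(latest)));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> events = List.of(
            new int[] {0, 1},   // input 0 emits 1: nothing emitted yet, input 1 is still silent
            new int[] {1, 10},  // input 1 emits 10
            new int[] {0, 2},   // input 0 emits 2
            new int[] {1, 20}); // input 1 emits 20
        System.out.println(mergeLatest(2, events)); // [[1, 10], [2, 10], [2, 20]]
    }
}
```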
- def mergePreferred[M](that: Graph[SourceShape[Out], M], preferred: Boolean, eagerComplete: Boolean): SubSource[Out, Mat]
Merge two sources. Prefer one source if both sources have elements ready.
Emits when one of the inputs has an element available. If multiple have elements available, prefer the 'right' one when 'preferred' is 'true', or the 'left' one when 'preferred' is 'false'.
Backpressures when downstream backpressures
Completes when all upstreams complete (this behavior is changeable to completing when any upstream completes by setting eagerComplete=true).
- def mergePrioritized[M](that: Graph[SourceShape[Out], M], leftPriority: Int, rightPriority: Int, eagerComplete: Boolean): SubSource[Out, Mat]
Merge two sources. Prefer the sources depending on the 'priority' parameters.
Emits when one of the inputs has an element available, preferring inputs based on the 'priority' parameters if both have elements available
Backpressures when downstream backpressures
Completes when both upstreams complete (this behavior is changeable to completing when any upstream completes by setting eagerComplete=true).
- def mergeSorted[M](that: Graph[SourceShape[Out], M], comp: Comparator[Out]): SubSource[Out, Mat]
Merge the given Source to this Source, taking elements as they arrive from input streams, picking always the smallest of the available elements (waiting for one element from each side to be available). This means that possible contiguity of the input streams is not exploited to avoid waiting for elements, this merge will block when one of the inputs does not have more elements (and does not complete).
Emits when all of the inputs have an element available
Backpressures when downstream backpressures
Completes when all upstreams complete
Cancels when downstream cancels
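Picking the smallest available element from each side is the classic two-way merge. The sketch below shows it in plain Java over two already-sorted finite lists; it is not the Pekko operator, MergeSortedSketch is a hypothetical name, and the waiting behavior on live streams is not modelled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class MergeSortedSketch {
    // Repeatedly compares the heads of both inputs and emits the smaller one.
    // Ties favor the left input. Whatever remains on one side is appended at the end.
    static <T> List<T> mergeSorted(List<T> left, List<T> right, Comparator<T> comp) {
        List<T> out = new ArrayList<>();
        int l = 0;
        int r = 0;
        while (l < left.size() && r < right.size()) {
            if (comp.compare(left.get(l), right.get(r)) <= 0) {
                out.add(left.get(l++));
            } else {
                out.add(right.get(r++));
            }
        }
        out.addAll(left.subList(l, left.size()));
        out.addAll(right.subList(r, right.size()));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(mergeSorted(List.of(1, 4, 6), List.of(2, 3, 5, 7), Integer::compare));
        // [1, 2, 3, 4, 5, 6, 7]
    }
}
```

The output is only globally sorted when both inputs are sorted with respect to the same comparator.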
- def mergeSubstreams(): Source[Out, Mat]
Flatten the sub-flows back into the super-source by performing a merge without parallelism limit (i.e. having an unbounded number of sub-flows active concurrently).
This is identical in effect to mergeSubstreamsWithParallelism(Integer.MAX_VALUE).
- def mergeSubstreamsWithParallelism(parallelism: Int): Source[Out, Mat]
Flatten the sub-flows back into the super-source by performing a merge with the given parallelism limit. This means that only up to parallelism substreams will be executed at any given time. Substreams that are not yet executed are also not materialized, meaning that back-pressure will be exerted at the operator that creates the substreams when the parallelism limit is reached.
- def named(name: String): SubSource[Out, Mat]
Add a name attribute to this Flow.
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- def orElse[M](secondary: Graph[SourceShape[Out], M]): SubSource[Out, Mat]
Provides a secondary source that will be consumed if this source completes without any elements passing by. As soon as the first element comes through this stream, the alternative will be cancelled.
Note that this Flow will be materialized together with the Source and just kept from producing elements by asserting back-pressure until its time comes or it gets cancelled.
On errors the operator is failed regardless of source of the error.
Emits when element is available from first stream or first stream closed without emitting any elements and an element is available from the second stream
Backpressures when downstream backpressures
Completes when the primary stream completes after emitting at least one element, when the primary stream completes without emitting and the secondary stream already has completed or when the secondary stream completes
Cancels when downstream cancels and additionally the alternative is cancelled as soon as an element passes by from this stream.
- def prefixAndTail(n: Int): SubSource[Pair[List[Out], Source[Out, NotUsed]], Mat]
Takes up to n elements from the stream (fewer than n only if the upstream completes before emitting n elements) and returns a pair containing a strict sequence of the taken elements and a stream representing the remaining elements. If n is zero or negative, then this will return a pair of an empty collection and a stream containing the whole upstream unchanged.
In case of an upstream error, depending on the current state:
- the master stream signals the error if fewer than n elements have been seen, and therefore the substream has not yet been emitted
- the tail substream signals the error after the prefix and tail have been emitted by the main stream (at that point the main stream has already completed)
Emits when the configured number of prefix elements are available. Emits this prefix, and the rest as a substream
Backpressures when downstream backpressures or substream backpressures
Completes when prefix elements have been consumed and substream has been consumed
Cancels when downstream cancels or substream cancels
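As a rough illustration of the prefix/tail split, here is a plain-Java sketch in which an `Iterator` stands in for the upstream and for the tail substream, and `Map.Entry` stands in for the `Pair`. The names `PrefixAndTailSketch` and `prefixAndTail` are hypothetical; error signalling and backpressure are not modelled.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of the prefix/tail split; not part of Pekko.
final class PrefixAndTailSketch {
    static <T> Map.Entry<List<T>, Iterator<T>> prefixAndTail(Iterator<T> upstream, int n) {
        List<T> prefix = new ArrayList<>();
        // Take at most n elements; fewer only if the upstream ends first.
        // For n <= 0 the loop body never runs, so the prefix stays empty.
        while (prefix.size() < n && upstream.hasNext()) {
            prefix.add(upstream.next());
        }
        // The not-yet-consumed remainder plays the role of the tail substream.
        return Map.entry(prefix, upstream);
    }
}
```

Calling `prefixAndTail(List.of(1, 2, 3, 4, 5).iterator(), 2)` in this model yields the strict prefix `[1, 2]` and an iterator whose next element is `3`.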
- def prepend[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat]
Prepend the given Source to this Flow, meaning that before elements are generated from this Flow, the Source's elements will be produced until it is exhausted, at which point Flow elements will start being produced.
Note that the Source is materialized together with this Flow and is "detached", meaning that it in effect behaves as a one-element buffer in front of both sources that eagerly demands an element on start (so it cannot be combined with Source.lazy to defer materialization of that).
This flow will then be kept from producing elements by asserting back-pressure until its time comes.
When needing a prepend operator that is not detached use #prependLazy
Emits when element is available from the given Source or from current stream when the Source is completed
Backpressures when downstream backpressures
Completes when this Flow completes
Cancels when downstream cancels
- def prependLazy[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat]
Prepend the given Source to this Flow, meaning that before elements are generated from this Flow, the Source's elements will be produced until it is exhausted, at which point Flow elements will start being produced.
Note that the Source is materialized together with this Flow and will then be kept from producing elements by asserting back-pressure until its time comes.
When needing a prepend operator that is also detached use #prepend
If the given Source gets upstream error - no elements from this Flow will be pulled.
Emits when element is available from the given Source or from current stream when the Source is completed
Backpressures when downstream backpressures
Completes when this Flow completes
Cancels when downstream cancels
- def recover(pf: PartialFunction[Throwable, Out]): SubSource[Out, Mat]
Recover allows sending a last element on failure and gracefully completing the stream. Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. This operator can recover the failure signal, but not the skipped elements, which will be dropped.
Emits when element is available from the upstream or upstream is failed and pf returns an element
Backpressures when downstream backpressures
Completes when upstream completes or upstream failed with exception pf can handle
Cancels when downstream cancels
- def recoverWith(pf: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]): SubSource[Out, Mat]
RecoverWith allows switching to an alternative Source on flow failure. It will stay in effect after a failure has been recovered so that each time there is a failure it is fed into the pf and a new Source may be materialized.
Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. This operator can recover the failure signal, but not the skipped elements, which will be dropped.
Emits when element is available from the upstream or upstream is failed and element is available from alternative Source
Backpressures when downstream backpressures
Completes when upstream completes or upstream failed with exception pf can handle
Cancels when downstream cancels
- def recoverWithRetries(attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]): SubSource[Out, Mat]
RecoverWithRetries allows switching to an alternative Source on flow failure. It will stay in effect after a failure has been recovered up to attempts number of times so that each time there is a failure it is fed into the pf and a new Source may be materialized. Note that if you pass in 0, this won't attempt to recover at all.
A negative attempts number is interpreted as "infinite", which results in the exact same behavior as recoverWith.
Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. This operator can recover the failure signal, but not the skipped elements, which will be dropped.
Emits when element is available from the upstream or upstream is failed and element is available from alternative Source
Backpressures when downstream backpressures
Completes when upstream completes or upstream failed with exception pf can handle
Cancels when downstream cancels
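The attempt counting described above (0 means never recover, a negative value means no limit) can be sketched in plain Java. This is only a model under stated assumptions: iterators that throw stand in for failing sources, an empty `Optional` stands in for a `pf` that is not defined for the failure, and the names `RecoverWithRetriesSketch`, `run` and `failingAfter` are hypothetical. The out-of-band nature of `onError` (and the resulting dropped elements) is not modelled.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical model of the retry bookkeeping; not part of Pekko.
final class RecoverWithRetriesSketch {
    static <T> List<T> run(Iterator<T> upstream, int attempts,
                           Function<Throwable, Optional<Iterator<T>>> pf) {
        List<T> out = new ArrayList<>();
        Iterator<T> current = upstream;
        int used = 0;
        while (true) {
            try {
                while (current.hasNext()) out.add(current.next());
                return out; // the current source completed normally
            } catch (RuntimeException failure) {
                // Negative attempts: no limit. Zero: never recover.
                boolean mayRecover = attempts < 0 || used < attempts;
                Optional<Iterator<T>> next =
                        mayRecover ? pf.apply(failure) : Optional.<Iterator<T>>empty();
                if (!next.isPresent()) throw failure; // not handled: failure propagates
                current = next.get(); // switch to the alternative source
                used++;
            }
        }
    }

    // Demo helper: yields the given values, then fails instead of completing.
    static <T> Iterator<T> failingAfter(List<T> values, RuntimeException failure) {
        Iterator<T> it = values.iterator();
        return new Iterator<T>() {
            @Override public boolean hasNext() { return true; }
            @Override public T next() {
                if (it.hasNext()) return it.next();
                throw failure;
            }
        };
    }
}
```

With `attempts = 2`, a source that fails after `1, 2`, a first alternative that fails after `3`, and a second alternative that emits `4`, the model produces `[1, 2, 3, 4]`; with `attempts = 0` the first failure propagates unchanged.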
- def reduce(f: Function2[Out, Out, Out]): SubSource[Out, Mat]
Similar to fold but uses the first element as the zero element. Applies the given function to its current and next value, yielding the next current value.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when upstream completes
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- def scan[T](zero: T)(f: Function2[T, Out, T]): SubSource[T, Mat]
Similar to fold but is not a terminal operation, emits its current value which starts at zero and then applies the current and next value to the given function f, emitting the next current value.
If the function f throws an exception and the supervision decision is pekko.stream.Supervision#restart, the current value starts at zero again and the stream will continue.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Note that the zero value must be immutable.
Emits when the function scanning the element returns a new element
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
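To make the difference from fold concrete: scan emits every intermediate value, starting with zero itself, whereas fold would emit only the final one. A minimal plain-Java sketch over a finite list follows; `ScanSketch` is a hypothetical name, and supervision and backpressure are not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical model of scan over a finite list; not part of Pekko.
final class ScanSketch {
    static <T, A> List<A> scan(List<T> upstream, A zero, BiFunction<A, T, A> f) {
        List<A> out = new ArrayList<>();
        A current = zero;
        out.add(current); // the current value starts at zero and is emitted first
        for (T element : upstream) {
            current = f.apply(current, element); // combine current and next value
            out.add(current);                    // emit the new current value
        }
        return out;
    }
}
```

For the input `1, 2, 3` with `zero = 0` and addition as `f`, this model emits four values: the zero followed by the three running sums.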
- def scanAsync[T](zero: T)(f: Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat]
Similar to scan but with an asynchronous function, emits its current value which starts at zero and then applies the current and next value to the given function f, emitting a Future that resolves to the next current value.
If the function f throws an exception and the supervision decision is pekko.stream.Supervision.Restart, the current value starts at zero again and the stream will continue.
If the function f throws an exception and the supervision decision is pekko.stream.Supervision.Resume, the current value starts at the previous current value, or zero when it doesn't have one, and the stream will continue.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Note that the zero value must be immutable.
Emits when the future returned by f completes
Backpressures when downstream backpressures
Completes when upstream completes and the last future returned by f completes
Cancels when downstream cancels
See also #scan
- def sliding(n: Int, step: Int = 1): SubSource[List[Out], Mat]
- def statefulMap[S, T](create: Creator[S], f: Function2[S, Out, Pair[S, T]], onComplete: Function[S, Optional[T]]): SubSource[T, Mat]
Transform each stream element with the help of a state.
The state creation function is invoked once when the stream is materialized and the returned state is passed to the mapping function for mapping the first element. The mapping function returns a mapped element to emit downstream and a state to pass to the next mapping function. The state can be the same for each mapping return or a new immutable state, but it is also safe to use a mutable state. The returned T MUST NOT be null as it is illegal as stream element - according to the Reactive Streams specification.
For stateless variant see map.
The onComplete function is called only once when the upstream or downstream has finished. You can do some clean-up here, and if the returned value is not empty, it will be emitted to the downstream if available, otherwise the value will be dropped.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when the mapping function returns an element and downstream is ready to consume it
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- S
the type of the state
- T
the type of the output elements
- create
a function that creates the initial state
- f
a function that transforms the upstream element and the state into a pair of next state and output element
- onComplete
a function that transforms the ongoing state into an optional output element
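The interplay of create, f and onComplete can be sketched in plain Java over a finite list. This is an illustrative model only: `StatefulMapSketch` is a hypothetical name, `Supplier`, `BiFunction` and `Map.Entry` stand in for Pekko's `Creator`, `Function2` and `Pair`, and supervision and early downstream cancellation are not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical model of statefulMap over a finite list; not part of Pekko.
final class StatefulMapSketch {
    static <S, In, T> List<T> statefulMap(
            List<In> upstream,
            Supplier<S> create,
            BiFunction<S, In, Map.Entry<S, T>> f,
            Function<S, Optional<T>> onComplete) {
        List<T> out = new ArrayList<>();
        S state = create.get(); // invoked once, when the "stream" starts
        for (In element : upstream) {
            Map.Entry<S, T> next = f.apply(state, element);
            state = next.getKey();    // state handed to the next invocation of f
            out.add(next.getValue()); // mapped element goes downstream
        }
        // Called once at the end; a non-empty result becomes one final element.
        onComplete.apply(state).ifPresent(out::add);
        return out;
    }
}
```

A typical use in this model is a running index: start with state `0`, map each element `s` to `s + index` while incrementing the state, and emit a final summary element from `onComplete`.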
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def take(n: Long): SubSource[Out, Mat]
Terminate processing (and cancel the upstream publisher) after the given number of elements. Due to input buffering some elements may have been requested from upstream publishers that will then not be processed downstream of this step.
The stream will be completed without producing any elements if n is zero or negative.
Emits when the specified number of elements to take has not yet been reached
Backpressures when downstream backpressures
Completes when the defined number of elements has been taken or upstream completes
Cancels when the defined number of elements has been taken or downstream cancels
- def takeWhile(p: Predicate[Out], inclusive: Boolean): SubSource[Out, Mat]
Terminate processing (and cancel the upstream publisher) after predicate returns false for the first time, including the first failed element iff inclusive is true. Due to input buffering some elements may have been requested from upstream publishers that will then not be processed downstream of this step.
The stream will be completed without producing any elements if predicate is false for the first stream element.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when the predicate is true
Backpressures when downstream backpressures
Completes when predicate returned false (or 1 after predicate returns false if inclusive) or upstream completes
Cancels when predicate returned false or downstream cancels
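The effect of the inclusive flag is easiest to see on a small input. The following plain-Java sketch models it over a finite list; `TakeWhileSketch` is a hypothetical name, and upstream cancellation is represented simply by leaving the loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of takeWhile with the inclusive flag; not part of Pekko.
final class TakeWhileSketch {
    static <T> List<T> takeWhile(List<T> upstream, Predicate<T> p, boolean inclusive) {
        List<T> out = new ArrayList<>();
        for (T element : upstream) {
            if (p.test(element)) {
                out.add(element);
            } else {
                if (inclusive) out.add(element); // keep the first failing element
                break; // complete, and stop pulling from upstream
            }
        }
        return out;
    }
}
```

For the input `1, 2, 5, 1` and the predicate `x < 3`, the non-inclusive variant stops after `2`, while the inclusive variant also emits the `5` that made the predicate fail. In both cases the trailing `1` is never seen.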
- def takeWhile(p: Predicate[Out]): SubSource[Out, Mat]
Terminate processing (and cancel the upstream publisher) after predicate returns false for the first time. Due to input buffering some elements may have been requested from upstream publishers that will then not be processed downstream of this step.
The stream will be completed without producing any elements if predicate is false for the first stream element.
Emits when the predicate is true
Backpressures when downstream backpressures
Completes when predicate returned false (or 1 after predicate returns false if inclusive) or upstream completes
Cancels when predicate returned false or downstream cancels
- def takeWithin(duration: Duration): SubSource[Out, Mat]
Terminate processing (and cancel the upstream publisher) after the given duration. Due to input buffering some elements may have been requested from upstream publishers that will then not be processed downstream of this step.
Note that this can be combined with #take to limit the number of elements within the duration.
Emits when an upstream element arrives
Backpressures when downstream backpressures
Completes when upstream completes or timer fires
Cancels when downstream cancels or timer fires
- Annotations
- @nowarn()
- def throttle(cost: Int, per: Duration, maximumBurst: Int, costCalculation: Function[Out, Integer], mode: ThrottleMode): SubSource[Out, Mat]
Sends elements downstream with speed limited to cost/per. The cost is calculated for each element individually by calling the costCalculation function. This operator works for streams where elements have different cost (length), for example streams of ByteString.
Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). Tokens drop into the bucket at a given rate and can be saved for later use up to the bucket capacity to allow some burstiness. Whenever the stream wants to send an element, it takes as many tokens from the bucket as the element costs. If there aren't enough, throttle waits until the bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally to their cost minus available tokens, meeting the target rate. The bucket is full when the stream has just materialized and started.
Parameter mode manages behavior when upstream is faster than throttle rate:
- pekko.stream.ThrottleMode.Shaping makes pauses before emitting messages to meet throttle rate
- pekko.stream.ThrottleMode.Enforcing fails with exception when upstream is faster than throttle rate. Enforcing cannot emit elements that cost more than the maximumBurst
It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
WARNING: Be aware that throttle uses the scheduler to slow down the stream. This scheduler has a minimal time for triggering the next push. Consequently it will slow down the stream as it has a minimal pause for emitting. This can happen when burst is 0 and speed is higher than 30 events per second. You need to increase the maximumBurst if elements arrive with a small interval (30 milliseconds or less). Use the overloaded throttle method without the maximumBurst parameter to automatically calculate the maximumBurst based on the given rate (cost/per). In other words the throttler always enforces the rate limit when the maximumBurst parameter is given, but in certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
Emits when upstream emits an element and configured time per each element elapsed
Backpressures when downstream backpressures or the incoming rate is higher than the speed limit
Completes when upstream completes
Cancels when downstream cancels
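The token bucket model can be made concrete with a small deterministic simulation. The sketch below is a simplified plain-Java model, not Pekko's implementation: `TokenBucketSketch` and `offer` are hypothetical names, time is passed in explicitly as milliseconds instead of being read from a clock, and `per` is assumed to be a whole multiple of `cost` so that integer arithmetic stays exact. It corresponds to Shaping mode (the caller is told how long to wait); an Enforcing variant would fail instead of returning a non-zero delay.

```java
// Hypothetical, simplified model of the token bucket; not part of Pekko.
final class TokenBucketSketch {
    private final long capacity;       // plays the role of maximumBurst
    private final long millisPerToken; // per / cost: one token arrives this often
    private long tokens;               // may go negative: tokens owed by a delayed element
    private long lastUpdate;           // time at which the most recent token arrived

    TokenBucketSketch(int cost, long perMillis, int maximumBurst, long startMillis) {
        if (cost <= 0 || perMillis <= 0 || perMillis % cost != 0) {
            throw new IllegalArgumentException("sketch assumes per is a positive multiple of cost");
        }
        this.capacity = maximumBurst;
        this.millisPerToken = perMillis / cost;
        this.tokens = maximumBurst; // the bucket is full when the stream starts
        this.lastUpdate = startMillis;
    }

    /** Reserves tokens for one element; returns the wait in ms (0 means emit now). */
    long offer(int elementCost, long nowMillis) {
        long arrived = (nowMillis - lastUpdate) / millisPerToken;
        lastUpdate += arrived * millisPerToken;
        tokens = Math.min(capacity, tokens + arrived); // saved tokens never exceed capacity
        tokens -= elementCost;
        if (tokens >= 0) return 0;
        // Not enough tokens: wait until the deficit has been refilled.
        return -tokens * millisPerToken - (nowMillis - lastUpdate);
    }
}
```

With a rate of 1 token per 100 ms and a burst of 2, the first two elements offered at time 0 pass immediately and the third must wait 100 ms. After a long idle period the bucket refills only up to its capacity, so an element of cost 5 is delayed in proportion to its cost minus the available tokens.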
- def throttle(cost: Int, per: Duration, costCalculation: Function[Out, Integer]): SubSource[Out, Mat]
Sends elements downstream with speed limited to cost/per. The cost is calculated for each element individually by calling the costCalculation function. This operator works for streams where elements have different cost (length), for example streams of ByteString.
Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size). Tokens drop into the bucket at a given rate and can be saved for later use up to the bucket capacity to allow some burstiness. Whenever the stream wants to send an element, it takes as many tokens from the bucket as the element costs. If there aren't enough, throttle waits until the bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally to their cost minus available tokens, meeting the target rate. The bucket is full when the stream has just materialized and started.
The burst size is calculated based on the given rate (cost/per) as 0.1 * rate, for example:
- rate < 20/second => burst size 1
- rate 20/second => burst size 2
- rate 100/second => burst size 10
- rate 200/second => burst size 20
The throttle mode is pekko.stream.ThrottleMode.Shaping, which makes pauses before emitting messages to meet throttle rate.
Emits when upstream emits an element and configured time per each element elapsed
Backpressures when downstream backpressures or the incoming rate is higher than the speed limit
Completes when upstream completes
Cancels when downstream cancels
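The automatic burst size rule (0.1 * rate, with a minimum of 1) can be read as "the number of tokens that arrive in 100 ms". The sketch below computes it that way in plain Java. The class and method names are hypothetical, and the rounding (truncate, then clamp to at least 1) is an assumption chosen to reproduce the examples listed above rather than a statement about Pekko's internals.

```java
import java.time.Duration;

// Hypothetical helper reproducing the documented burst size examples; not part of Pekko.
final class AutoBurstSketch {
    static long burstSize(int cost, Duration per) {
        // Tokens arriving in 100 ms = cost * (100 ms / per), in exact integer arithmetic.
        long tokensPer100Millis = cost * 100_000_000L / per.toNanos();
        // Assumed rounding: truncate, but never go below a burst of 1.
        return Math.max(1L, tokensPer100Millis);
    }
}
```

For example, `burstSize(100, Duration.ofSeconds(1))` gives 10, and any rate below 20 per second gives 1.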
- def throttle(elements: Int, per: Duration, maximumBurst: Int, mode: ThrottleMode): SubSource[Out, Mat]
Sends elements downstream with speed limited to elements/per. In other words, this operator sets the maximum rate for emitting messages. This operator works for streams where all elements have the same cost or length.
Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). Tokens drop into the bucket at a given rate and can be saved for later use up to the bucket capacity to allow some burstiness. Whenever the stream wants to send an element, it takes as many tokens from the bucket as the element costs. If there aren't enough, throttle waits until the bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally to their cost minus available tokens, meeting the target rate. The bucket is full when the stream has just materialized and started.
Parameter mode manages behavior when upstream is faster than throttle rate:
- pekko.stream.ThrottleMode.Shaping makes pauses before emitting messages to meet throttle rate
- pekko.stream.ThrottleMode.Enforcing fails with exception when upstream is faster than throttle rate
It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
WARNING: Be aware that throttle uses the scheduler to slow down the stream. This scheduler has a minimal time for triggering the next push. Consequently it will slow down the stream as it has a minimal pause for emitting. This can happen when burst is 0 and speed is higher than 30 events per second. You need to increase the maximumBurst if elements arrive with a small interval (30 milliseconds or less). Use the overloaded throttle method without the maximumBurst parameter to automatically calculate the maximumBurst based on the given rate (cost/per). In other words the throttler always enforces the rate limit when the maximumBurst parameter is given, but in certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
Emits when upstream emits an element and configured time per each element elapsed
Backpressures when downstream backpressures or the incoming rate is higher than the speed limit
Completes when upstream completes
Cancels when downstream cancels
- def throttle(elements: Int, per: Duration): SubSource[Out, Mat]
Sends elements downstream with speed limited to elements/per. In other words, this operator sets the maximum rate for emitting messages. This operator works for streams where all elements have the same cost or length.
Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size). Tokens drop into the bucket at a given rate and can be saved for later use up to the bucket capacity to allow some burstiness. Whenever the stream wants to send an element, it takes as many tokens from the bucket as the element costs. If there aren't enough, throttle waits until the bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally to their cost minus available tokens, meeting the target rate. The bucket is full when the stream has just materialized and started.
The burst size is calculated based on the given rate (cost/per) as 0.1 * rate, for example:
- rate < 20/second => burst size 1
- rate 20/second => burst size 2
- rate 100/second => burst size 10
- rate 200/second => burst size 20
The throttle mode is pekko.stream.ThrottleMode.Shaping, which makes pauses before emitting messages to meet throttle rate.
Emits when upstream emits an element and configured time per each element elapsed
Backpressures when downstream backpressures or the incoming rate is higher than the speed limit
Completes when upstream completes
Cancels when downstream cancels
- def to(sink: Graph[SinkShape[Out], _]): RunnableGraph[Mat]
Connect this SubSource to a Sink, concatenating the processing steps of both. This means that all sub-flows that result from the previous sub-stream operator will be attached to the given sink.
+----------------------------+
| Resulting RunnableGraph    |
|                            |
|  +------+        +------+  |
|  |      |        |      |  |
|  | this | ~Out~> | sink |  |
|  |      |        |      |  |
|  +------+        +------+  |
+----------------------------+
- def toString(): String
- Definition Classes
- AnyRef → Any
- def via[T, M](flow: Graph[FlowShape[Out, T], M]): SubSource[T, Mat]
Transform this SubSource by appending the given processing steps.
+----------------------------+
| Resulting Source           |
|                            |
|  +------+        +------+  |
|  |      |        |      |  |
|  | this | ~Out~> | flow | ~~> T
|  |      |        |      |  |
|  +------+        +------+  |
+----------------------------+
The materialized value of the combined Flow will be the materialized value of the current flow (ignoring the other Flow’s value), use viaMat if a different strategy is needed.
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- def wireTap(that: Graph[SinkShape[Out], _]): SubSource[Out, Mat]
Attaches the given Sink to this Flow as a wire tap, meaning that elements that pass through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow. If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
It is similar to #alsoTo which does backpressure instead of dropping elements.
Emits when element is available and demand exists from the downstream; the element will also be sent to the wire-tap Sink if there is demand.
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- def wireTap(f: Procedure[Out]): SubSource[Out, Mat]
This is a simplified version of wireTap(Sink) that takes only a simple procedure. Elements will be passed into this "side channel" function, and any of its results will be ignored.
If the wire-tap operation is slow (it backpressures), elements that would've been sent to it will be dropped instead. It is similar to #alsoTo which does backpressure instead of dropping elements.
This operation is useful for inspecting the passed through element, usually by means of side-effecting operations (such as println, or emitting metrics), for each element without having to modify it.
For logging signals (elements, completion, error) consider using the log operator instead, along with appropriate ActorAttributes.logLevels.
Emits when upstream emits an element; the same element will be passed to the attached function, as well as to the downstream operator
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- def withAttributes(attr: Attributes): SubSource[Out, Mat]
Change the attributes of this Source to the given ones and seal the list of attributes. This means that further calls will not be able to remove these attributes, but instead add new ones. Note that this operation has no effect on an empty Flow (because the attributes apply only to the contained processing operators).
- def zip[T](source: Graph[SourceShape[T], _]): SubSource[Pair[Out, T], Mat]
Combine the elements of current Flow and the given Source into a stream of tuples.
- def zipAll[U, A >: Out](that: Graph[SourceShape[U], _], thisElem: A, thatElem: U): SubSource[Pair[A, U], Mat]
Combine the elements of current flow and the given Source into a stream of tuples.
Emits when both inputs emit at first, and then as long as any input emits (coupled to the default value of the completed input).
Backpressures when downstream backpressures
Completes when all upstreams complete
Cancels when downstream cancels
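The padding behavior (the completed input is replaced by its default value while the other input keeps emitting) can be sketched over finite lists. This is a plain-Java model with hypothetical names: `ZipAllSketch` is not part of Pekko, and `Map.Entry` stands in for `Pair`. It is meant for inputs that each emit at least one element; the case where one input is empty from the start is not modelled faithfully.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of zipAll padding over finite lists; not part of Pekko.
final class ZipAllSketch {
    static <A, U> List<Map.Entry<A, U>> zipAll(List<A> left, List<U> right, A thisElem, U thatElem) {
        List<Map.Entry<A, U>> out = new ArrayList<>();
        int n = Math.max(left.size(), right.size());
        for (int i = 0; i < n; i++) {
            // Once a side has completed, its default value is used instead.
            A a = i < left.size() ? left.get(i) : thisElem;
            U u = i < right.size() ? right.get(i) : thatElem;
            out.add(Map.entry(a, u));
        }
        return out;
    }
}
```

Zipping `1, 2, 3` with the single element `"a"` and defaults `0` and `"-"` pairs `1` with `"a"` and then pads the shorter side, so `2` and `3` are each paired with `"-"`.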
- def zipLatest[T](source: Graph[SourceShape[T], _]): SubSource[Pair[Out, T], Mat]
Combine the elements of current Flow and the given Source into a stream of tuples, picking always the latest element of each.
Emits when all of the inputs have at least an element available, and then each time an element becomes available on either of the inputs
Backpressures when downstream backpressures
Completes when any upstream completes
Cancels when downstream cancels
- def zipLatestWith[Out2, Out3](that: Graph[SourceShape[Out2], _], combine: Function2[Out, Out2, Out3]): SubSource[Out3, Mat]
Put together the elements of current Flow and the given Source into a stream of combined elements using a combiner function, picking always the latest element of each.
Emits when all of the inputs have at least an element available, and then each time an element becomes available on either of the inputs
Backpressures when downstream backpressures
Completes when any upstream completes
Cancels when downstream cancels
- def zipWith[Out2, Out3](that: Graph[SourceShape[Out2], _], combine: Function2[Out, Out2, Out3]): SubSource[Out3, Mat]
Put together the elements of current Flow and the given Source into a stream of combined elements using a combiner function.
- def zipWithIndex: SubSource[Pair[Out, Long], Mat]
Combine the elements of current Source into a stream of tuples consisting of all elements paired with their index. Indices start at 0.
Emits when upstream emits an element and it is paired with its index
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
Deprecated Value Members
- def backpressureTimeout(timeout: FiniteDuration): SubSource[Out, Mat]
If the time between the emission of an element and the following downstream demand exceeds the provided timeout, the stream is failed with a java.util.concurrent.TimeoutException. The timeout is checked periodically, so the resolution of the check is one period (equals to timeout value).
Emits when upstream emits an element
Backpressures when downstream backpressures
Completes when upstream completes or fails if timeout elapses between element emission and downstream demand.
Cancels when downstream cancels
- Annotations
- @deprecated
- Deprecated
(Since version Akka 2.5.12) Use the overloaded one which accepts java.time.Duration instead.
- def completionTimeout(timeout: FiniteDuration): SubSource[Out, Mat]
If the completion of the stream does not happen until the provided timeout, the stream is failed with a java.util.concurrent.TimeoutException.
Emits when upstream emits an element
Backpressures when downstream backpressures
Completes when upstream completes or fails if timeout elapses before upstream completes
Cancels when downstream cancels
- Annotations
- @deprecated
- Deprecated
(Since version Akka 2.5.12) Use the overloaded one which accepts java.time.Duration instead.
- def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): SubSource[Out, Mat]
Shifts element emission in time by a specified amount. It stores elements in an internal buffer while waiting for the next element to be emitted. Depending on the defined pekko.stream.DelayOverflowStrategy, it might drop elements or backpressure the upstream if there is no space available in the buffer.
Delay precision is 10ms to avoid unnecessary timer scheduling cycles.
The internal buffer has a default capacity of 16. You can set the buffer size by calling withAttributes(inputBuffer).
Emits when there is a pending element in the buffer and the configured time for this element has elapsed
- EmitEarly - the strategy does not wait to emit an element if the buffer is full
Backpressures when, depending on the OverflowStrategy:
- Backpressure - backpressures when the buffer is full
- DropHead, DropTail, DropBuffer - never backpressures
- Fail - fails the stream if the buffer gets full
Completes when upstream completes and buffered elements have been drained
Cancels when downstream cancels
- of
time to shift all messages
- strategy
Strategy that is used when incoming elements cannot fit inside the buffer
- Annotations
- @deprecated
- Deprecated
(Since version Akka 2.5.12) Use the overloaded one which accepts java.time.Duration instead.
- def dropWithin(duration: FiniteDuration): SubSource[Out, Mat]
Discard the elements received within the given duration at beginning of the stream.
Emits when the specified time elapsed and a new upstream element arrives
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- Annotations
- @deprecated
- Deprecated
(Since version Akka 2.5.12) Use the overloaded one which accepts java.time.Duration instead.
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable]) @Deprecated @Deprecated
- Deprecated
- def formatted(fmtstr: String): String
- Implicit
- This member is added by an implicit conversion from SubSource[Out, Mat] to StringFormat[SubSource[Out, Mat]] performed by method StringFormat in scala.Predef.
- Definition Classes
- StringFormat
- Annotations
- @deprecated @inline()
- Deprecated
(Since version 2.12.16) Use formatString.format(value) instead of value.formatted(formatString), or use the f"" string interpolator. In Java 15 and later, formatted resolves to the new method in String which has reversed parameters.
- def groupedWeightedWithin(maxWeight: Long, costFn: Function[Out, Long], duration: FiniteDuration): SubSource[List[Out], Mat]
Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements, whichever happens first. Empty groups will not be emitted if no elements are received from upstream. The last group before end-of-stream will contain the buffered elements since the previously emitted group.
Emits when the configured time elapses since the last group has been emitted or the weight limit is reached
Backpressures when downstream backpressures, and the buffered group (+ pending element) weighs more than maxWeight
Completes when upstream completes (emits last group)
Cancels when downstream completes
maxWeight must be positive, and duration must be greater than 0 seconds, otherwise IllegalArgumentException is thrown.
- Annotations
- @deprecated
- Deprecated
(Since version Akka 2.5.12) Use the overloaded one which accepts java.time.Duration instead.
- def groupedWithin(maxNumber: Int, duration: FiniteDuration): SubSource[List[Out], Mat]
Chunk up this stream into groups of elements received within a time window, or limited by the given number of elements, whichever happens first. Empty groups will not be emitted if no elements are received from upstream. The last group before end-of-stream will contain the buffered elements since the previously emitted group.
Emits when the configured time elapses since the last group has been emitted or n elements are buffered
Backpressures when downstream backpressures, and there are n+1 buffered elements
Completes when upstream completes (emits last group)
Cancels when downstream completes
maxNumber must be positive, and duration must be greater than 0 seconds, otherwise IllegalArgumentException is thrown.
- Annotations
- @deprecated
- Deprecated
(Since version Akka 2.5.12) Use the overloaded one which accepts java.time.Duration instead.
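The "count limit or time window, whichever comes first" rule of groupedWithin can be sketched as a replay over timestamped arrivals. This is a simplified plain-Java model and not the operator's implementation: the class `GroupedWithinSketch`, the use of millisecond timestamps starting at 0, and the choice to evaluate the window only when an element arrives or the input ends are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: groups elements by count or by elapsed time window.
final class GroupedWithinSketch {
    static List<List<String>> group(long[] arrivalMillis, String[] elements,
                                    int maxNumber, long windowMillis) {
        if (maxNumber <= 0 || windowMillis <= 0) {
            throw new IllegalArgumentException("maxNumber and windowMillis must be positive");
        }
        List<List<String>> groups = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        long windowStart = 0; // assumed stream start time
        for (int i = 0; i < elements.length; i++) {
            long now = arrivalMillis[i];
            // Close every window that elapsed before this arrival; empty windows emit nothing.
            while (now >= windowStart + windowMillis) {
                if (!buffer.isEmpty()) {
                    groups.add(buffer);
                    buffer = new ArrayList<>();
                }
                windowStart += windowMillis;
            }
            buffer.add(elements[i]);
            // The count limit closes the group early and restarts the window.
            if (buffer.size() == maxNumber) {
                groups.add(buffer);
                buffer = new ArrayList<>();
                windowStart = now;
            }
        }
        // End of input: the last group carries whatever is still buffered.
        if (!buffer.isEmpty()) {
            groups.add(buffer);
        }
        return groups;
    }

    public static void main(String[] args) {
        long[] times = {0, 10, 20, 30, 150, 160};
        String[] elems = {"a", "b", "c", "d", "e", "f"};
        System.out.println(group(times, elems, 3, 100)); // prints [[a, b, c], [d], [e, f]]
    }
}
```

Here the first group is closed by the count limit, the second by the elapsed window, and the third by the end of the input.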
- def idleTimeout(timeout: FiniteDuration): SubSource[Out, Mat]
If the time between two processed elements exceeds the provided timeout, the stream is failed with a java.util.concurrent.TimeoutException. The timeout is checked periodically, so the resolution of the check is one period (equal to the timeout value).
Emits when upstream emits an element
Backpressures when downstream backpressures
Completes when upstream completes or fails if timeout elapses between two emitted elements
Cancels when downstream cancels
- Annotations
- @deprecated
- Deprecated
(Since version Akka 2.5.12) Use the overloaded one which accepts java.time.Duration instead.
- def initialDelay(delay: FiniteDuration): SubSource[Out, Mat]
Delays the initial element by the specified duration.
Emits when upstream emits an element if the initial delay is already elapsed
Backpressures when downstream backpressures or initial delay is not yet elapsed
Completes when upstream completes
Cancels when downstream cancels
- Annotations
- @deprecated
- Deprecated
(Since version Akka 2.5.12) Use the overloaded one which accepts java.time.Duration instead.
- def initialTimeout(timeout: FiniteDuration): SubSource[Out, Mat]
If the first element has not passed through this operator before the provided timeout, the stream is failed with a java.util.concurrent.TimeoutException.
Emits when upstream emits an element
Backpressures when downstream backpressures
Completes when upstream completes or fails if timeout elapses before first element arrives
Cancels when downstream cancels
- Annotations
- @deprecated
- Deprecated
(Since version Akka 2.5.12) Use the overloaded one which accepts java.time.Duration instead.
- def keepAlive(maxIdle: FiniteDuration, injectedElem: Creator[Out]): SubSource[Out, Mat]
Injects additional elements if upstream does not emit for a configured amount of time. In other words, this operator attempts to maintain a base rate of emitted elements towards the downstream.
If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements do not accumulate during this period.
Upstream elements are always preferred over injected elements.
Emits when upstream emits an element or if the upstream was idle for the configured period
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels
- Annotations
- @deprecated
- Deprecated
(Since version Akka 2.5.12) Use the overloaded one which accepts java.time.Duration instead.
- def statefulMapConcat[T](f: Creator[Function[Out, Iterable[T]]]): SubSource[T, Mat]
Transform each input element into an Iterable of output elements that is then flattened into the output stream. The transformation is meant to be stateful, which is enabled by creating the transformation function anew for every materialization: the returned function will typically close over mutable objects to store state between invocations. For the stateless variant see #mapConcat.
Make sure that the Iterable is immutable or at least not modified after being used as an output sequence. Otherwise the stream may fail with ConcurrentModificationException or other more subtle errors may occur.
The returned Iterable MUST NOT contain null values, as they are illegal as stream elements according to the Reactive Streams specification.
This operator doesn't handle upstream's completion signal since the state kept in the closure can be lost. Use FlowOps.statefulMap instead.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when the mapping function returns an element or there are still remaining elements from the previously calculated collection
Backpressures when downstream backpressures or there are still remaining elements from the previously calculated collection
Completes when upstream completes and all remaining elements have been emitted
Cancels when downstream cancels
- Annotations
- @deprecated
- Deprecated
(Since version 1.0.2) Use statefulMap with mapConcat instead.
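The "function created anew for every materialization" pattern described for statefulMapConcat can be shown in plain Java. This is only a sketch of the pattern: `java.util.function.Supplier` stands in for the `Creator` parameter, and the class `StatefulMapConcatSketch` with its `numbering` and `run` methods are hypothetical names, with `run` acting as a stand-in for one materialization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration of a factory that yields a fresh stateful function per run.
final class StatefulMapConcatSketch {
    /** Each call to get() on the result returns a new function with its own counter. */
    static Supplier<Function<String, Iterable<String>>> numbering() {
        return () -> {
            int[] seen = {0}; // mutable state captured by the returned function
            return element -> {
                seen[0]++;
                // List.of is immutable, so the output cannot be modified after it is handed over.
                return List.of(seen[0] + ":" + element);
            };
        };
    }

    /** Stand-in for one materialization: obtain a fresh function, then flatten its results. */
    static List<String> run(Supplier<Function<String, Iterable<String>>> factory, List<String> input) {
        Function<String, Iterable<String>> f = factory.get();
        List<String> out = new ArrayList<>();
        for (String element : input) {
            for (String produced : f.apply(element)) {
                out.add(produced);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Supplier<Function<String, Iterable<String>>> factory = numbering();
        System.out.println(run(factory, List.of("a", "b"))); // prints [1:a, 2:b]
        System.out.println(run(factory, List.of("c")));      // prints [1:c]
    }
}
```

The second run starts counting from 1 again because it received its own function instance, which is the reason the factory indirection exists.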
- def takeWithin(duration: FiniteDuration): SubSource[Out, Mat]
Terminate processing (and cancel the upstream publisher) after the given duration. Due to input buffering some elements may have been requested from upstream publishers that will then not be processed downstream of this step.
Note that this can be combined with #take to limit the number of elements within the duration.
Emits when an upstream element arrives
Backpressures when downstream backpressures
Completes when upstream completes or timer fires
Cancels when downstream cancels or timer fires
- Annotations
- @deprecated
- Deprecated
(Since version Akka 2.5.12) Use the overloaded one which accepts java.time.Duration instead.
- def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int, costCalculation: Function[Out, Integer], mode: ThrottleMode): SubSource[Out, Mat]
Sends elements downstream with speed limited to cost/per. Cost is calculated for each element individually by calling the costCalculation function. This operator works for streams where elements have different costs (lengths), for example streams of ByteString.
Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). Tokens drop into the bucket at a given rate and can be saved for later use up to the bucket capacity to allow some burstiness. Whenever the stream wants to send an element, it takes as many tokens from the bucket as the element costs. If there aren't enough, throttle waits until the bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally to their cost minus available tokens, meeting the target rate. The bucket is full when the stream has just been materialized and started.
Parameter mode manages behavior when upstream is faster than the throttle rate:
- pekko.stream.ThrottleMode.Shaping makes pauses before emitting messages to meet the throttle rate
- pekko.stream.ThrottleMode.Enforcing fails with an exception when upstream is faster than the throttle rate. Enforcing cannot emit elements that cost more than the maximumBurst
It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
WARNING: Be aware that throttle uses the scheduler to slow down the stream. This scheduler has a minimal time for triggering the next push. Consequently it will slow down the stream as it has a minimal pause for emitting. This can happen when the burst is 0 and the speed is higher than 30 events per second. You need to increase the maximumBurst if elements arrive with a small interval (30 milliseconds or less). Use the overloaded throttle method without the maximumBurst parameter to automatically calculate the maximumBurst based on the given rate (cost/per). In other words the throttler always enforces the rate limit when the maximumBurst parameter is given, but in certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
Emits when upstream emits an element and the configured time per each element has elapsed
Backpressures when downstream backpressures or the incoming rate is higher than the speed limit
Completes when upstream completes
Cancels when downstream cancels
- Annotations
- @deprecated
- Deprecated
(Since version Akka 2.5.12) Use the overloaded one which accepts java.time.Duration instead.
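The token bucket model that throttle is described as implementing can be sketched in a few lines of plain Java. This is an illustrative model under stated assumptions, not the library's code: the class `TokenBucketSketch`, its `offer` method, and the use of caller-supplied millisecond timestamps are hypothetical; `capacity` plays the role of maximumBurst and `millisPerToken` the role of per divided by cost.

```java
// Hypothetical token bucket: callers pass the current time explicitly so the
// behaviour is deterministic and easy to trace.
final class TokenBucketSketch {
    private final long capacity;       // corresponds to maximumBurst
    private final long millisPerToken; // time needed for one token to drop in
    private long available;            // tokens currently in the bucket
    private long lastUpdate;           // time up to which tokens have been accounted for

    TokenBucketSketch(long capacity, long millisPerToken, long now) {
        this.capacity = capacity;
        this.millisPerToken = millisPerToken;
        this.available = capacity; // the bucket is full when the stream starts
        this.lastUpdate = now;
    }

    /** Takes cost tokens and returns how long the caller has to wait (0 means send now). */
    long offer(long now, long cost) {
        long elapsed = now - lastUpdate;
        if (elapsed > 0) { // lastUpdate lies in the future while an earlier debt is being paid
            long arrived = elapsed / millisPerToken;
            if (available + arrived >= capacity) {
                available = capacity; // tokens beyond the capacity are discarded
                lastUpdate = now;
            } else {
                available += arrived;
                lastUpdate += arrived * millisPerToken;
            }
        }
        if (cost <= available) {
            available -= cost;
            return 0;
        }
        // Not enough tokens: wait for the missing ones, crediting partially elapsed time.
        long missing = cost - available;
        long wait = missing * millisPerToken - (now - lastUpdate);
        available = 0;
        lastUpdate = now + wait;
        return wait;
    }

    public static void main(String[] args) {
        TokenBucketSketch bucket = new TokenBucketSketch(2, 100, 0);
        System.out.println(bucket.offer(0, 1)); // prints 0 (burst token)
        System.out.println(bucket.offer(0, 1)); // prints 0 (burst token)
        System.out.println(bucket.offer(0, 1)); // prints 100 (bucket empty, wait for one token)
    }
}
```

In terms of the two modes described above, a shaping consumer would pause for the returned wait, while an enforcing consumer would treat any non-zero wait as a failure.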
- def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): SubSource[Out, Mat]
Sends elements downstream with speed limited to elements/per. In other words, this operator sets the maximum rate for emitting messages. This operator works for streams where all elements have the same cost or length.
Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). Tokens drop into the bucket at a given rate and can be saved for later use up to the bucket capacity to allow some burstiness. Whenever the stream wants to send an element, it takes as many tokens from the bucket as the element costs. If there aren't enough, throttle waits until the bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally to their cost minus available tokens, meeting the target rate. The bucket is full when the stream has just been materialized and started.
Parameter mode manages behavior when upstream is faster than the throttle rate:
- pekko.stream.ThrottleMode.Shaping makes pauses before emitting messages to meet the throttle rate
- pekko.stream.ThrottleMode.Enforcing fails with an exception when upstream is faster than the throttle rate
It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
WARNING: Be aware that throttle uses the scheduler to slow down the stream. This scheduler has a minimal time for triggering the next push. Consequently it will slow down the stream as it has a minimal pause for emitting. This can happen when the burst is 0 and the speed is higher than 30 events per second. You need to increase the maximumBurst if elements arrive with a small interval (30 milliseconds or less). Use the overloaded throttle method without the maximumBurst parameter to automatically calculate the maximumBurst based on the given rate (cost/per). In other words the throttler always enforces the rate limit when the maximumBurst parameter is given, but in certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
Emits when upstream emits an element and the configured time per each element has elapsed
Backpressures when downstream backpressures or the incoming rate is higher than the speed limit
Completes when upstream completes
Cancels when downstream cancels
- Annotations
- @deprecated
- Deprecated
(Since version Akka 2.5.12) Use the overloaded one which accepts java.time.Duration instead.
- def throttleEven(cost: Int, per: Duration, costCalculation: (Out) => Int, mode: ThrottleMode): SubSource[Out, Mat]
This is a simplified version of throttle that spreads events evenly across the given time interval.
Use this operator when you just need to slow down a stream without worrying about the exact amount of time between events.
If you want to be sure that no time interval has more than the specified number of events, you need to use throttle with the maximumBurst parameter.
- Annotations
- @deprecated
- Deprecated
(Since version Akka 2.5.12) Use throttle without the maximumBurst parameter instead.
- See also
- def throttleEven(cost: Int, per: FiniteDuration, costCalculation: (Out) => Int, mode: ThrottleMode): SubSource[Out, Mat]
This is a simplified version of throttle that spreads events evenly across the given time interval.
Use this operator when you just need to slow down a stream without worrying about the exact amount of time between events.
If you want to be sure that no time interval has more than the specified number of events, you need to use throttle with the maximumBurst parameter.
- Annotations
- @deprecated
- Deprecated
(Since version Akka 2.5.12) Use throttle without the maximumBurst parameter instead.
- See also
- def throttleEven(elements: Int, per: Duration, mode: ThrottleMode): SubSource[Out, Mat]
This is a simplified version of throttle that spreads events evenly across the given time interval.
Use this operator when you just need to slow down a stream without worrying about the exact amount of time between events.
If you want to be sure that no time interval has more than the specified number of events, you need to use throttle with the maximumBurst parameter.
- Annotations
- @deprecated
- Deprecated
(Since version Akka 2.5.12) Use throttle without the maximumBurst parameter instead.
- See also
- def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): SubSource[Out, Mat]
This is a simplified version of throttle that spreads events evenly across the given time interval.
Use this operator when you just need to slow down a stream without worrying about the exact amount of time between events.
If you want to be sure that no time interval has more than the specified number of events, you need to use throttle with the maximumBurst parameter.
- Annotations
- @deprecated
- Deprecated
(Since version Akka 2.5.12) Use throttle without the maximumBurst parameter instead.
- See also
- def →[B](y: B): (SubSource[Out, Mat], B)
- Implicit
- This member is added by an implicit conversion from SubSource[Out, Mat] to ArrowAssoc[SubSource[Out, Mat]] performed by method ArrowAssoc in scala.Predef.
- Definition Classes
- ArrowAssoc
- Annotations
- @deprecated
- Deprecated
(Since version 2.13.0) Use -> instead. If you still wish to display it as one character, consider using a font with programming ligatures such as Fira Code.