Operators

Source operators

These built-in sources are available from org.apache.pekko.stream.scaladsl.Source org.apache.pekko.stream.javadsl.Source:

Operator Description
Source asSourceWithContext Extracts context data from the elements of a Source so that it can be turned into a SourceWithContext which can propagate that context per element along a stream.
Source asSubscriber Integration with Reactive Streams, materializes into a Subscriber.
Source combine Combine several sources, using a given strategy such as merge or concat, into one source.
Source completionStage Send the single value of the CompletionStage when it completes and there is demand.
Source completionStageSource Streams the elements of an asynchronous source once its given completion operator completes.
Source cycle Stream iterator in cycled manner.
Source empty Complete right away without ever emitting any elements.
Source failed Fail directly with a user specified exception.
Source applyfrom Stream the values of an immutable.SeqIterable.
Source fromCompletionStage Deprecated by Source.completionStage.
Source fromFuture Deprecated by Source.future.
Source fromFutureSource Deprecated by Source.futureSource.
Source fromIterator Stream the values from an Iterator, requesting the next value when there is demand.
Source fromJavaStream Stream the values from a Java 8 Stream, requesting the next value when there is demand.
Source fromPublisher Integration with Reactive Streams, subscribes to a Publisher.
Source fromSourceCompletionStage Deprecated by Source.completionStageSource.
Source future Send the single value of the Future when it completes and there is demand.
Source futureSource Streams the elements of the given future source once it successfully completes.
Source lazily Deprecated by Source.lazySource.
Source lazilyAsync Deprecated by Source.lazyFutureSource.
Source lazyCompletionStage Defers creation of a future of a single element source until there is demand.
Source lazyCompletionStageSource Defers creation of a future source until there is demand.
Source lazyFuture Defers creation of a future of a single element source until there is demand.
Source lazyFutureSource Defers creation and materialization of a Source until there is demand.
Source lazySingle Defers creation of a single element source until there is demand.
Source lazySource Defers creation and materialization of a Source until there is demand.
Source maybe Create a source that emits once the materialized Promise CompletableFuture is completed with a value.
Source never Never emit any elements, never complete and never fail.
Source queue Materialize a BoundedSourceQueue or SourceQueue onto which elements can be pushed for emitting from the source.
Source range Emit each integer in a range, with an option to take bigger steps than 1.
Source repeat Stream a single object repeatedly.
Source single Stream a single object once.
Source tick A periodical repetition of an arbitrary object.
Source unfold Stream the result of a function as long as it returns a Some non empty Optional.
Source unfoldAsync Just like unfold but the fold function returns a Future CompletionStage.
Source unfoldResource Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source.
Source unfoldResourceAsync Wrap any resource that can be opened, queried for next element and closed in an asynchronous way.
Source zipN Combine the elements of multiple sources into a source of sequences of value.
Source zipWithN Combine the elements of multiple streams into a stream of sequences using a combiner function.

Sink operators

These built-in sinks are available from org.apache.pekko.stream.scaladsl.Sink org.apache.pekko.stream.javadsl.Sink:

Operator Description
Sink asPublisher Integration with Reactive Streams, materializes into a org.reactivestreams.Publisher.
Sink cancelled Immediately cancel the stream
Sink collect Collect all input elements using a Java Collector.
Sink collection Collect all values emitted from the stream into a collection.Operator only available in the Scala API. The closest operator in the Java API is Sink.seq.
Sink combine Combine several sinks into one using a user specified strategy
Sink completionStageSink Streams the elements to the given future sink once it successfully completes.
Sink fold Fold over emitted element with a function, where each invocation will get the new element and the result from the previous fold invocation.
Sink foreach Invoke a given procedure for each element received.
Sink foreachAsync Invoke a given procedure asynchronously for each element received.
Sink foreachParallel Like foreach but allows up to parallellism procedure calls to happen in parallel.
Sink fromMaterializer Defer the creation of a Sink until materialization and access Materializer and Attributes
Sink fromSubscriber Integration with Reactive Streams, wraps a org.reactivestreams.Subscriber as a sink.
Sink futureSink Streams the elements to the given future sink once it successfully completes.
Sink head Materializes into a Future CompletionStage which completes with the first value arriving, after this the stream is canceled.
Sink headOption Materializes into a Future[Option[T]] CompletionStage<Optional<T>> which completes with the first value arriving wrapped in Some Optional, or a None an empty Optional if the stream completes without any elements emitted.
Sink ignore Consume all elements but discards them.
Sink last Materializes into a Future CompletionStage which will complete with the last value emitted when the stream completes.
Sink lastOption Materialize a Future[Option[T]] CompletionStage<Optional<T>> which completes with the last value emitted wrapped in an Some Optional when the stream completes.
Sink lazyCompletionStageSink Defers creation and materialization of a Sink until there is a first element.
Sink lazyFutureSink Defers creation and materialization of a Sink until there is a first element.
Sink lazyInitAsync Deprecated by Sink.lazyFutureSink.
Sink lazySink Defers creation and materialization of a Sink until there is a first element.
Sink never Always backpressure never cancel and never consume any elements from the stream.
Sink onComplete Invoke a callback when the stream has completed or failed.
Sink preMaterialize Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can be consume elements ‘into’ the pre-materialized one.
Sink queue Materialize a SinkQueue that can be pulled to trigger demand through the sink.
Sink reduce Apply a reduction function on the incoming elements and pass the result to the next invocation.
Sink seq Collect values emitted from the stream into a collection.
Sink setup Defer the creation of a Sink until materialization and access ActorMaterializer and Attributes
Sink takeLast Collect the last n values emitted from the stream into a collection.

Additional Sink and Source converters

Sources and sinks for integrating with java.io.InputStream and java.io.OutputStream can be found on StreamConverters. As they are blocking APIs the implementations of these operators are run on a separate dispatcher configured through the pekko.stream.blocking-io-dispatcher.

Warning

Be aware that asInputStream and asOutputStream materialize InputStream and OutputStream respectively as blocking API implementation. They will block the thread until data will be available from upstream. Because of blocking nature these objects cannot be used in mapMaterializeValue section as it causes deadlock of the stream materialization process. For example, following snippet will fall with timeout exception:

...
.toMat(StreamConverters.asInputStream().mapMaterializedValue { inputStream =>
        inputStream.read()  // this could block forever
        ...
}).run()
Operator Description
StreamConverters asInputStream Create a sink which materializes into an InputStream that can be read to trigger demand through the sink.
StreamConverters asJavaStream Create a sink which materializes into Java 8 Stream that can be run to trigger demand through the sink.
StreamConverters asOutputStream Create a source that materializes into an OutputStream.
StreamConverters fromInputStream Create a source that wraps an InputStream.
StreamConverters fromJavaStream Create a source that wraps a Java 8 java.util.stream.Stream.
StreamConverters fromOutputStream Create a sink that wraps an OutputStream.
StreamConverters javaCollector Create a sink which materializes into a Future CompletionStage which will be completed with a result of the Java 8 Collector transformation and reduction operations.
StreamConverters javaCollectorParallelUnordered Create a sink which materializes into a Future CompletionStage which will be completed with a result of the Java 8 Collector transformation and reduction operations.

File IO Sinks and Sources

Sources and sinks for reading and writing files can be found on FileIO.

Operator Description
FileIO fromFile Emits the contents of a file.
FileIO fromPath Emits the contents of a file from the given path.
FileIO toFile Create a sink which will write incoming ByteString s to a given file.
FileIO toPath Create a sink which will write incoming ByteString s to a given file path.

Simple operators

These operators can transform the rate of incoming elements since there are operators that emit multiple elements for a single input (e.g. mapConcat) or consume multiple elements before emitting one output (e.g. filter). However, these rate transformations are data-driven, i.e. it is the incoming elements that define how the rate is affected. This is in contrast with detached operators which can change their processing behavior depending on being backpressured by downstream or not.

Operator Description
Flow asFlowWithContext Extracts context data from the elements of a Flow so that it can be turned into a FlowWithContext which can propagate that context per element along a stream.
Source/Flow collect Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream.
Source/Flow collectType Transform this stream by testing the type of each of the elements on which the element is an instance of the provided type as they pass through this processing step.
Flow completionStageFlow Streams the elements through the given future flow once it successfully completes.
Source/Flow detach Detach upstream demand from downstream demand without detaching the stream rates.
Source/Flow drop Drop n elements and then pass any subsequent element downstream.
Source/Flow dropWhile Drop elements as long as a predicate function return true for the element
Source/Flow filter Filter the incoming elements using a predicate.
Source/Flow filterNot Filter the incoming elements using a predicate.
Flow flattenOptional Collect the value of Optional from all the elements passing through this flow , empty Optional is filtered out.
Source/Flow fold Start with current value zero and then apply the current and next value to the given function. When upstream completes, the current value is emitted downstream.
Source/Flow foldAsync Just like fold but receives a function that results in a Future CompletionStage to the next value.
Source/Flow fromMaterializer Defer the creation of a Source/Flow until materialization and access Materializer and Attributes
Flow futureFlow Streams the elements through the given future flow once it successfully completes.
Source/Flow grouped Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.
Source/Flow groupedWeighted Accumulate incoming events until the combined weight of elements is greater than or equal to the minimum weight and then pass the collection of elements downstream.
Source/Flow intersperse Intersperse stream with provided element similar to List.mkString.
Flow lazyCompletionStageFlow Defers creation and materialization of a Flow until there is a first element.
Flow lazyFlow Defers creation and materialization of a Flow until there is a first element.
Flow lazyFutureFlow Defers creation and materialization of a Flow until there is a first element.
Flow lazyInitAsync Deprecated by Flow.lazyFutureFlow in combination with prefixAndTail.
Source/Flow limit Limit number of element from upstream to given max number.
Source/Flow limitWeighted Limit the total weight of incoming elements
Source/Flow log Log elements flowing through the stream as well as completion and erroring.
Source/Flow logWithMarker Log elements flowing through the stream as well as completion and erroring.
Source/Flow map Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.
Source/Flow mapConcat Transform each element into zero or more elements that are individually passed downstream.
Source/Flow preMaterialize Materializes this Graph, immediately returning (1) its materialized value, and (2) a new pre-materialized Graph.
Source/Flow reduce Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.
Source/Flow scan Emit its current value, which starts at zero, and then apply the current and next value to the given function, emitting the next current value.
Source/Flow scanAsync Just like scan but receives a function that results in a Future CompletionStage to the next value.
Source/Flow setup Defer the creation of a Source/Flow until materialization and access Materializer and Attributes
Source/Flow sliding Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.
Source/Flow statefulMap Transform each stream element with the help of a state.
Source/Flow statefulMapConcat Transform each element into zero or more elements that are individually passed downstream.
Source/Flow take Pass n incoming elements downstream and then complete
Source/Flow takeWhile Pass elements downstream as long as a predicate function returns true and then complete.
Source/Flow throttle Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where a function has to be provided to calculate the individual cost of each element.

Flow operators composed of Sinks and Sources

Operator Description
Flow fromSinkAndSource Creates a Flow from a Sink and a Source where the Flow’s input will be sent to the Sink and the Flow ’s output will come from the Source.
Flow fromSinkAndSourceCoupled Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow between them.

Asynchronous operators

These operators encapsulate an asynchronous computation, properly handling backpressure while taking care of the asynchronous operation at the same time (usually handling the completion of a Future CompletionStage).

Operator Description
Source/Flow mapAsync Pass incoming elements to a function that return a Future CompletionStage result.
Source/Flow mapAsyncUnordered Like mapAsync but Future CompletionStage results are passed downstream as they arrive regardless of the order of the elements that triggered them.

Timer driven operators

These operators process elements using timers, delaying, dropping or grouping elements for certain time durations.

Operator Description
Source/Flow delay Delay every element passed through with a specific duration.
Source/Flow delayWith Delay every element passed through with a duration that can be controlled dynamically.
Source/Flow dropWithin Drop elements until a timeout has fired
Source/Flow groupedWeightedWithin Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements, whatever happens first.
Source/Flow groupedWithin Chunk up this stream into groups of elements received within a time window, or limited by the number of the elements, whatever happens first.
Source/Flow initialDelay Delays the initial element by the specified duration.
Source/Flow takeWithin Pass elements downstream within a timeout and then complete.

Backpressure aware operators

These operators are aware of the backpressure provided by their downstreams and able to adapt their behavior to that signal.

Operator Description
Source/Flow aggregateWithBoundary Aggregate and emit until custom boundary condition met.
Source/Flow batch Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure and a maximum number of batched elements is not yet reached.
Source/Flow batchWeighted Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure and a maximum weight batched elements is not yet reached.
Source/Flow buffer Allow for a temporarily faster upstream events by buffering size elements.
Source/Flow conflate Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure.
Source/Flow conflateWithSeed Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure.
Source/Flow expand Like extrapolate, but does not have the initial argument, and the Iterator is also used in lieu of the original element, allowing for it to be rewritten and/or filtered.
Source/Flow extrapolate Allow for a faster downstream by expanding the last emitted element to an Iterator.

Nesting and flattening operators

These operators either take a stream and turn it into a stream of streams (nesting) or they take a stream that contains nested streams and turn them into a stream of elements instead (flattening).

See the Substreams page for more detail and code samples.

Operator Description
Source/Flow flatMapConcat Transform each input element into a Source whose elements are then flattened into the output stream through concatenation.
Source/Flow flatMapMerge Transform each input element into a Source whose elements are then flattened into the output stream through merging.
Source/Flow flatMapPrefix Use the first n elements from the stream to determine how to process the rest.
Source/Flow groupBy Demultiplex the incoming stream into separate output streams.
Source/Flow prefixAndTail Take up to n elements from the stream (less than n only if the upstream completes before emitting n elements) and returns a pair containing a strict sequence of the taken element and a stream representing the remaining elements.
Source/Flow splitAfter End the current substream whenever a predicate returns true, starting a new substream for the next element.
Source/Flow splitWhen Split off elements into a new substream whenever a predicate function return true.

Time aware operators

Those operators operate taking time into consideration.

Operator Description
Source/Flow backpressureTimeout If the time between the emission of an element and the following downstream demand exceeds the provided timeout, the stream is failed with a TimeoutException.
Source/Flow completionTimeout If the completion of the stream does not happen until the provided timeout, the stream is failed with a TimeoutException.
Source/Flow idleTimeout If the time between two processed elements exceeds the provided timeout, the stream is failed with a TimeoutException.
Source/Flow initialTimeout If the first element has not passed through this operators before the provided timeout, the stream is failed with a TimeoutException.
Source/Flow keepAlive Injects additional (configured) elements if upstream does not emit for a configured amount of time.

Fan-in operators

These operators take multiple streams as their input and provide a single output combining the elements from all of the inputs in different ways.

Operator Description
MergeSequence Merge a linear sequence partitioned across multiple sources.
Source/Flow concat After completion of the original upstream the elements of the given source will be emitted.
Source/Flow concatAllLazy After completion of the original upstream the elements of the given sources will be emitted sequentially.
Source/Flow concatLazy After completion of the original upstream the elements of the given source will be emitted.
Source/Flow interleave Emits a specifiable number of elements from the original source, then from the provided source and repeats.
Source/Flow interleaveAll Emits a specifiable number of elements from the original source, then from the provided sources and repeats.
Source/Flow merge Merge multiple sources.
Source/Flow mergeAll Merge multiple sources.
Source/Flow mergeLatest Merge multiple sources.
Source/Flow mergePreferred Merge multiple sources.
Source/Flow mergePrioritized Merge multiple sources.
Source mergePrioritizedN Merge multiple sources with priorities.
Source/Flow mergeSorted Merge multiple sources.
Source/Flow orElse If the primary source completes without emitting any elements, the elements from the secondary source are emitted.
Source/Flow prepend Prepends the given source to the flow, consuming it until completion before the original source is consumed.
Source/Flow prependLazy Prepends the given source to the flow, consuming it until completion before the original source is consumed.
Source/Flow zip Combines elements from each of multiple sources into tuples Pair and passes the tuples pairs downstream.
Source/Flow zipAll Combines elements from two sources into tuples Pair handling early completion of either source.
Source/Flow zipLatest Combines elements from each of multiple sources into tuples Pair and passes the tuples pairs downstream, picking always the latest element of each.
Source/Flow zipLatestWith Combines elements from multiple sources through a combine function and passes the returned value downstream, picking always the latest element of each.
Source/Flow zipWith Combines elements from multiple sources through a combine function and passes the returned value downstream.
Source/Flow zipWithIndex Zips elements of current flow with its indices.

Fan-out operators

These have one input and multiple outputs. They might route the elements between different outputs, or emit elements on multiple outputs at the same time.

There is a number of fan-out operators for which currently no ‘fluent’ is API available. To use those you will have to use the Graph DSL.

Operator Description
Balance Fan-out the stream to several streams.
Broadcast Emit each incoming element each of n outputs.
Partition Fan-out the stream to several streams.
Unzip Takes a stream of two element tuples and unzips the two elements ino two different downstreams.
UnzipWith Splits each element of input into multiple downstreams using a function
Source/Flow alsoTo Attaches the given Sink to this Flow, meaning that elements that pass through this Flow will also be sent to the Sink.
Source/Flow alsoToAll Attaches the given SourceSources to this FlowFlow, meaning that elements that pass through this FlowFlow will also be sent to all those SinkSinks.
Source/Flow divertTo Each upstream element will either be diverted to the given sink, or the downstream consumer according to the predicate function applied to the element.
Source/Flow wireTap Attaches the given Sink to this Flow as a wire tap, meaning that elements that pass through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.

Watching status operators

Operator Description
Source/Flow monitor Materializes to a FlowMonitor that monitors messages flowing through or completion of the operators.
Source/Flow watchTermination Materializes to a Future CompletionStage that will be completed with Done or failed depending whether the upstream of the operators has been completed or failed.

Actor interop operators

Operators meant for inter-operating between Pekko Streams and Actors:

Operator Description
Source actorRef Materialize an ActorRef of the classic actors API; sending messages to it will emit them on the stream.
Sink actorRef Send the elements from the stream to an ActorRef of the classic actors API.
ActorSource actorRef Materialize an ActorRef<T>ActorRef[T] of the new actors API; sending messages to it will emit them on the stream only if they are of the same type as the stream.
ActorSink actorRef Sends the elements of the stream to the given ActorRef<T>ActorRef[T] of the new actors API, without considering backpressure.
Source actorRefWithBackpressure Materialize an ActorRef of the classic actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.
Sink actorRefWithBackpressure Send the elements from the stream to an ActorRef (of the classic actors API) which must then acknowledge reception after completing a message, to provide back pressure onto the sink.
ActorSource actorRefWithBackpressure Materialize an ActorRef<T>ActorRef[T] of the new actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.
ActorSink actorRefWithBackpressure Sends the elements of the stream to the given ActorRef<T>ActorRef[T] of the new actors API with backpressure, to be able to signal demand when the actor is ready to receive more elements.
Source/Flow ask Use the “Ask Pattern” to send a request-reply message to the target ref actor (of the classic actors API).
ActorFlow ask Use the “Ask Pattern” to send each stream element as an ask to the target actor (of the new actors API), and expect a reply that will be emitted downstream.
ActorFlow askWithContext Use the “Ask Pattern” to send each stream element (without the context) as an ask to the target actor (of the new actors API), and expect a reply that will be emitted downstream.
ActorFlow askWithStatus Use the “Ask Pattern” to send each stream element as an ask to the target actor (of the new actors API), and expect a reply of Type StatusReply[T]StatusReply<T> where the T will be unwrapped and emitted downstream.
ActorFlow askWithStatusAndContext Use the “Ask Pattern” to send each stream element (without the context) as an ask to the target actor (of the new actors API), and expect a reply of Type StatusReply[T]StatusReply<T> where the T will be unwrapped and emitted downstream.
PubSub sink A sink that will publish emitted messages to a TopicTopic.
PubSub source A source that will subscribe to a TopicTopic and stream messages published to the topic.
Source/Flow watch Watch a specific ActorRef and signal a failure downstream once the actor terminates.

Compression operators

Flow operators to (de)compress.

Operator Description
Compression deflate Creates a flow that deflate-compresses a stream of ByteStrings.
Compression gunzip Creates a flow that gzip-decompresses a stream of ByteStrings.
Compression gzip Creates a flow that gzip-compresses a stream of ByteStrings.
Compression inflate Creates a flow that deflate-decompresses a stream of ByteStrings.

Error handling

For more background see the Error Handling in Streams section.

Operator Description
Source/Flow mapError While similar to recover this operators can be used to transform an error signal to a different one without logging it as an error in the process.
RestartSource onFailuresWithBackoff Wrap the given SourceSource with a SourceSource that will restart it when it fails using an exponential backoff. Notice that this SourceSource will not restart on completion of the wrapped flow.
RestartFlow onFailuresWithBackoff Wrap the given FlowFlow with a FlowFlow that will restart it when it fails using an exponential backoff. Notice that this FlowFlow will not restart on completion of the wrapped flow.
Source/Flow recover Allow sending of one last element downstream when a failure has happened upstream.
Source/Flow recoverWith Allow switching to alternative Source when a failure has happened upstream.
Source/Flow recoverWithRetries RecoverWithRetries allows to switch to alternative Source on flow failure.
RestartSource withBackoff Wrap the given SourceSource with a SourceSource that will restart it when it fails or completes using an exponential backoff.
RestartFlow withBackoff Wrap the given FlowFlow with a FlowFlow that will restart it when it fails or complete using an exponential backoff.
RestartSink withBackoff Wrap the given SinkSink with a SinkSink that will restart it when it fails or complete using an exponential backoff.
RetryFlow withBackoff Wrap the given FlowFlow and retry individual elements in that stream with an exponential backoff. A decider function tests every emitted element and can return a new element to be sent to the wrapped flow for another try.
RetryFlow withBackoffAndContext Wrap the given FlowWithContextFlowWithContext and retry individual elements in that stream with an exponential backoff. A decider function tests every emitted element and can return a new element to be sent to the wrapped flow for another try.