flatMapMerge
Transform each input element into a Source whose elements are then flattened into the output stream through merging.
Nesting and flattening operators
Signature
Flow.flatMapMerge
Description
Transform each input element into a Source whose elements are then flattened into the output stream through merging. The maximum number of Sources merged at the same time (the breadth) has to be specified. When this limit is reached, flatMapMerge does not request any more elements from upstream, which means that it backpressures until one of the active Sources completes. The order of elements within each Source is preserved, but there is no deterministic order between elements from different active Sources.
See also: flatMapConcat, mapConcat
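The breadth limit and the ordering guarantee described above can be modelled in plain Java without any streaming library. The sketch below is only an illustration under stated assumptions and is not how flatMapMerge is implemented: it runs synchronously and takes elements from the active sub-sequences in round-robin order, whereas the real operator interleaves them non-deterministically. The names BreadthLimitedMerge, flatMapMergeModel and events are hypothetical and exist only for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of breadth-limited merging; not part of any library.
class BreadthLimitedMerge {

    // Merges the sub-sequences produced by `f`, keeping at most `breadth`
    // of them active at any time.
    static <A, B> List<B> flatMapMergeModel(
            List<A> upstream, int breadth, Function<A, List<B>> f) {
        if (breadth < 1) {
            throw new IllegalArgumentException("breadth must be at least 1");
        }
        Iterator<A> pending = upstream.iterator();
        Deque<Iterator<B>> active = new ArrayDeque<>();
        List<B> out = new ArrayList<>();

        while (pending.hasNext() || !active.isEmpty()) {
            // Pull from upstream only while a slot is free. A full set of
            // active sub-sequences models backpressure towards upstream.
            while (active.size() < breadth && pending.hasNext()) {
                active.addLast(f.apply(pending.next()).iterator());
            }
            // Take one element from the sub-sequence at the head, then
            // rotate it to the back (round-robin is an assumption of this model).
            Iterator<B> current = active.pollFirst();
            if (current.hasNext()) {
                out.add(current.next());
                active.addLast(current);
            }
            // An exhausted sub-sequence is not re-added, which frees a slot.
        }
        return out;
    }

    // Stand-in for a per-customer lookup such as a database query.
    static List<String> events(String customerId) {
        return Arrays.asList(customerId + "-evt-1", customerId + "-evt-2");
    }

    public static void main(String[] args) {
        List<String> customers =
            Arrays.asList("customer-1", "customer-2", "customer-3");
        // With breadth 2, customer-3 only starts once an earlier
        // sub-sequence has completed.
        flatMapMergeModel(customers, 2, BreadthLimitedMerge::events)
            .forEach(System.out::println);
        // prints
        // customer-1-evt-1
        // customer-2-evt-1
        // customer-1-evt-2
        // customer-2-evt-2
        // customer-3-evt-1
        // customer-3-evt-2
    }
}
```

In this model, events of one customer always appear in their original order, and a breadth of 1 degenerates to plain concatenation, which is the ordering that flatMapConcat provides.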
Example
In the following example, flatMapMerge is used to create a Source for each incoming customerId. Such a Source could, for example, be a calculation or a database query. Up to breadth Sources (10 in this example) can be active at any given time, so events for different customers can interleave in any order, but events for the same customer arrive in the order emitted by the underlying Source:
- Scala

    val source: Source[String, NotUsed] = Source(List("customer-1", "customer-2"))

    // e.g. could be a query to a database
    def lookupCustomerEvents(customerId: String): Source[String, NotUsed] = {
      Source(List(s"$customerId-evt-1", s"$customerId-evt-2"))
    }

    source.flatMapMerge(10, customerId => lookupCustomerEvents(customerId)).runForeach(println)

    // prints - events from different customers could interleave
    // customer-1-evt-1
    // customer-2-evt-1
    // customer-1-evt-2
    // customer-2-evt-2

- Java

    // e.g. could be a query to a database
    private Source<String, NotUsed> lookupCustomerEvents(String customerId) {
      return Source.from(Arrays.asList(customerId + "-evt-1", customerId + "-evt-2"));
    }

    Source.from(Arrays.asList("customer-1", "customer-2"))
        .flatMapMerge(10, this::lookupCustomerEvents)
        .runForeach(System.out::println, system);

    // prints - events from different customers could interleave
    // customer-1-evt-1
    // customer-2-evt-1
    // customer-1-evt-2
    // customer-2-evt-2
Reactive Streams semantics
emits when one of the currently consumed substreams has an element available
backpressures when downstream backpressures or the max number of substreams is reached
completes when upstream completes and all consumed substreams complete