flatMapConcat
Transform each input element into a Source whose elements are then flattened into the output stream through concatenation.
Nesting and flattening operators
Signature
Description
Transform each input element into a Source whose elements are then flattened into the output stream through concatenation. This means each source is fully consumed before consumption of the next source starts.
See also: flatMapMerge, mapConcat
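The ordering guarantee described above can be modelled with plain JDK streams. The sketch below is only an analogy: it uses java.util.stream rather than the streaming library itself, it runs synchronously with no backpressure, and the class name ConcatOrderSketch, the trace method, and the "open" log markers are hypothetical names introduced for illustration. It records when each inner stream is created, to show that the second one is not opened until the first has been drained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Plain-JDK model of concatenation ordering; not the streaming library's implementation.
class ConcatOrderSketch {

    // Hypothetical stand-in for lookupCustomerEvents: logs the moment the inner
    // stream is created, then yields two events for the given customer.
    static Stream<String> lookupCustomerEvents(String customerId, List<String> log) {
        log.add("open " + customerId);
        return Stream.of(customerId + "-event-1", customerId + "-event-2");
    }

    // Returns "open" markers and emitted events in the order they occurred.
    static List<String> trace(List<String> customerIds) {
        List<String> log = new ArrayList<>();
        customerIds.stream()                               // sequential stream
            .flatMap(id -> lookupCustomerEvents(id, log))  // one inner stream per element
            .forEach(log::add);                            // record each flattened element
        return log;
    }

    public static void main(String[] args) {
        trace(Arrays.asList("customer-1", "customer-2")).forEach(System.out::println);
        // prints:
        // open customer-1
        // customer-1-event-1
        // customer-1-event-2
        // open customer-2
        // customer-2-event-1
        // customer-2-event-2
    }
}
```

In this model, "open customer-2" appears only after both events of customer-1, which is the one-substream-at-a-time behaviour that concatenation implies.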
Example
In the following example, flatMapConcat is used to create a Source for each incoming customerId. This could be, for example, a calculation or a query to a database. Each customer ID is passed to lookupCustomerEvents, which returns a Source of that customer's events. All the events for one customer are delivered before moving on to the next customer.
val source: Source[String, NotUsed] = Source(List("customer-1", "customer-2"))

// e.g. could be a query to a database
def lookupCustomerEvents(customerId: String): Source[String, NotUsed] = {
  Source(List(s"$customerId-event-1", s"$customerId-event-2"))
}

source.flatMapConcat(customerId => lookupCustomerEvents(customerId)).runForeach(println)
// prints - events from each customer consecutively
// customer-1-event-1
// customer-1-event-2
// customer-2-event-1
// customer-2-event-2

// e.g. could be a query to a database
private Source<String, NotUsed> lookupCustomerEvents(String customerId) {
  return Source.from(Arrays.asList(customerId + "-event-1", customerId + "-event-2"));
}

Source.from(Arrays.asList("customer-1", "customer-2"))
    .flatMapConcat(this::lookupCustomerEvents)
    .runForeach(System.out::println, system);
// prints - events from each customer consecutively
// customer-1-event-1
// customer-1-event-2
// customer-2-event-1
// customer-2-event-2
Reactive Streams semantics
emits when the current consumed substream has an element available
backpressures when downstream backpressures
completes when upstream completes and all consumed substreams complete