collect
Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream.
Signature
Source.collect
Source.collect
Flow.collect
Flow.collect
Description
Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream. This can often replace filter
followed by map
to achieve the same in one single operator.
collect
is supposed to be used with PFBuilder
PFBuilder
to construct the partial function. There is also a collectType that often can be easier to use than the PFBuilder
and then combine with ordinary filter
and map
operators.
Example
Given stream element classes Message
, Ping
, and Pong
, where Ping
extends Message
and Pong
is an unrelated class.
- Scala
-
source
trait Message final case class Ping(id: Int) extends Message final case class Pong(id: Int)
- Java
-
source
static interface Message {} static class Ping implements Message { final int id; Ping(int id) { this.id = id; } } static class Pong { final int id; Pong(int id) { this.id = id; } }
From a stream of Message
elements we would like to collect all elements of type Ping
that have an id != 0
, and then covert to Pong
with same id.
- Scala
-
source
val flow: Flow[Message, Pong, NotUsed] = Flow[Message].collect { case Ping(id) if id != 0 => Pong(id) }
- Java
-
source
Flow<Message, Pong, NotUsed> flow = Flow.of(Message.class) .collect( PFBuilder.<Message, Pong>create() .match(Ping.class, p -> p.id != 0, p -> new Pong(p.id)) .build());
An alternative is to use collectType
. The same conversion be written as follows, and it is as efficient.
- Java
-
source
Flow<Message, Pong, NotUsed> flow = Flow.of(Message.class) .collectType(Ping.class) .filter(p -> p.id != 0) .map(p -> new Pong(p.id));
Reactive Streams semantics
emits when the provided partial function is defined for the element
backpressures the partial function is defined for the element and downstream backpressures
completes when upstream completes