collect
Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream.
Signature¶
Description¶
Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream. This can often replace filter
followed by map
to achieve the same in one single operator.
Example¶
Given stream element classes Message
, Ping
, and Pong
, where Ping
extends Message
and Pong
is an unrelated class.
sourcetrait Message
final case class Ping(id: Int) extends Message
final case class Pong(id: Int)
sourcestatic interface Message {}
static class Ping implements Message {
final int id;
Ping(int id) {
this.id = id;
}
}
static class Pong {
final int id;
Pong(int id) {
this.id = id;
}
}
From a stream of Message
elements we would like to collect all elements of type Ping
that have an id != 0
, and then covert to Pong
with same id.
sourceval flow: Flow[Message, Pong, NotUsed] =
Flow[Message].collect {
case Ping(id) if id != 0 => Pong(id)
}
sourceFlow<Message, Pong, NotUsed> flow =
Flow.of(Message.class)
.collect(
PFBuilder.<Message, Pong>create()
.match(Ping.class, p -> p.id != 0, p -> new Pong(p.id))
.build());
Reactive Streams semantics¶
emits when the provided partial function is defined for the element
backpressures the partial function is defined for the element and downstream backpressures
completes when upstream completes