Sink.takeLast
Collect the last n
values emitted from the stream into a collection.
Signature¶
Description¶
Materializes into a Future
of immutable.Seq[T]
containing the last n
collected elements when the stream completes. If the stream completes before signaling at least n elements, the Future
will complete with the number of elements taken at that point. If the stream never completes, the Future
will never complete. If there is a failure signaled in the stream the Future
will be completed with failure.
Example¶
sourcecase class Student(name: String, gpa: Double)
val students = List(
Student("Alison", 4.7),
Student("Adrian", 3.1),
Student("Alexis", 4),
Student("Benita", 2.1),
Student("Kendra", 4.2),
Student("Jerrie", 4.3)).sortBy(_.gpa)
val sourceOfStudents = Source(students)
val result: Future[Seq[Student]] = sourceOfStudents.runWith(Sink.takeLast(3))
result.foreach { topThree =>
println("#### Top students ####")
topThree.reverse.foreach { s =>
println(s"Name: ${s.name}, GPA: ${s.gpa}")
}
}
/*
#### Top students ####
Name: Alison, GPA: 4.7
Name: Jerrie, GPA: 4.3
Name: Kendra, GPA: 4.2
*/
sourceimport org.apache.pekko.japi.Pair;
import org.reactivestreams.Publisher;
// pair of (Name, GPA)
List<Pair<String, Double>> sortedStudents =
Arrays.asList(
new Pair<>("Benita", 2.1),
new Pair<>("Adrian", 3.1),
new Pair<>("Alexis", 4.0),
new Pair<>("Kendra", 4.2),
new Pair<>("Jerrie", 4.3),
new Pair<>("Alison", 4.7));
Source<Pair<String, Double>, NotUsed> studentSource = Source.from(sortedStudents);
CompletionStage<List<Pair<String, Double>>> topThree =
studentSource.runWith(Sink.takeLast(3), system);
topThree.thenAccept(
result -> {
System.out.println("#### Top students ####");
for (int i = result.size() - 1; i >= 0; i--) {
Pair<String, Double> s = result.get(i);
System.out.println("Name: " + s.first() + ", " + "GPA: " + s.second());
}
});
/*
#### Top students ####
Name: Alison, GPA: 4.7
Name: Jerrie, GPA: 4.3
Name: Kendra, GPA: 4.2
*/
Reactive Streams semantics¶
cancels never
backpressures never
1.1.3