Sink.takeLast
Collect the last n
values emitted from the stream into a collection.
Signature
Description
Materializes into a Future
CompletionStage
of immutable.Seq[T]
List<In>
containing the last n
collected elements when the stream completes. If the stream completes before signaling at least n elements, the Future
CompletionStage
will complete with the number of elements taken at that point. If the stream never completes, the Future
CompletionStage
will never complete. If there is a failure signaled in the stream the Future
CompletionStage
will be completed with failure.
Example
- Scala
-
source
case class Student(name: String, gpa: Double) val students = List( Student("Alison", 4.7), Student("Adrian", 3.1), Student("Alexis", 4), Student("Benita", 2.1), Student("Kendra", 4.2), Student("Jerrie", 4.3)).sortBy(_.gpa) val sourceOfStudents = Source(students) val result: Future[Seq[Student]] = sourceOfStudents.runWith(Sink.takeLast(3)) result.foreach { topThree => println("#### Top students ####") topThree.reverse.foreach { s => println(s"Name: ${s.name}, GPA: ${s.gpa}") } } /* #### Top students #### Name: Alison, GPA: 4.7 Name: Jerrie, GPA: 4.3 Name: Kendra, GPA: 4.2 */
- Java
-
source
import org.apache.pekko.japi.Pair; import org.reactivestreams.Publisher; // pair of (Name, GPA) List<Pair<String, Double>> sortedStudents = Arrays.asList( new Pair<>("Benita", 2.1), new Pair<>("Adrian", 3.1), new Pair<>("Alexis", 4.0), new Pair<>("Kendra", 4.2), new Pair<>("Jerrie", 4.3), new Pair<>("Alison", 4.7)); Source<Pair<String, Double>, NotUsed> studentSource = Source.from(sortedStudents); CompletionStage<List<Pair<String, Double>>> topThree = studentSource.runWith(Sink.takeLast(3), system); topThree.thenAccept( result -> { System.out.println("#### Top students ####"); for (int i = result.size() - 1; i >= 0; i--) { Pair<String, Double> s = result.get(i); System.out.println("Name: " + s.first() + ", " + "GPA: " + s.second()); } }); /* #### Top students #### Name: Alison, GPA: 4.7 Name: Jerrie, GPA: 4.3 Name: Kendra, GPA: 4.2 */
Reactive Streams semantics
cancels never
backpressures never
1.0.1