Source.futureSource
Streams the elements of the given future source once it successfully completes.
Signature¶
Description¶
Streams the elements of the given future source once it successfully completes. If the future fails the stream is failed.
For the corresponding operator for the Java standard library CompletionStage
see completionStageSource.
Example¶
Suppose we are accessing a remote service that streams user data over HTTP/2 or a WebSocket. We can model that as a Source[User,NotUsed]
but that source will only be available once the connection has been established.
source
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Source
import scala.concurrent.Future
object FutureSource {
def sourceCompletionStageSource(): Unit = {
val userRepository: UserRepository = ??? // an abstraction over the remote service
val userFutureSource = Source.futureSource(userRepository.loadUsers)
// ...
}
trait UserRepository {
def loadUsers: Future[Source[User, NotUsed]]
}
case class User()
}
Reactive Streams semantics¶
emits the next value from the future source, once it has completed
completes after the future source completes
1.2.0-M1+35-3d489313*