Futures interop
Dependency
To use Pekko Streams, add the module to your project:
- sbt
val PekkoVersion = "1.0.1"
libraryDependencies += "org.apache.pekko" %% "pekko-stream" % PekkoVersion
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.0.1</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
- Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.0.1")
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}"
}
Overview
Stream transformations and side effects involving external non-stream-based services can be performed with mapAsync or mapAsyncUnordered.
For example, sending emails to the authors of selected tweets using an external email service:
- Scala
def send(email: Email): Future[Unit] = {
  // ...
}
- Java
public CompletionStage<Email> send(Email email) {
  // ...
}
We start with the tweet stream of authors:
- Scala
val authors: Source[Author, NotUsed] =
  tweets.filter(_.hashtags.contains(pekkoTag)).map(_.author)
- Java
final Source<Author, NotUsed> authors =
    tweets.filter(t -> t.hashtags().contains(PEKKO)).map(t -> t.author);
Assume that we can look up their email address using:
- Scala
def lookupEmail(handle: String): Future[Option[String]] =
- Java
public CompletionStage<Optional<String>> lookupEmail(String handle)
Transforming the stream of authors into a stream of email addresses using the lookupEmail service can be done with mapAsync:
- Scala
val emailAddresses: Source[String, NotUsed] =
  authors
    .mapAsync(4)(author => addressSystem.lookupEmail(author.handle))
    .collect { case Some(emailAddress) => emailAddress }
- Java
final Source<String, NotUsed> emailAddresses =
    authors
        .mapAsync(4, author -> addressSystem.lookupEmail(author.handle))
        .filter(o -> o.isPresent())
        .map(o -> o.get());
Finally, sending the emails:
- Scala
val sendEmails: RunnableGraph[NotUsed] =
  emailAddresses
    .mapAsync(4)(address => {
      emailServer.send(Email(to = address, title = "pekko", body = "I like your tweet"))
    })
    .to(Sink.ignore)
sendEmails.run()
- Java
final RunnableGraph<NotUsed> sendEmails =
    emailAddresses
        .mapAsync(
            4, address -> emailServer.send(new Email(address, "PEKKO", "I like your tweet")))
        .to(Sink.ignore());
sendEmails.run(system);
mapAsync applies the given function, which calls out to the external service, to each element as it passes through this processing step. The function returns a Future (Scala) or a CompletionStage (Java), and the value of that future is emitted downstream. The number of futures that may run in parallel is given as the first argument to mapAsync. These futures may complete in any order, but the elements emitted downstream are in the same order as they were received from upstream.
Because only that many calls are in flight at any time, back-pressure works as expected. For example, if emailServer.send is the bottleneck, it will limit the rate at which incoming tweets are retrieved and email addresses are looked up.
The final piece of this pipeline is to generate the demand that pulls the tweet authors' information through the emailing pipeline: we attach a Sink.ignore, which makes it all run. If our email process returned some interesting data for further transformation, we would not ignore it but send that result stream onwards for further processing or storage.
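The ordering and back-pressure behaviour described above can be modelled with nothing but java.util.concurrent. The following is a minimal sketch, not Pekko's implementation: the `mapAsyncOrdered` helper and the `convert` service are hypothetical, and the input is a finite list rather than a stream. It keeps a window of at most `parallelism` pending calls and frees a slot only once the oldest call has completed and been emitted, so a slow first element holds back faster ones behind it:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Plain-Java model of mapAsync's ordering rule (not Pekko's implementation).
final class OrderedAsyncSketch {

  /** Runs at most `parallelism` (>= 1) calls at a time and returns results in input order. */
  static <A, B> List<B> mapAsyncOrdered(
      List<A> input, int parallelism, Function<A, CompletionStage<B>> call) {
    List<B> emitted = new ArrayList<>();
    // Pending calls, oldest first, i.e. in the order their elements arrived.
    Deque<CompletableFuture<B>> window = new ArrayDeque<>();
    for (A elem : input) {
      if (window.size() == parallelism) {
        // Window full: a slot frees up only once the OLDEST call is done and emitted,
        // even if younger calls have already completed.
        emitted.add(window.removeFirst().join());
      }
      window.addLast(call.apply(elem).toCompletableFuture());
    }
    // Drain the remaining calls, still strictly oldest first.
    while (!window.isEmpty()) {
      emitted.add(window.removeFirst().join());
    }
    return emitted;
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    // Hypothetical service: lower-case input is slow (200 ms), upper-case input is fast (10 ms).
    Function<String, CompletionStage<String>> convert =
        s -> CompletableFuture.supplyAsync(() -> {
          sleep(Character.isLowerCase(s.charAt(0)) ? 200 : 10);
          return s.toUpperCase();
        }, pool);
    System.out.println(mapAsyncOrdered(List.of("a", "B", "C", "D", "e"), 4, convert));
    pool.shutdown();
  }
}
```

Running `main` prints `[A, B, C, D, E]`: the fast upper-case results wait behind the slow `a` instead of overtaking it.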
Note that mapAsync preserves the order of the stream elements. In this example the order is not important, so we can use the more efficient mapAsyncUnordered:
- Scala
val authors: Source[Author, NotUsed] =
  tweets.filter(_.hashtags.contains(pekkoTag)).map(_.author)
val emailAddresses: Source[String, NotUsed] =
  authors
    .mapAsyncUnordered(4)(author => addressSystem.lookupEmail(author.handle))
    .collect { case Some(emailAddress) => emailAddress }
val sendEmails: RunnableGraph[NotUsed] =
  emailAddresses
    .mapAsyncUnordered(4)(address => {
      emailServer.send(Email(to = address, title = "Pekko", body = "I like your tweet"))
    })
    .to(Sink.ignore)
sendEmails.run()
- Java
final Source<Author, NotUsed> authors =
    tweets.filter(t -> t.hashtags().contains(PEKKO)).map(t -> t.author);
final Source<String, NotUsed> emailAddresses =
    authors
        .mapAsyncUnordered(4, author -> addressSystem.lookupEmail(author.handle))
        .filter(o -> o.isPresent())
        .map(o -> o.get());
final RunnableGraph<NotUsed> sendEmails =
    emailAddresses
        .mapAsyncUnordered(
            4, address -> emailServer.send(new Email(address, "Pekko", "I like your tweet")))
        .to(Sink.ignore());
sendEmails.run(system);
In the tweet examples above, the services conveniently returned a Future (Scala) or CompletionStage (Java) of the result. If that is not the case, you need to wrap the call in a Future or CompletionStage yourself. If the service call involves blocking, you must also make sure that you run it on a dedicated execution context, to avoid starvation and disturbance of other tasks in the system.
- Scala
val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
val sendTextMessages: RunnableGraph[NotUsed] =
  phoneNumbers
    .mapAsync(4)(phoneNo => {
      Future {
        smsServer.send(TextMessage(to = phoneNo, body = "I like your tweet"))
      }(blockingExecutionContext)
    })
    .to(Sink.ignore)
sendTextMessages.run()
- Java
final Executor blockingEc = system.dispatchers().lookup("blocking-dispatcher");
final RunnableGraph<NotUsed> sendTextMessages =
    phoneNumbers
        .mapAsync(
            4,
            phoneNo ->
                CompletableFuture.supplyAsync(
                    () -> smsServer.send(new TextMessage(phoneNo, "I like your tweet")), blockingEc))
        .to(Sink.ignore());
sendTextMessages.run(system);
The configuration of the "blocking-dispatcher" may look something like:
blocking-dispatcher {
  executor = "thread-pool-executor"
  thread-pool-executor {
    core-pool-size-min = 10
    core-pool-size-max = 10
  }
}
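Outside of Pekko, the effect of such a dedicated pool can be approximated with a plain fixed-size thread pool. The sketch below is only an analogue under stated assumptions: `Executors.newFixedThreadPool(10)` stands in for the configured dispatcher, `Thread.sleep` stands in for the blocking service call, and `DedicatedPoolSketch` / `peakConcurrency` are hypothetical names. Each blocking call is wrapped in a CompletableFuture on that pool, so no more than 10 of them run at once and none of them occupy threads used elsewhere:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Rough stdlib analogue of a dedicated 10-thread "blocking-dispatcher" (not Pekko configuration).
final class DedicatedPoolSketch {

  /** Wraps `calls` blocking calls in futures on a 10-thread pool; returns the peak number running at once. */
  static int peakConcurrency(int calls) {
    ExecutorService blockingPool = Executors.newFixedThreadPool(10);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (int i = 0; i < calls; i++) {
      futures.add(CompletableFuture.runAsync(() -> {
        peak.accumulateAndGet(running.incrementAndGet(), Math::max);
        try {
          Thread.sleep(50); // stand-in for a blocking service call
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          running.decrementAndGet();
        }
      }, blockingPool));
    }
    // Wait for every call; the blocking work only ever ran on blockingPool's threads.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    blockingPool.shutdown();
    return peak.get();
  }

  public static void main(String[] args) {
    System.out.println("peak concurrent blocking calls: " + peakConcurrency(30));
  }
}
```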
An alternative for blocking calls is to perform them in a map operation, still using a dedicated dispatcher for that operation.
- Scala
val send = Flow[String]
  .map { phoneNo =>
    smsServer.send(TextMessage(to = phoneNo, body = "I like your tweet"))
  }
  .withAttributes(ActorAttributes.dispatcher("blocking-dispatcher"))
val sendTextMessages: RunnableGraph[NotUsed] =
  phoneNumbers.via(send).to(Sink.ignore)
sendTextMessages.run()
- Java
final Flow<String, Boolean, NotUsed> send =
    Flow.of(String.class)
        .map(phoneNo -> smsServer.send(new TextMessage(phoneNo, "I like your tweet")))
        .withAttributes(ActorAttributes.dispatcher("blocking-dispatcher"));
final RunnableGraph<?> sendTextMessages = phoneNumbers.via(send).to(Sink.ignore());
sendTextMessages.run(system);
However, that is not exactly the same as mapAsync, since mapAsync may run several calls concurrently, whereas map performs them one at a time.
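To make the difference concrete, the sketch below contrasts the two in plain Java. It is an analogy under stated assumptions rather than Pekko code: `CountingService` is a hypothetical stand-in for `smsServer.send` that sleeps 30 ms and records how many calls overlap, the sequential loop plays the role of map, and futures on a fixed pool of `parallelism` threads play the role of mapAsync (here the pool size, not a stream buffer, bounds concurrency):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java contrast of "one call at a time" (like map) vs "several at once" (like mapAsync).
final class MapVsMapAsyncSketch {

  /** Hypothetical blocking service (stand-in for smsServer.send) that records overlapping calls. */
  static final class CountingService {
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    boolean send(String phoneNo) {
      peak.accumulateAndGet(running.incrementAndGet(), Math::max);
      try {
        Thread.sleep(30); // simulated blocking I/O
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        running.decrementAndGet();
      }
      return true;
    }

    int peakConcurrency() {
      return peak.get();
    }
  }

  /** Like map on a blocking dispatcher: each call finishes before the next one starts. */
  static int oneAtATime(List<String> phoneNumbers) {
    CountingService service = new CountingService();
    for (String phoneNo : phoneNumbers) {
      service.send(phoneNo);
    }
    return service.peakConcurrency();
  }

  /** Like mapAsync(parallelism): futures on a dedicated pool of `parallelism` threads. */
  static int concurrently(List<String> phoneNumbers, int parallelism) {
    CountingService service = new CountingService();
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    List<CompletableFuture<Boolean>> calls = new ArrayList<>();
    for (String phoneNo : phoneNumbers) {
      calls.add(CompletableFuture.supplyAsync(() -> service.send(phoneNo), pool));
    }
    calls.forEach(CompletableFuture::join);
    pool.shutdown();
    return service.peakConcurrency();
  }

  public static void main(String[] args) {
    List<String> phoneNumbers = List.of("1", "2", "3", "4", "5", "6", "7", "8");
    System.out.println("map-style peak: " + oneAtATime(phoneNumbers));
    System.out.println("mapAsync-style peak: " + concurrently(phoneNumbers, 4));
  }
}
```

With eight numbers, the map-style version never has more than one call in progress, while the mapAsync-style version typically reaches four.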
For a service that is exposed as an actor, or if an actor is used as a gateway in front of an external service, you can use ask:
- Scala
import org.apache.pekko.pattern.ask
import org.apache.pekko.util.Timeout
import scala.concurrent.duration._
val pekkoTweets: Source[Tweet, NotUsed] = tweets.filter(_.hashtags.contains(pekkoTag))
implicit val timeout: Timeout = 3.seconds
val saveTweets: RunnableGraph[NotUsed] =
  pekkoTweets
    .mapAsync(4)(tweet => database ? Save(tweet))
    .to(Sink.ignore)
- Java
final Source<Tweet, NotUsed> pekkoTweets =
    tweets.filter(t -> t.hashtags().contains(PEKKO));
final RunnableGraph<NotUsed> saveTweets =
    pekkoTweets
        .mapAsync(4, tweet -> ask(database, new Save(tweet), Duration.ofMillis(300L)))
        .to(Sink.ignore());
Note that if the ask is not completed within the given timeout, the stream is completed with failure. If that is not the desired outcome, you can use recover on the ask Future (Scala) or exceptionally on the ask CompletionStage (Java).
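The timeout-then-recover idea can be tried out with the JDK alone (Java 9 or later, for orTimeout and delayedExecutor). In this sketch the delayed `reply` is a hypothetical stand-in for the ask, and `TimeoutRecoverSketch` / `saveOrFallback` are illustrative names. Note that the JDK fails the call with a TimeoutException rather than Pekko's AskTimeoutException; exceptionally plays the role of recover by turning that failure into an ordinary value, which a stream would simply emit instead of failing:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Stdlib analogue of "recover on a timed-out ask": plain CompletableFuture, not Pekko's ask.
final class TimeoutRecoverSketch {

  /** Hypothetical stand-in for the reply to `database ? Save(tweet)`, arriving after a delay. */
  static CompletableFuture<String> reply(long delayMillis) {
    return CompletableFuture.supplyAsync(
        () -> "saved", CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
  }

  /** Fails the call if no reply arrives in time, then turns that failure into an ordinary value. */
  static String saveOrFallback(long replyDelayMillis, long timeoutMillis) {
    return reply(replyDelayMillis)
        .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS) // completes exceptionally with TimeoutException
        .exceptionally(failure -> "not saved") // the recover step: a value instead of a failure
        .join();
  }

  public static void main(String[] args) {
    System.out.println(saveOrFallback(10, 1000)); // reply arrives well within the timeout
    System.out.println(saveOrFallback(1000, 50)); // timeout fires first
  }
}
```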
Illustrating ordering and parallelism
Let us look at another example to get a better understanding of the ordering and parallelism characteristics of mapAsync and mapAsyncUnordered.
Several mapAsync and mapAsyncUnordered futures may run concurrently. The number of concurrent futures is limited by the downstream demand. For example, if 5 elements have been requested by downstream, there will be at most 5 futures in progress.
mapAsync emits the future results in the same order as the input elements were received. That means that completed results are only emitted downstream when earlier results have been completed and emitted. One slow call will thereby delay the results of all successive calls, even if they complete before the slow call.
mapAsyncUnordered emits the future results as soon as they are completed, i.e. the elements may be emitted downstream in a different order than they were received from upstream. One slow call will thereby not delay the results of faster successive calls, as long as there is downstream demand for several elements.
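The unordered emission rule can be modelled with plain java.util.concurrent. In this minimal sketch (not Pekko's implementation; `mapAsyncUnordered` here is a hypothetical helper over a finite list, `convert` is a made-up service, and every call is assumed to succeed with a non-null result) each completed call pushes its result onto a queue, and a slot is freed by whichever call finishes first, so a slow element no longer delays faster ones:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Plain-Java model of mapAsyncUnordered's emission rule (not Pekko's implementation).
final class UnorderedAsyncSketch {

  /**
   * Runs at most `parallelism` (>= 1) calls at a time and returns results in completion order.
   * Assumes every call succeeds with a non-null result; a failed call would block forever here.
   */
  static <A, B> List<B> mapAsyncUnordered(
      List<A> input, int parallelism, Function<A, CompletionStage<B>> call) {
    List<B> emitted = new ArrayList<>();
    // Each call pushes its result onto this queue the moment it completes.
    BlockingQueue<B> completed = new LinkedBlockingQueue<>();
    int inFlight = 0;
    for (A elem : input) {
      if (inFlight == parallelism) {
        // Window full: free a slot with WHICHEVER call finished first.
        emitted.add(takeNext(completed));
        inFlight--;
      }
      call.apply(elem).thenAccept(completed::add);
      inFlight++;
    }
    while (inFlight > 0) {
      emitted.add(takeNext(completed));
      inFlight--;
    }
    return emitted;
  }

  private static <B> B takeNext(BlockingQueue<B> queue) {
    try {
      return queue.take();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for a result", e);
    }
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    // Hypothetical service: lower-case input is slow (300 ms), upper-case input is fast (10 ms).
    Function<String, CompletionStage<String>> convert =
        s -> CompletableFuture.supplyAsync(() -> {
          sleep(Character.isLowerCase(s.charAt(0)) ? 300 : 10);
          return s.toUpperCase();
        }, pool);
    System.out.println(mapAsyncUnordered(List.of("a", "B", "C", "D"), 4, convert));
    pool.shutdown();
  }
}
```

With `a` taking 300 ms and the upper-case elements 10 ms, `A` is normally emitted last, whereas an order-preserving version would emit it first.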
Here is a fictitious service that we can use to illustrate these aspects.
- Scala
class SometimesSlowService(implicit ec: ExecutionContext) {
  private val runningCount = new AtomicInteger

  def convert(s: String): Future[String] = {
    println(s"running: $s (${runningCount.incrementAndGet()})")
    Future {
      if (s.nonEmpty && s.head.isLower)
        Thread.sleep(500)
      else
        Thread.sleep(20)
      println(s"completed: $s (${runningCount.decrementAndGet()})")
      s.toUpperCase
    }
  }
}
- Java
static class SometimesSlowService {
  private final Executor ec;
  private final AtomicInteger runningCount = new AtomicInteger();

  public SometimesSlowService(Executor ec) {
    this.ec = ec;
  }

  public CompletionStage<String> convert(String s) {
    System.out.println("running: " + s + " (" + runningCount.incrementAndGet() + ")");
    return CompletableFuture.supplyAsync(
        () -> {
          // Lower-case elements are slow (500 ms), all others are fast (20 ms).
          long delay = (!s.isEmpty() && Character.isLowerCase(s.charAt(0))) ? 500 : 20;
          try {
            Thread.sleep(delay);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
          }
          System.out.println("completed: " + s + " (" + runningCount.decrementAndGet() + ")");
          return s.toUpperCase();
        },
        ec);
  }
}
Elements starting with a lower-case character are simulated to take longer to process.
Here is how we can use it with mapAsync:
- Scala
implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
val service = new SometimesSlowService
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
  .map(elem => { println(s"before: $elem"); elem })
  .mapAsync(4)(service.convert)
  .to(Sink.foreach(elem => println(s"after: $elem")))
  .withAttributes(Attributes.inputBuffer(initial = 4, max = 4))
  .run()
- Java
final Executor blockingEc = system.dispatchers().lookup("blocking-dispatcher");
final SometimesSlowService service = new SometimesSlowService(blockingEc);
Source.from(Arrays.asList("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
    .map(
        elem -> {
          System.out.println("before: " + elem);
          return elem;
        })
    .mapAsync(4, service::convert)
    .to(Sink.foreach(elem -> System.out.println("after: " + elem)))
    .withAttributes(Attributes.inputBuffer(4, 4))
    .run(system);
The output may look like this:
before: a
before: B
before: C
before: D
running: a (1)
running: B (2)
before: e
running: C (3)
before: F
running: D (4)
before: g
before: H
completed: C (3)
completed: B (2)
completed: D (1)
completed: a (0)
after: A
after: B
running: e (1)
after: C
after: D
running: F (2)
before: i
before: J
running: g (3)
running: H (4)
completed: H (2)
completed: F (3)
completed: e (1)
completed: g (0)
after: E
after: F
running: i (1)
after: G
after: H
running: J (2)
completed: J (1)
completed: i (0)
after: I
after: J
Note that the "after" lines are in the same order as the "before" lines, even though elements are "completed" in a different order. For example, H is completed before g, but is still emitted afterwards.
The numbers in parentheses show how many calls are in progress at the same time. Here the downstream demand, and thereby the number of concurrent calls, is limited by the buffer size (4) set with an attribute.
Here is how we can use the same service with mapAsyncUnordered:
- Scala
implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
val service = new SometimesSlowService
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
  .map(elem => { println(s"before: $elem"); elem })
  .mapAsyncUnordered(4)(service.convert)
  .to(Sink.foreach(elem => println(s"after: $elem")))
  .withAttributes(Attributes.inputBuffer(initial = 4, max = 4))
  .run()
- Java
final Executor blockingEc = system.dispatchers().lookup("blocking-dispatcher");
final SometimesSlowService service = new SometimesSlowService(blockingEc);
Source.from(Arrays.asList("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
    .map(
        elem -> {
          System.out.println("before: " + elem);
          return elem;
        })
    .mapAsyncUnordered(4, service::convert)
    .to(Sink.foreach(elem -> System.out.println("after: " + elem)))
    .withAttributes(Attributes.inputBuffer(4, 4))
    .run(system);
The output may look like this:
before: a
before: B
before: C
before: D
running: a (1)
running: B (2)
before: e
running: C (3)
before: F
running: D (4)
before: g
before: H
completed: B (3)
completed: C (1)
completed: D (2)
after: B
after: D
running: e (2)
after: C
running: F (3)
before: i
before: J
completed: F (2)
after: F
running: g (3)
running: H (4)
completed: H (3)
after: H
completed: a (2)
after: A
running: i (3)
running: J (4)
completed: J (3)
after: J
completed: e (2)
after: E
completed: g (1)
after: G
completed: i (0)
after: I
Note that the "after" lines are not in the same order as the "before" lines. For example, H overtakes the slow G.
The numbers in parentheses show how many calls are in progress at the same time. Here the downstream demand, and thereby the number of concurrent calls, is limited by the buffer size (4) set with an attribute.