Classic Dispatchers
Pekko Classic pertains to the original Actor APIs, which have been improved by more type-safe and guided Actor APIs. Pekko Classic is still fully supported, and existing applications can continue to use the classic APIs. It is also possible to use the new Actor APIs together with classic actors in the same ActorSystem; see coexistence. For new projects we recommend using the new Actor API.
For the full documentation of this feature and for new projects see Dispatchers.
Dependency
Dispatchers are part of core Pekko, which means that they are part of the pekko-actor dependency:
- sbt
val PekkoVersion = "1.1.2+29-e21fa9eb-SNAPSHOT"
libraryDependencies += "org.apache.pekko" %% "pekko-actor" % PekkoVersion
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.1.2+29-e21fa9eb-SNAPSHOT</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-actor_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
- Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2+29-e21fa9eb-SNAPSHOT")
  implementation "org.apache.pekko:pekko-actor_${versions.ScalaBinary}"
}
Looking up a Dispatcher
Dispatchers implement the ExecutionContext (Scala) or Executor (Java) interface and can thus be used to run Future (Scala) or CompletableFuture (Java) invocations, etc.
- Scala
// for use with Futures, Scheduler, etc.
implicit val executionContext = system.dispatchers.lookup("my-dispatcher")
- Java
// this is scala.concurrent.ExecutionContextExecutor, which implements
// both scala.concurrent.ExecutionContext (for use with Futures, Scheduler, etc.)
// and java.util.concurrent.Executor (for use with CompletableFuture etc.)
final ExecutionContextExecutor ex = system.dispatchers().lookup("my-dispatcher");
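Because the looked-up dispatcher is a java.util.concurrent.Executor, it can be passed to any JDK API that accepts one. The following is a minimal sketch using only the JDK: a plain fixed thread pool stands in for the value returned by system.dispatchers().lookup("my-dispatcher"), and the class and method names (DispatcherAsExecutor, computeOn) are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DispatcherAsExecutor {

    // Runs a small computation on the supplied executor and waits for the result.
    // Any Executor works here, including a dispatcher looked up from an ActorSystem.
    static int computeOn(Executor executor) {
        return CompletableFuture
            .supplyAsync(() -> 6 * 7, executor)
            .join();
    }

    public static void main(String[] args) {
        // Assumption: a plain JDK pool stands in for the looked-up dispatcher.
        ExecutorService standIn = Executors.newFixedThreadPool(2);
        try {
            System.out.println(computeOn(standIn));
        } finally {
            standIn.shutdown();
        }
    }
}
```

In an application, the stand-in pool would be replaced by the ExecutionContextExecutor obtained from the lookup shown above.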
Setting the dispatcher for an Actor
To give your Actor a different dispatcher than the default, you need to do two things. The first is to configure the dispatcher:
my-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutorService to use
  executor = "fork-join-executor"
  # Configuration for the fork join pool
  fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 2
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 2.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}
Note that parallelism-max does not set an upper bound on the total number of threads allocated by the ForkJoinPool. It specifically concerns the number of hot threads the pool keeps running in order to reduce the latency of handling a new incoming task. You can read more about parallelism in the JDK’s ForkJoinPool documentation.
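The comments in the fork-join-executor section describe the target parallelism as ceil(available processors * factor), capped below by parallelism-min and above by parallelism-max. The arithmetic can be sketched as follows, assuming the clamp is applied exactly as those comments describe; the class and method names (ForkJoinSizing, parallelism) are hypothetical and this is not Pekko's own code.

```java
import java.util.concurrent.ForkJoinPool;

public class ForkJoinSizing {

    // ceil(processors * factor), clamped to the range [min, max].
    static int parallelism(int processors, int min, double factor, int max) {
        int scaled = (int) Math.ceil(processors * factor);
        return Math.min(Math.max(scaled, min), max);
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        // Values taken from the my-dispatcher example: min = 2, factor = 2.0, max = 10.
        int target = parallelism(processors, 2, 2.0, 10);
        // The JDK pool takes the target parallelism as its constructor argument.
        ForkJoinPool pool = new ForkJoinPool(target);
        System.out.println("processors=" + processors + " parallelism=" + pool.getParallelism());
        pool.shutdown();
    }
}
```

With the example values, a 4-core machine yields a parallelism of 8, while a 16-core machine is capped at 10.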
Another example that uses the “thread-pool-executor”:
blocking-io-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 32
  }
  throughput = 1
}
The thread pool executor dispatcher is implemented using a java.util.concurrent.ThreadPoolExecutor. You can read more about it in the JDK’s ThreadPoolExecutor documentation.
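To give a feel for what a fixed pool size means at the JDK level, here is a rough sketch of a ThreadPoolExecutor whose core and maximum sizes are both 32. It is an analogy under assumptions, not Pekko's implementation: the queue type, the keep-alive time, and the names (FixedPoolSketch, fixedPool) are chosen here purely for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolSketch {

    // Builds a pool whose core and maximum sizes are both `size`,
    // so the number of worker threads never exceeds that value.
    static ThreadPoolExecutor fixedPool(int size) {
        return new ThreadPoolExecutor(
            size, size,
            60L, TimeUnit.SECONDS,          // assumption: illustrative keep-alive
            new LinkedBlockingQueue<>());   // assumption: unbounded task queue
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = fixedPool(32);
        System.out.println(pool.getCorePoolSize() + " / " + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```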
For more options, see Dispatchers and the default-dispatcher section of the configuration.
Then you create the actor as usual and define the dispatcher in the deployment configuration.
- Scala
import org.apache.pekko.actor.Props
val myActor = context.actorOf(Props[MyActor](), "myactor")
- Java
ActorRef myActor = system.actorOf(Props.create(MyActor.class), "myactor");
pekko.actor.deployment {
  /myactor {
    dispatcher = my-dispatcher
  }
}
An alternative to the deployment configuration is to define the dispatcher in code. If you also define the dispatcher in the deployment configuration, that value is used instead of the programmatically provided parameter.
- Scala
import org.apache.pekko.actor.Props
val myActor = context.actorOf(Props[MyActor]().withDispatcher("my-dispatcher"), "myactor1")
- Java
ActorRef myActor = system.actorOf(Props.create(MyActor.class).withDispatcher("my-dispatcher"), "myactor3");
The dispatcher you specify in withDispatcher and the dispatcher property in the deployment configuration are each in fact a path into your configuration. In this example it is a top-level section, but you could instead put it in a sub-section, using periods to denote the nesting, like this: "foo.bar.my-dispatcher".
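As an illustration of such a nested path, the configuration below places the dispatcher under hypothetical foo and bar sections and refers to it by its full path. The section names are placeholders, and the dispatcher settings are abbreviated from the earlier example.

```
foo {
  bar {
    my-dispatcher {
      type = Dispatcher
      executor = "fork-join-executor"
    }
  }
}

pekko.actor.deployment {
  /myactor {
    # Full path to the nested dispatcher section
    dispatcher = "foo.bar.my-dispatcher"
  }
}
```

The same path string would be passed to withDispatcher when setting the dispatcher in code.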