UDP

The UDP connector provides Apache Pekko Stream flows that allow for sending and receiving UDP datagrams.

Project Info: Apache Pekko Connectors UDP
Artifact
org.apache.pekko
pekko-connectors-udp
1.1.0-M1+129-4a175fef-SNAPSHOT
JDK versions
OpenJDK 8
OpenJDK 11
OpenJDK 17
OpenJDK 21
Scala versions2.13.15, 2.12.20, 3.3.4
JPMS module namepekko.stream.connectors.udp
License
API documentation
Forums
Release notesGitHub releases
IssuesGithub issues
Sourceshttps://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.1.2"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-udp" % "1.1.0-M1+129-4a175fef-SNAPSHOT",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven
<properties>
  <pekko.version>1.1.2</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-udp_${scala.binary.version}</artifactId>
    <version>1.1.0-M1+129-4a175fef-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.1.2",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-udp_${versions.ScalaBinary}:1.1.0-M1+129-4a175fef-SNAPSHOT"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Sending

Datagrams can be sent to remote destinations by using a Udp.sendFlow or Udp.sendSink which can be found in the UdpUdp factory object.

Scala
source  val destination = new InetSocketAddress("my.server", 27015)
val messagesToSend = 100

Source(1 to messagesToSend)
  .map(i => ByteString(s"Message $i"))
  .map(Datagram(_, destination))
  .runWith(Udp.sendSink())
Java
source  final InetSocketAddress destination = new InetSocketAddress("my.server", 27015);
final Integer messagesToSend = 100;

Source.range(1, messagesToSend)
    .map(i -> ByteString.fromString("Message " + i))
    .map(bs -> Datagram.create(bs, destination))
    .runWith(Udp.sendSink(system), system);

Receiving

First create an address which will be used to bind and listen for incoming datagrams.

Scala
sourceval bindToLocal = new InetSocketAddress("localhost", 0)
Java
sourcefinal InetSocketAddress bindToLocal = new InetSocketAddress("localhost", 0);

A Flow created from Udp.bindFlow will bind to the given address. All datagrams coming from the network to the bound address will be sent downstream. Datagrams received from the upstream will be sent to their corresponding destination addresses.

The flow materializes to the Future[InetSocketAddress]CompletionStage<InetSocketAddress> which will eventually hold the address the flow was finally bound to.

Scala
sourceval bindFlow: Flow[Datagram, Datagram, Future[InetSocketAddress]] =
  Udp.bindFlow(bindToLocal)
Java
sourcefinal Flow<Datagram, Datagram, CompletionStage<InetSocketAddress>> bindFlow =
    Udp.bindFlow(bindToLocal, system);

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to browse the code, edit and run it in sbt.

Scala
sbt
> udp/testOnly *.UdpSpec
Java
sbt
> udp/testOnly *.UdpTest