Using UDP

Dependency

To use UDP, you must add the following dependency in your project:

sbt
val PekkoVersion = "1.1.2+29-e21fa9eb-SNAPSHOT"
libraryDependencies += "org.apache.pekko" %% "pekko-actor" % PekkoVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.1.2+29-e21fa9eb-SNAPSHOT</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-actor_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2+29-e21fa9eb-SNAPSHOT")

  implementation "org.apache.pekko:pekko-actor_${versions.ScalaBinary}"
}

Introduction

UDP is a connectionless datagram protocol which offers two different ways of communication on the JDK level:

  • sockets which are free to send datagrams to any destination and receive datagrams from any origin
  • sockets which are restricted to communication with one specific remote socket address

In the low-level API the distinction is made—confusingly—by whether or not connect has been called on the socket (even when connect has been called the protocol is still connectionless). These two forms of UDP usage are offered using distinct IO extensions described below.
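As a rough illustration of that JDK-level distinction, the sketch below uses only `java.nio.channels.DatagramChannel`, not the Pekko API. The class name `UdpConnectDemo` and the peer port 9999 are made up for the example. Calling `connect` on a datagram channel only records the peer address locally; no packet is exchanged.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.DatagramChannel;

public class UdpConnectDemo {

  /** Returns the channel's connected state before and after calling connect. */
  public static boolean[] connectStates() throws IOException {
    try (DatagramChannel channel = DatagramChannel.open()) {
      // A fresh channel may send to any destination and receive from any origin.
      boolean before = channel.isConnected();

      // Hypothetical peer address; nothing needs to be listening there,
      // because connect on a UDP channel does not perform a handshake.
      channel.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), 9999));

      // The channel is now restricted to that one remote address.
      boolean after = channel.isConnected();
      return new boolean[] {before, after};
    }
  }

  public static void main(String[] args) throws IOException {
    boolean[] states = connectStates();
    System.out.println("connected before connect(): " + states[0]);
    System.out.println("connected after connect(): " + states[1]);
  }
}
```

The first form corresponds to the unconnected `Udp` extension, the second to `UdpConnected`.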

Unconnected UDP

Simple Send

Scala
class SimpleSender(remote: InetSocketAddress) extends Actor {
  import context.system
  IO(Udp) ! Udp.SimpleSender

  def receive = {
    case Udp.SimpleSenderReady =>
      context.become(ready(sender()))
  }

  def ready(send: ActorRef): Receive = {
    case msg: String =>
      send ! Udp.Send(ByteString(msg), remote)
  }
}
Java
public static class SimpleSender extends AbstractActor {
  final InetSocketAddress remote;

  public SimpleSender(InetSocketAddress remote) {
    this.remote = remote;

    // request creation of a SimpleSender
    final ActorRef mgr = Udp.get(getContext().getSystem()).getManager();
    mgr.tell(UdpMessage.simpleSender(), getSelf());
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            Udp.SimpleSenderReady.class,
            message -> {
              getContext().become(ready(getSender()));
            })
        .build();
  }

  private Receive ready(final ActorRef send) {
    return receiveBuilder()
        .match(
            String.class,
            message -> {
              send.tell(UdpMessage.send(ByteString.fromString(message), remote), getSelf());
            })
        .build();
  }
}

The simplest form of UDP usage is to send datagrams without expecting a reply. To this end a “simple sender” facility is provided, as demonstrated above. The UDP extension is queried using the SimpleSender message (UdpMessage.simpleSender in Java), which is answered by a SimpleSenderReady notification. The sender of this notification is the newly created sender actor, which from this point onward can be used to send datagrams to arbitrary destinations; in this example it sends every String it receives, encoded as UTF-8, to a predefined remote address.

Note

The simple sender will not shut itself down because it cannot know when you are done with it. You will need to send it a PoisonPill when you want to close the ephemeral port the sender is bound to.

Bind (and Send)

Scala
class Listener(nextActor: ActorRef) extends Actor {
  import context.system
  IO(Udp) ! Udp.Bind(self, new InetSocketAddress("localhost", 0))

  def receive = {
    case Udp.Bound(local) =>
      context.become(ready(sender()))
  }

  def ready(socket: ActorRef): Receive = {
    case Udp.Received(data, remote) =>
      val processed = data.utf8String // placeholder: parse data etc., e.g. using PipelineStage
      socket ! Udp.Send(data, remote) // example server echoes back
      nextActor ! processed
    case Udp.Unbind  => socket ! Udp.Unbind
    case Udp.Unbound => context.stop(self)
  }
}
Java
public static class Listener extends AbstractActor {
  final ActorRef nextActor;

  public Listener(ActorRef nextActor) {
    this.nextActor = nextActor;

    // request creation of a bound listen socket
    final ActorRef mgr = Udp.get(getContext().getSystem()).getManager();
    mgr.tell(UdpMessage.bind(getSelf(), new InetSocketAddress("localhost", 0)), getSelf());
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            Udp.Bound.class,
            bound -> {
              getContext().become(ready(getSender()));
            })
        .build();
  }

  private Receive ready(final ActorRef socket) {
    return receiveBuilder()
        .match(
            Udp.Received.class,
            r -> {
              // echo server example: send back the data
              socket.tell(UdpMessage.send(r.data(), r.sender()), getSelf());
              // or do some processing and forward it on
              final Object processed = r.data().utf8String(); // placeholder: parse data etc., e.g. using PipelineStage
              nextActor.tell(processed, getSelf());
            })
        .matchEquals(
            UdpMessage.unbind(),
            message -> {
              socket.tell(message, getSelf());
            })
        .match(
            Udp.Unbound.class,
            message -> {
              getContext().stop(getSelf());
            })
        .build();
  }
}

If you want to implement a UDP server which listens on a socket for incoming datagrams, you need to use the Bind message (UdpMessage.bind in Java) as shown above. The local address specified may have a zero port, in which case the operating system automatically chooses a free port and assigns it to the new socket. The port that was actually bound can be found by inspecting the Bound message.
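The zero-port behaviour comes from the operating system and can be seen without Pekko. The following sketch uses only the JDK; the class name `EphemeralPortDemo` is made up for the example. It binds a datagram channel to port 0 on the loopback interface and reads back the port that was assigned, which is the same information the Bound message carries in the actor-based API.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.DatagramChannel;

public class EphemeralPortDemo {

  /** Binds to port 0 and returns the port the operating system picked. */
  public static int bindToFreePort() throws IOException {
    try (DatagramChannel channel = DatagramChannel.open()) {
      // Port 0 asks the operating system to choose any free port.
      channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

      // After binding, the local address holds the actual port number.
      InetSocketAddress local = (InetSocketAddress) channel.getLocalAddress();
      return local.getPort();
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println("bound to port " + bindToFreePort());
  }
}
```

The printed port differs from run to run, which is why a listener that binds to port 0 has to publish the address from the Bound message if other parties need to reach it.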

The sender of the Bound message is the actor which manages the new socket. Datagrams are sent using the Send message (UdpMessage.send in Java), and the socket can be closed by sending an Unbind message (UdpMessage.unbind in Java), in which case the socket actor replies with an Unbound notification.

Received datagrams are sent to the actor designated in the Bind message, whereas the Bound message is sent to the sender of the Bind message.

Connected UDP

The service provided by the connection-based UDP API is similar to the bind-and-send service described earlier. The main difference is that a connection can only send to the remoteAddress it was connected to, and receives datagrams only from that address.

Scala
class Connected(remote: InetSocketAddress) extends Actor {
  import context.system
  IO(UdpConnected) ! UdpConnected.Connect(self, remote)

  def receive = {
    case UdpConnected.Connected =>
      context.become(ready(sender()))
  }

  def ready(connection: ActorRef): Receive = {
    case UdpConnected.Received(data) =>
      // process data, send it on, etc.
    case msg: String =>
      connection ! UdpConnected.Send(ByteString(msg))
    case UdpConnected.Disconnect =>
      connection ! UdpConnected.Disconnect
    case UdpConnected.Disconnected => context.stop(self)
  }
}
Java
public static class Connected extends AbstractActor {
  final InetSocketAddress remote;

  public Connected(InetSocketAddress remote) {
    this.remote = remote;

    // create a restricted a.k.a. “connected” socket
    final ActorRef mgr = UdpConnected.get(getContext().getSystem()).getManager();
    mgr.tell(UdpConnectedMessage.connect(getSelf(), remote), getSelf());
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            UdpConnected.Connected.class,
            message -> {
              getContext().become(ready(getSender()));
            })
        .build();
  }

  private Receive ready(final ActorRef connection) {
    return receiveBuilder()
        .match(
            UdpConnected.Received.class,
            r -> {
              // process data, send it on, etc.
            })
        .match(
            String.class,
            str -> {
              connection.tell(UdpConnectedMessage.send(ByteString.fromString(str)), getSelf());
            })
        .matchEquals(
            UdpConnectedMessage.disconnect(),
            message -> {
              connection.tell(message, getSelf());
            })
        .match(
            UdpConnected.Disconnected.class,
            x -> {
              getContext().stop(getSelf());
            })
        .build();
  }
}

Consequently the example shown here looks quite similar to the previous one; the biggest difference is the absence of remote address information in the Send (UdpConnectedMessage.send in Java) and Received messages.
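The same asymmetry exists one level down in the JDK. The sketch below uses plain `java.net.DatagramSocket` rather than the Pekko API; the class name `ConnectedSendDemo` is made up for the example. A connected socket sends a packet that carries no destination, and the peer address is taken from the earlier `connect` call. It assumes the loopback interface is available and delivers the datagram; a receive timeout guards against blocking forever if it does not.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

public class ConnectedSendDemo {

  /** Sends text over a connected socket on loopback and returns what was received. */
  public static String roundTrip(String text) throws IOException {
    InetAddress loopback = InetAddress.getLoopbackAddress();
    try (DatagramSocket receiver = new DatagramSocket(new InetSocketAddress(loopback, 0));
        DatagramSocket sender = new DatagramSocket(new InetSocketAddress(loopback, 0))) {
      // Time out after two seconds instead of blocking indefinitely.
      receiver.setSoTimeout(2000);

      // Restrict the sender to the receiver's address.
      sender.connect(loopback, receiver.getLocalPort());

      // The packet has no destination; the connected socket supplies it.
      byte[] payload = text.getBytes(StandardCharsets.UTF_8);
      sender.send(new DatagramPacket(payload, payload.length));

      byte[] buffer = new byte[1024];
      DatagramPacket received = new DatagramPacket(buffer, buffer.length);
      receiver.receive(received);
      return new String(
          received.getData(), received.getOffset(), received.getLength(), StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(roundTrip("hello"));
  }
}
```

An unconnected socket would have to set the destination on every packet, which mirrors the extra remote argument of the unconnected Send message.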

Note

There is a small performance benefit in using the connection-based UDP API over the connectionless one. If a SecurityManager is enabled on the system, every connectionless message send has to go through a security check, while in the case of connection-based UDP the security check is cached after connect, so writes do not suffer an additional performance penalty.

UDP Multicast

Pekko provides a way to control various options of DatagramChannel through the org.apache.pekko.io.Inet.SocketOption interface. The example below shows how to set up a receiver of multicast messages using the IPv6 protocol.

To select a protocol family you must extend the org.apache.pekko.io.Inet.DatagramChannelCreator class, which is itself an org.apache.pekko.io.Inet.SocketOption. Provide custom logic for opening a datagram channel by overriding the create method.

Scala
final case class Inet6ProtocolFamily() extends DatagramChannelCreator {
  override def create() =
    DatagramChannel.open(StandardProtocolFamily.INET6)
}
Java
public static class Inet6ProtocolFamily extends Inet.DatagramChannelCreator {
  @Override
  public DatagramChannel create() throws Exception {
    return DatagramChannel.open(StandardProtocolFamily.INET6);
  }
}

Another socket option will be needed to join a multicast group.

Scala
final case class MulticastGroup(address: String, interface: String) extends SocketOptionV2 {
  override def afterBind(s: DatagramSocket): Unit = {
    val group = InetAddress.getByName(address)
    val networkInterface = NetworkInterface.getByName(interface)
    s.getChannel.join(group, networkInterface)
  }
}
Java
public static class MulticastGroup extends Inet.AbstractSocketOptionV2 {
  private String address;
  private String interf;

  public MulticastGroup(String address, String interf) {
    this.address = address;
    this.interf = interf;
  }

  @Override
  public void afterBind(DatagramSocket s) {
    try {
      InetAddress group = InetAddress.getByName(address);
      NetworkInterface networkInterface = NetworkInterface.getByName(interf);
      s.getChannel().join(group, networkInterface);
    } catch (Exception ex) {
      System.out.println("Unable to join multicast group.");
    }
  }
}

Socket options must be provided to the Bind message (UdpMessage.bind in Java).

Scala
import context.system
val opts = List(Inet6ProtocolFamily(), MulticastGroup(group, iface))
IO(Udp) ! Udp.Bind(self, new InetSocketAddress(port), opts)
Java
List<Inet.SocketOption> options = new ArrayList<>();
options.add(new Inet6ProtocolFamily());
options.add(new MulticastGroup(group, iface));

final ActorRef mgr = Udp.get(getContext().getSystem()).getManager();
// listen for datagrams on this address
InetSocketAddress endpoint = new InetSocketAddress(port);
mgr.tell(UdpMessage.bind(getSelf(), endpoint, options), getSelf());