Server WebSocket Support

WebSocket is a protocol that provides a bi-directional channel between a browser and a web server, usually run over an upgraded HTTP(S) connection. Data is exchanged in messages, where a message is either binary data or Unicode text.

Apache Pekko HTTP provides a stream-based implementation of the WebSocket protocol that hides the low-level details of the underlying binary framing wire-protocol and provides a simple API to implement services using WebSocket.

Model

The basic unit of data exchange in the WebSocket protocol is a message. A message is either a binary message, i.e. a sequence of octets, or a text message, i.e. a sequence of Unicode code points.

In the data model the two kinds of messages, binary and text messages, are represented by the two classes BinaryMessage and TextMessage, which derive from a common superclass Message. The subclasses BinaryMessage and TextMessage contain methods to access the data. The superclass Message contains isText and isBinary methods to distinguish a message, and asBinaryMessage and asTextMessage methods to cast a message. Take the API of TextMessage as an example (BinaryMessage is very similar, with String replaced by ByteString):

Scala
/**
 * The ADT for WebSocket messages. A message can either be a binary or a text message.
 */
sealed trait Message extends pekko.http.javadsl.model.ws.Message

/**
 * Represents a WebSocket text message. A text message can either be a [[TextMessage.Strict]] in which case
 * the complete data is already available or it can be [[TextMessage.Streamed]] in which case `textStream`
 * will return a Source streaming the data as it comes in.
 */
sealed trait TextMessage extends pekko.http.javadsl.model.ws.TextMessage with Message {

  /**
   * The contents of this message as a stream.
   */
  def textStream: Source[String, _]

  /**
   * Collects all possible parts and returns a potentially future Strict Message for easier processing.
   * The Future is failed with an TimeoutException if the stream isn't completed after the given timeout.
   */
  def toStrict(timeout: FiniteDuration)(implicit fm: Materializer): Future[TextMessage.Strict] =
    this match {
      case TextMessage.Strict(text) => Future.successful(TextMessage.Strict(text))
      case TextMessage.Streamed(textStream) => textStream
          .completionTimeout(timeout)
          .runFold(new StringBuilder())((b, s) => b.append(s))
          .map(b => b.toString)(fm.executionContext)
          .map(text => TextMessage.Strict(text))(fm.executionContext)
    }

  /** Java API */
  override def getStreamedText: javadsl.Source[String, _] = textStream.asJava
  override def asScala: TextMessage = this
  override def toStrict(timeoutMillis: Long, materializer: Materializer): CompletionStage[TextMessage.Strict] =
    toStrict(timeoutMillis.millis)(materializer).asJava
}

sealed trait BinaryMessage extends pekko.http.javadsl.model.ws.BinaryMessage with Message {

  /**
   * The contents of this message as a stream.
   */
  def dataStream: Source[ByteString, _]

  /**
   * Collects all possible parts and returns a potentially future Strict Message for easier processing.
   * The Future is failed with an TimeoutException if the stream isn't completed after the given timeout.
   */
  def toStrict(timeout: FiniteDuration)(implicit fm: Materializer): Future[BinaryMessage.Strict] =
    this match {
      case BinaryMessage.Strict(binary) => Future.successful(BinaryMessage.Strict(binary))
      case BinaryMessage.Streamed(binaryStream) => binaryStream
          .completionTimeout(timeout)
          .runFold(new ByteStringBuilder())((b, e) => b.append(e))
          .map(b => b.result())(fm.executionContext)
          .map(binary => BinaryMessage.Strict(binary))(fm.executionContext)
    }

  /** Java API */
  override def getStreamedData: javadsl.Source[ByteString, _] = dataStream.asJava
  override def asScala: BinaryMessage = this
  override def toStrict(timeoutMillis: Long, materializer: Materializer): CompletionStage[BinaryMessage.Strict] =
    toStrict(timeoutMillis.millis)(materializer).asJava
}
Java
abstract class TextMessage extends Message {

  /**
   * Returns a source of the text message data.
   */
  def getStreamedText: Source[String, _]

  /**
   * Returns the strict message text if this message is strict, throws otherwise.
   */
  def getStrictText: String
}
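From Scala, the message ADT is usually consumed by pattern matching on its four concrete cases (strict or streamed text, strict or streamed binary). The following is a minimal sketch, assuming the Scala classes live in `org.apache.pekko.http.scaladsl.model.ws`; the object `MessageKinds` and its `describe` helper are hypothetical names used only for illustration.

```scala
import org.apache.pekko.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.util.ByteString

object MessageKinds {
  // Hypothetical helper: classifies a message by kind (text/binary) and by strictness.
  // It does not read streamed data, so a real handler must still drain streamed messages.
  def describe(msg: Message): String = msg match {
    case TextMessage.Strict(text)   => s"strict text: $text"
    case _: TextMessage.Streamed    => "streamed text"
    case BinaryMessage.Strict(data) => s"strict binary: ${data.length} bytes"
    case _: BinaryMessage.Streamed  => "streamed binary"
  }

  // Sample messages built with the factory methods of TextMessage and BinaryMessage.
  val samples: List[Message] = List(
    TextMessage("hi"),               // strict text
    TextMessage(Source.single("hi")), // streamed text
    BinaryMessage(ByteString("abc"))  // strict binary
  )
}
```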

The data of a message is provided as a stream because WebSocket messages do not have a predefined size and could (in theory) be infinitely long. However, only one message can be open at a time per direction of the WebSocket connection, so many application-level protocols use the delineation into (small) messages to transport single application-level data units such as “one event” or “one chat message”.
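As an illustration of using message boundaries as application-level framing, here is a minimal sketch that turns each application event into exactly one strict text message. The `ChatLine` case class and the `"user: text"` encoding are hypothetical; only `Flow` and `TextMessage` come from Pekko.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.http.scaladsl.model.ws.{Message, TextMessage}
import org.apache.pekko.stream.scaladsl.Flow

object ChatFraming {
  // Hypothetical application-level data unit.
  final case class ChatLine(user: String, text: String)

  // One ChatLine becomes exactly one strict TextMessage, so the receiver can
  // treat each WebSocket message as one complete chat line.
  val toMessages: Flow[ChatLine, Message, NotUsed] =
    Flow[ChatLine].map(line => TextMessage(s"${line.user}: ${line.text}"))
}
```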

Many messages are small enough to be sent or received in one go. As an opportunity for optimization, the model provides the notion of a “strict” message to represent cases where a whole message was received in one go. Strict messages are represented by the Strict subclass of each kind of message, which contains the data as a strict, i.e. non-streamed, ByteString or String. If TextMessage.isStrict returns true, the complete data is already available and can be accessed with TextMessage.getStrictText (analogously for BinaryMessage).

When receiving data from the network connection the WebSocket implementation tries to create a strict message whenever possible, i.e. when the complete data was received in one chunk. However, the actual chunking of messages over a network connection and through the various streaming abstraction layers is not deterministic from the perspective of the application. Therefore, application code must be able to handle both streamed and strict messages and not expect certain messages to be strict. (Particularly, note that tests against localhost will behave differently than tests against remote peers where data is received over a physical network connection.)
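A handler that works for both strict and streamed text messages can take the fast path for `TextMessage.Strict` and fall back to `toStrict` (shown in the API excerpt above) otherwise. This is a sketch under stated assumptions: the helper name `fullText` is hypothetical and the 3-second timeout is an arbitrary choice. Note that `toStrict` buffers the whole message in memory, so this only suits messages of bounded size.

```scala
import org.apache.pekko.http.scaladsl.model.ws.TextMessage
import org.apache.pekko.stream.Materializer
import scala.concurrent.Future
import scala.concurrent.duration._

object TextCollector {
  // Hypothetical helper: returns the complete text whether the message arrived strict or streamed.
  def fullText(tm: TextMessage)(implicit mat: Materializer): Future[String] =
    tm match {
      // Fast path: the data is already in memory.
      case TextMessage.Strict(text) => Future.successful(text)
      // Streamed: fold all parts into a Strict message; fails with a TimeoutException after 3 seconds.
      case streamed => streamed.toStrict(3.seconds).map(_.text)(mat.executionContext)
    }
}
```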

For sending data, you can use TextMessage.apply(text: String) (Java: TextMessage.create(String)) to create a strict message if the complete message has already been assembled. Otherwise, use TextMessage.apply(textStream: Source[String, _]) (Java: TextMessage.create(Source<String, ?>)) to create a streaming message from an Apache Pekko Streams source.
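The two Scala factory methods produce different subclasses, which a short sketch makes visible. The object name `Outgoing` and the message contents are hypothetical.

```scala
import org.apache.pekko.http.scaladsl.model.ws.TextMessage
import org.apache.pekko.stream.scaladsl.Source

object Outgoing {
  // The complete text is already known: TextMessage.apply(String) yields a TextMessage.Strict.
  val strict: TextMessage = TextMessage("full reply")

  // The text is produced piece by piece: TextMessage.apply(Source) yields a TextMessage.Streamed.
  val streamed: TextMessage = TextMessage(Source(List("part 1, ", "part 2")))
}
```

Prefer the strict form whenever the whole payload is already in memory; the streamed form is for payloads that are produced incrementally or are too large to buffer.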

Routing support

To handle WebSocket requests, you can either use the directive described in this section, or use the lower-level WebSocketUpgrade attribute described in the next section.

The routing DSL provides the handleWebSocketMessages directive to install a WebSocket handler if a request is a WebSocket request. Otherwise, the directive rejects the request.

Let’s look at how the low-level greeter example (see the Example section below) can be rewritten using the high-level routing DSL.

Instead of writing the request handler manually, the routing behavior of the app is defined by a route that uses the handleWebSocketMessages directive:

Scala
def greeter: Flow[Message, Message, Any] =
  Flow[Message].mapConcat {
    case tm: TextMessage =>
      TextMessage(Source.single("Hello ") ++ tm.textStream ++ Source.single("!")) :: Nil
    case bm: BinaryMessage =>
      // ignore binary messages but drain content to avoid the stream being clogged
      bm.dataStream.runWith(Sink.ignore)
      Nil
  }
val websocketRoute =
  path("greeter") {
    handleWebSocketMessages(greeter)
  }

// tests:
// create a testing probe representing the client-side
val wsClient = WSProbe()

// WS creates a WebSocket request for testing
WS("/greeter", wsClient.flow) ~> websocketRoute ~>
check {
  // check response for WS Upgrade headers
  isWebSocketUpgrade shouldEqual true

  // manually run a WS conversation
  wsClient.sendMessage("Peter")
  wsClient.expectMessage("Hello Peter!")

  wsClient.sendMessage(BinaryMessage(ByteString("abcdef")))
  wsClient.expectNoMessage(100.millis)

  wsClient.sendMessage("John")
  wsClient.expectMessage("Hello John!")

  wsClient.sendCompletion()
  wsClient.expectCompletion()
}
Java
public Route createRoute() {
  return path("greeter", () -> handleWebSocketMessages(greeter()));
}

The handling code itself is the same as when using the low-level API.

The example also includes code demonstrating the testkit support for WebSocket services. The testkit lets you create WebSocket requests with WS and run them against a route. WS takes the flow of a WSProbe, a mock WebSocket client that allows manual testing of the WebSocket handler’s behavior if the request was accepted.

See the full routing example.

WebSocketUpgrade

To handle WebSocket requests, you can either use the WebSocketUpgrade attribute directly, or use the higher-level Routing Support described above.

The entry point for the WebSocket API is the WebSocketUpgrade attribute, which is added to a request if Apache Pekko HTTP encounters a WebSocket upgrade request.

The WebSocket specification mandates that details of the WebSocket connection are negotiated by placing special-purpose HTTP headers into request and response of the HTTP upgrade. In Apache Pekko HTTP these HTTP-level details of the WebSocket handshake are hidden from the application and don’t need to be managed manually.

Instead, the WebSocketUpgrade attribute represents a valid WebSocket upgrade request. An application can detect a WebSocket upgrade request by looking for the WebSocketUpgrade attribute. It can choose to accept the upgrade and start a WebSocket connection by responding to that request with an HttpResponse generated by one of the WebSocketUpgrade.handleMessages methods (Java: handleMessagesWith). In its most general form this method expects two arguments. First, a handler Flow[Message, Message, Any] (Java: Flow<Message, Message, ?>) that will be used to handle WebSocket messages on this connection. Second, the application can optionally choose one of the proposed application-level sub-protocols by inspecting the values of WebSocketUpgrade.requestedProtocols (Java: getRequestedProtocols) and pass the chosen protocol value to handleMessages (Java: handleMessagesWith).
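Sub-protocol negotiation can be sketched as picking the first protocol, in server preference order, that the client also requested, and passing it to `handleMessages`. This assumes the Scala `WebSocketUpgrade` trait in `org.apache.pekko.http.scaladsl.model.ws` with `requestedProtocols` and `handleMessages(handler, subprotocol: Option[String])`; the protocol names `chat.v2`/`chat.v1` and the object `ProtocolNegotiation` are hypothetical.

```scala
import org.apache.pekko.http.scaladsl.model.HttpResponse
import org.apache.pekko.http.scaladsl.model.ws.{Message, WebSocketUpgrade}
import org.apache.pekko.stream.scaladsl.Flow

object ProtocolNegotiation {
  // Hypothetical sub-protocols this server supports, in order of preference.
  val supported: Seq[String] = Seq("chat.v2", "chat.v1")

  // First supported protocol that the client also requested, if any.
  def choose(requested: Seq[String]): Option[String] =
    supported.find(requested.contains)

  // Accept the upgrade and report the chosen protocol (or none) in the handshake response.
  def accept(upgrade: WebSocketUpgrade, handler: Flow[Message, Message, Any]): HttpResponse =
    upgrade.handleMessages(handler, choose(upgrade.requestedProtocols))
}
```

Restricting the choice to protocols the client actually offered keeps the response consistent with the request; when nothing matches, the connection is accepted without a sub-protocol.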

Handling Messages

A message handler is expected to be implemented as a Flow[Message, Message, Any] (Java: Flow<Message, Message, ?>). For typical request-response scenarios this fits very well, and such a Flow can be constructed from a simple function by using Flow[Message].map or Flow[Message].mapAsync (Java: Flow.<Message>create().map or Flow.<Message>create().mapAsync).
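A request-response handler built with `mapAsync` might look like the following sketch, which answers each text message with its upper-cased text and drains binary messages without replying. The object name `UppercaseEcho`, the upper-casing behavior, and the 3-second `toStrict` timeout are all hypothetical choices for illustration.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{Flow, Sink}
import scala.concurrent.duration._

object UppercaseEcho {
  // Each text message is collected (buffered in memory) and answered with its upper-cased text.
  def handler(implicit mat: Materializer): Flow[Message, Message, NotUsed] =
    Flow[Message]
      .mapAsync[Option[Message]](parallelism = 1) {
        case tm: TextMessage =>
          tm.toStrict(3.seconds).map(strict => Some(TextMessage(strict.text.toUpperCase)))(mat.executionContext)
        case bm: BinaryMessage =>
          // No reply for binary messages, but their data is still drained so traffic keeps flowing.
          bm.dataStream.runWith(Sink.ignore).map(_ => None)(mat.executionContext)
      }
      // Keep only the actual replies.
      .collect { case Some(reply) => reply }
}
```

`parallelism = 1` preserves the order of replies; a higher value would still emit results in input order but would collect several messages concurrently.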

There are other use cases, e.g. a server-push model, where a server message is sent spontaneously, or a truly bi-directional scenario where input and output aren’t logically connected. Providing the handler as a Flow may not fit these cases. Instead, the method WebSocketUpgrade.handleMessagesWithSinkSource (in Java, an overload of WebSocketUpgrade.handleMessagesWith) allows passing an output-generating Source[Message, _] (Java: Source<Message, ?>) and an input-receiving Sink[Message, _] (Java: Sink<Message, ?>) independently.

Note that a handler is required to consume the data stream of each message to make room for new messages. Otherwise, subsequent messages may get stuck and message traffic in this direction will stall.
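A server-push handler can be sketched with `handleMessagesWithSinkSource`: the output side is a timer-driven source and the input side ignores client messages while still draining each one, as required above. This assumes the Scala signature `handleMessagesWithSinkSource(inSink, outSource, subprotocol = None)`; the object `ServerPush`, the "tick n" payloads, and the one-second interval are hypothetical. A plain `Sink.ignore` would not be enough here, because it discards the `Message` objects without consuming their nested data streams.

```scala
import org.apache.pekko.actor.Cancellable
import org.apache.pekko.http.scaladsl.model.HttpResponse
import org.apache.pekko.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage, WebSocketUpgrade}
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._

object ServerPush {
  // Output side: emits "tick 0", "tick 1", ... every `interval`, independent of any client input.
  def ticks(interval: FiniteDuration): Source[Message, Cancellable] =
    Source
      .tick(interval, interval, ())
      .zipWithIndex
      .map { case (_, n) => TextMessage(s"tick $n") }

  // Input side: ignores client messages but drains each one's data stream.
  def drainingSink(implicit mat: Materializer): Sink[Message, Any] =
    Sink.foreach[Message] {
      case tm: TextMessage   => tm.textStream.runWith(Sink.ignore)
      case bm: BinaryMessage => bm.dataStream.runWith(Sink.ignore)
    }

  // Accepts the upgrade with independent input and output sides.
  def accept(upgrade: WebSocketUpgrade)(implicit mat: Materializer): HttpResponse =
    upgrade.handleMessagesWithSinkSource(drainingSink, ticks(1.second))
}
```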

Example

Let’s look at an example.

WebSocket requests come in like any other requests. In the example, requests to /greeter are expected to be WebSocket requests:

Scala
val requestHandler: HttpRequest => HttpResponse = {
  case req @ HttpRequest(GET, Uri.Path("/greeter"), _, _, _) =>
    req.attribute(AttributeKeys.webSocketUpgrade) match {
      case Some(upgrade) => upgrade.handleMessages(greeterWebSocketService)
      case None          => HttpResponse(400, entity = "Not a valid websocket request!")
    }
  case r: HttpRequest =>
    r.discardEntityBytes() // important to drain incoming HTTP Entity stream
    HttpResponse(404, entity = "Unknown resource!")
}
Java
public static HttpResponse handleRequest(HttpRequest request) {
  System.out.println("Handling request to " + request.getUri());

  if (request.getUri().path().equals("/greeter")) {
    return request
        .getAttribute(AttributeKeys.webSocketUpgrade())
        .map(
            upgrade -> {
              Flow<Message, Message, NotUsed> greeterFlow = greeter();
              HttpResponse response = upgrade.handleMessagesWith(greeterFlow);
              return response;
            })
        .orElse(
            HttpResponse.create()
                .withStatus(StatusCodes.BAD_REQUEST)
                .withEntity("Expected WebSocket request"));
  } else {
    return HttpResponse.create().withStatus(404);
  }
}

It uses pattern matching on the path (Java: it looks at the path) and then inspects the request to query for the WebSocketUpgrade attribute. If such an attribute is found, it is used to generate a response by passing a handler for WebSocket messages to the handleMessages method (Java: handleMessagesWith). If no such attribute is found, a 400 Bad Request response is generated.

You can also use the WebSocket.handleWebSocketRequestWith helper method if only WebSocket requests are expected. The method looks for the WebSocketUpgrade attribute and returns a response that installs the passed WebSocket handler if the attribute is found. If the request is not a WebSocket request, it returns a 400 Bad Request error response.

In the example, the passed handler expects text messages where each message is expected to contain a (person’s) name and then responds with another text message that contains a greeting:

Scala
// The Greeter WebSocket Service expects a "name" per message and
// returns a greeting message for that name
val greeterWebSocketService =
  Flow[Message]
    .mapConcat {
      // we match but don't actually consume the text message here,
      // rather we simply stream it back as the tail of the response
      // this means we might start sending the response even before the
      // end of the incoming message has been received
      case tm: TextMessage   => TextMessage(Source.single("Hello ") ++ tm.textStream) :: Nil
      case bm: BinaryMessage =>
        // ignore binary messages but drain content to avoid the stream being clogged
        bm.dataStream.runWith(Sink.ignore)
        Nil
    }
Java
/**
 * A handler that treats incoming messages as a name, and responds with a greeting to that name
 */
public static Flow<Message, Message, NotUsed> greeter() {
  return Flow.<Message>create()
      .collect(
          new JavaPartialFunction<Message, Message>() {
            @Override
            public Message apply(Message msg, boolean isCheck) throws Exception {
              if (isCheck) {
                if (msg.isText()) {
                  return null;
                } else {
                  throw noMatch();
                }
              } else {
                return handleTextMessage(msg.asTextMessage());
              }
            }
          });
}

public static TextMessage handleTextMessage(TextMessage msg) {
  if (msg.isStrict()) {
    // optimization that directly creates a simple response...
    return TextMessage.create("Hello " + msg.getStrictText());
  } else {
    // ... this would suffice to handle all text messages in a streaming fashion
    return TextMessage.create(Source.single("Hello ").concat(msg.getStreamedText()));
  }
}
Note

Inactive WebSocket connections will be dropped according to the idle-timeout settings. In case you need to keep inactive connections alive, you can either tweak your idle-timeout or inject ‘keep-alive’ messages regularly.
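Injecting application-level keep-alive messages can be sketched with the Pekko Streams `keepAlive` operator, which emits an injected element whenever the wrapped flow has been silent for `maxIdle`. The object `ManualKeepAlive` and the `"keep-alive"` payload are hypothetical, and clients would need to recognize and ignore such messages.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.http.scaladsl.model.ws.{Message, TextMessage}
import org.apache.pekko.stream.scaladsl.Flow
import scala.concurrent.duration._

object ManualKeepAlive {
  // Wraps a handler so that, whenever it emits nothing for `maxIdle`,
  // a hypothetical "keep-alive" text message is sent instead.
  def withKeepAlive(handler: Flow[Message, Message, NotUsed], maxIdle: FiniteDuration): Flow[Message, Message, NotUsed] =
    handler.keepAlive(maxIdle, () => TextMessage("keep-alive"))
}
```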

Automatic keep-alive Ping support

For long-running WebSocket connections it may be beneficial to enable an automatic heartbeat using Ping frames. Such frames are often used to keep otherwise idle connections from being closed, and to ensure that the connection remains usable even when no data frames are exchanged for a long period of time. The heartbeat may be initiated by either side of the connection, and the choice of which side performs it depends on the use case.

This is supported transparently via configuration in Apache Pekko HTTP. You enable it by setting pekko.http.server.websocket.periodic-keep-alive-max-idle to the desired maximum idle time, e.g. pekko.http.server.websocket.periodic-keep-alive-max-idle = 1 second. The keep-alive triggers when no other messages are in flight during the configured period; Apache Pekko HTTP then automatically sends a Ping frame for each such idle interval.

By default, the automatic keep-alive feature is disabled.
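One way to set this from code is to layer an override on top of the normal configuration and start the actor system with it, as in the sketch below. It assumes Typesafe Config (which Pekko uses for its configuration); the object name `KeepAliveConfig` and the system name are hypothetical.

```scala
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.pekko.actor.ActorSystem

object KeepAliveConfig {
  // Enables automatic Ping frames after 1 second without other traffic,
  // overriding whatever application.conf / reference.conf specify.
  val config: Config = ConfigFactory
    .parseString("pekko.http.server.websocket.periodic-keep-alive-max-idle = 1 second")
    .withFallback(ConfigFactory.load())

  // Servers bound through this actor system read their settings from `config`.
  def system(): ActorSystem = ActorSystem("ws-keep-alive", config)
}
```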

Custom keep-alive data payloads

By default, pings do not carry any payload, as it is often enough to push any frame over the connection to ensure the connection stays healthy (or to detect that it was severed). However, you may configure them to carry a custom payload by providing a function that is called to produce the payload for each generated Ping message:

Scala
val defaultSettings = ServerSettings(system)

val pingCounter = new AtomicInteger()

Http().newServerAt("127.0.0.1", 0)
  .adaptSettings(_.mapWebsocketSettings(
    _.withPeriodicKeepAliveData(() => ByteString(s"debug-${pingCounter.incrementAndGet()}"))))
  .bind(route)
Java
ServerSettings defaultSettings = ServerSettings.create(system);

AtomicInteger pingCounter = new AtomicInteger();

WebSocketSettings customWebsocketSettings =
    defaultSettings
        .getWebsocketSettings()
        .withPeriodicKeepAliveData(
            () ->
                ByteString.fromString(
                    String.format("debug-%d", pingCounter.incrementAndGet())));

ServerSettings customServerSettings =
    defaultSettings.withWebsocketSettings(customWebsocketSettings);

Http http = Http.get(system);
http.newServerAt("127.0.0.1", 8080).withSettings(customServerSettings).bindFlow(handler);

Uni-directional Pong keep-alive

A compliant client answers each Ping frame with a Pong reply carrying the same payload. It is also possible to configure the keep-alive mechanism to send Pong frames instead of Ping frames, which enables a unidirectional heartbeat (the client side does not reply to such a heartbeat). You can configure this mode by setting pekko.http.server.websocket.periodic-keep-alive-mode = pong.
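Because the keep-alive is disabled by default, Pong mode only has an effect together with a max-idle setting. A sketch of both settings as one override, assuming Typesafe Config; the object name `PongKeepAlive` and the 5-second interval are hypothetical choices.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object PongKeepAlive {
  // Unidirectional heartbeat: send an unsolicited Pong frame after 5 seconds of idleness.
  // periodic-keep-alive-max-idle must be set too, otherwise no keep-alive frames are sent.
  val overrides: Config = ConfigFactory.parseString(
    """pekko.http.server.websocket {
      |  periodic-keep-alive-mode = pong
      |  periodic-keep-alive-max-idle = 5 seconds
      |}""".stripMargin)

  // Use this as the configuration of the actor system that runs the server.
  val config: Config = overrides.withFallback(ConfigFactory.load())
}
```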