handleWebSocketMessages

Signature

def handleWebSocketMessages(handler: Flow[Message, Message, Any]): Route 

Description

The directive first checks if the request was a valid WebSocket handshake request and if yes, it completes the request with the passed handler. Otherwise, the request is rejected with an ExpectedWebSocketRequestRejectionExpectedWebSocketRequestRejection.

WebSocket subprotocols offered in the Sec-WebSocket-Protocol header of the request are ignored. If you want to support several protocols use the handleWebSocketMessagesForProtocol directive, instead.

For more information about the WebSocket support, see Server-Side WebSocket Support.

Example

Scala
sourcedef greeter: Flow[Message, Message, Any] =
  Flow[Message].mapConcat {
    case tm: TextMessage =>
      TextMessage(Source.single("Hello ") ++ tm.textStream ++ Source.single("!")) :: Nil
    case bm: BinaryMessage =>
      // ignore binary messages but drain content to avoid the stream being clogged
      bm.dataStream.runWith(Sink.ignore)
      Nil
  }
val websocketRoute =
  path("greeter") {
    handleWebSocketMessages(greeter)
  }

// tests:
// create a testing probe representing the client-side
val wsClient = WSProbe()

// WS creates a WebSocket request for testing
WS("/greeter", wsClient.flow) ~> websocketRoute ~>
check {
  // check response for WS Upgrade headers
  isWebSocketUpgrade shouldEqual true

  // manually run a WS conversation
  wsClient.sendMessage("Peter")
  wsClient.expectMessage("Hello Peter!")

  wsClient.sendMessage(BinaryMessage(ByteString("abcdef")))
  wsClient.expectNoMessage(100.millis)

  wsClient.sendMessage("John")
  wsClient.expectMessage("Hello John!")

  wsClient.sendCompletion()
  wsClient.expectCompletion()
}
Java
sourceimport static org.apache.pekko.http.javadsl.server.Directives.path;
import static org.apache.pekko.http.javadsl.server.Directives.handleWebSocketMessages;

final Flow<Message, Message, NotUsed> greeter =
    Flow.of(Message.class)
        .mapConcat(
            msg -> {
              if (msg instanceof TextMessage) {
                final TextMessage tm = (TextMessage) msg;
                final TextMessage ret =
                    TextMessage.create(
                        Source.single("Hello ")
                            .concat(tm.getStreamedText())
                            .concat(Source.single("!")));
                return Collections.singletonList(ret);
              } else if (msg instanceof BinaryMessage) {
                final BinaryMessage bm = (BinaryMessage) msg;
                bm.getStreamedData().runWith(Sink.ignore(), materializer());
                return Collections.emptyList();
              } else {
                throw new IllegalArgumentException("Unsupported message type!");
              }
            });

final Route websocketRoute = path("greeter", () -> handleWebSocketMessages(greeter));

// create a testing probe representing the client-side
final WSProbe wsClient = WSProbe.create(system(), materializer());

// WS creates a WebSocket request for testing
testRoute(websocketRoute)
    .run(WS(Uri.create("/greeter"), wsClient.flow(), materializer()))
    .assertStatusCode(StatusCodes.SWITCHING_PROTOCOLS);

// manually run a WS conversation
wsClient.sendMessage("Peter");
wsClient.expectMessage("Hello Peter!");

wsClient.sendMessage(BinaryMessage.create(ByteString.fromString("abcdef")));
wsClient.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));

wsClient.sendMessage("John");
wsClient.expectMessage("Hello John!");

wsClient.sendCompletion();
wsClient.expectCompletion();