handleWebSocketMessages
Description
The directive first checks whether the request is a valid WebSocket handshake request. If it is, the directive completes the request with the passed handler; otherwise, the request is rejected with an `ExpectedWebSocketRequestRejection`.
WebSocket subprotocols offered in the `Sec-WebSocket-Protocol` header of the request are ignored. If you want to support several protocols, use the `handleWebSocketMessagesForProtocol` directive instead.
For more information about the WebSocket support, see Server-Side WebSocket Support.
Example
- Scala
-
source
// Handler flow for the WebSocket connection: every incoming text message is
// answered with "Hello <text>!"; binary messages produce no reply.
def greeter: Flow[Message, Message, Any] =
  Flow[Message].mapConcat {
    case tm: TextMessage =>
      // build the reply as a streamed text message around the incoming text stream
      TextMessage(Source.single("Hello ") ++ tm.textStream ++ Source.single("!")) :: Nil
    case bm: BinaryMessage =>
      // ignore binary messages but drain content to avoid the stream being clogged
      bm.dataStream.runWith(Sink.ignore)
      Nil
  }

// Route that hands WebSocket requests on the "greeter" path to the greeter flow.
val websocketRoute =
  path("greeter") {
    handleWebSocketMessages(greeter)
  }

// tests:
// create a testing probe representing the client-side
val wsClient = WSProbe()

// WS creates a WebSocket request for testing
WS("/greeter", wsClient.flow) ~> websocketRoute ~>
  check {
    // check response for WS Upgrade headers
    isWebSocketUpgrade shouldEqual true

    // manually run a WS conversation
    wsClient.sendMessage("Peter")
    wsClient.expectMessage("Hello Peter!")

    // binary input is drained by the handler, so no reply is expected
    wsClient.sendMessage(BinaryMessage(ByteString("abcdef")))
    wsClient.expectNoMessage(100.millis)

    wsClient.sendMessage("John")
    wsClient.expectMessage("Hello John!")

    wsClient.sendCompletion()
    wsClient.expectCompletion()
  }
- Java
-
source
import static org.apache.pekko.http.javadsl.server.Directives.path; import static org.apache.pekko.http.javadsl.server.Directives.handleWebSocketMessages; final Flow<Message, Message, NotUsed> greeter = Flow.of(Message.class) .mapConcat( msg -> { if (msg instanceof TextMessage) { final TextMessage tm = (TextMessage) msg; final TextMessage ret = TextMessage.create( Source.single("Hello ") .concat(tm.getStreamedText()) .concat(Source.single("!"))); return Collections.singletonList(ret); } else if (msg instanceof BinaryMessage) { final BinaryMessage bm = (BinaryMessage) msg; bm.getStreamedData().runWith(Sink.ignore(), materializer()); return Collections.emptyList(); } else { throw new IllegalArgumentException("Unsupported message type!"); } }); final Route websocketRoute = path("greeter", () -> handleWebSocketMessages(greeter)); // create a testing probe representing the client-side final WSProbe wsClient = WSProbe.create(system(), materializer()); // WS creates a WebSocket request for testing testRoute(websocketRoute) .run(WS(Uri.create("/greeter"), wsClient.flow(), materializer())) .assertStatusCode(StatusCodes.SWITCHING_PROTOCOLS); // manually run a WS conversation wsClient.sendMessage("Peter"); wsClient.expectMessage("Hello Peter!"); wsClient.sendMessage(BinaryMessage.create(ByteString.fromString("abcdef"))); wsClient.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS)); wsClient.sendMessage("John"); wsClient.expectMessage("Hello John!"); wsClient.sendCompletion(); wsClient.expectCompletion();
1.1.0+17-3b5f9b27*