handleWebSocketMessages
Signature¶
def handleWebSocketMessages(handler: Flow[Message, Message, Any]): Route
Description¶
The directive first checks whether the request is a valid WebSocket handshake request and, if so, completes the request with the passed handler. Otherwise, the request is rejected with an ExpectedWebSocketRequestRejection.
WebSocket subprotocols offered in the Sec-WebSocket-Protocol header of the request are ignored. If you want to support several protocols, use the handleWebSocketMessagesForProtocol directive instead.
For more information about the WebSocket support, see Server-Side WebSocket Support.
Example¶
/** Greeter flow used as the WebSocket handler in this example.
  *
  * Each incoming text message is answered with one streamed text message made of
  * "Hello ", the received text and "!". Binary messages produce no reply.
  *
  * @return a flow mapping every incoming [[Message]] to zero or one outgoing [[Message]]
  */
def greeter: Flow[Message, Message, Any] =
  Flow[Message].mapConcat {
    case tm: TextMessage =>
      // Build the reply as a stream so the incoming text is forwarded without being collected first.
      TextMessage(Source.single("Hello ") ++ tm.textStream ++ Source.single("!")) :: Nil
    case bm: BinaryMessage =>
      // ignore binary messages but drain content to avoid the stream being clogged
      bm.dataStream.runWith(Sink.ignore)
      Nil
  }
// Serve the greeter flow as a WebSocket endpoint under the "greeter" path.
val websocketRoute =
  path("greeter")(handleWebSocketMessages(greeter))
// tests:
// Drives websocketRoute through a complete WebSocket conversation using a test probe.
// create a testing probe representing the client-side
val wsClient = WSProbe()
// WS creates a WebSocket request for testing
WS("/greeter", wsClient.flow) ~> websocketRoute ~>
check {
// check response for WS Upgrade headers
isWebSocketUpgrade shouldEqual true
// manually run a WS conversation
// a text message is answered with "Hello " + text + "!"
wsClient.sendMessage("Peter")
wsClient.expectMessage("Hello Peter!")
// a binary message is drained by the greeter and yields no reply,
// so nothing may arrive within the 100 ms window
wsClient.sendMessage(BinaryMessage(ByteString("abcdef")))
wsClient.expectNoMessage(100.millis)
// a further text message shows the conversation still works after the binary message
wsClient.sendMessage("John")
wsClient.expectMessage("Hello John!")
// finish the conversation: signal completion from the probe and expect completion in return
wsClient.sendCompletion()
wsClient.expectCompletion()
}
import static org.apache.pekko.http.javadsl.server.Directives.path;
import static org.apache.pekko.http.javadsl.server.Directives.handleWebSocketMessages;
// Greeter flow used as the WebSocket handler: every text message is answered with
// "Hello " + text + "!"; binary messages are consumed and produce no reply.
final Flow<Message, Message, NotUsed> greeter =
    Flow.of(Message.class)
        .mapConcat(
            incoming -> {
              if (incoming instanceof TextMessage) {
                // Stream the greeting so the incoming text is forwarded as it arrives.
                final Source<String, NotUsed> greeting =
                    Source.single("Hello ")
                        .concat(((TextMessage) incoming).getStreamedText())
                        .concat(Source.single("!"));
                final TextMessage reply = TextMessage.create(greeting);
                return Collections.singletonList(reply);
              }
              if (incoming instanceof BinaryMessage) {
                // Consume the payload so unread binary data cannot stall the stream.
                ((BinaryMessage) incoming).getStreamedData().runWith(Sink.ignore(), materializer());
                return Collections.emptyList();
              }
              // Neither text nor binary: reject explicitly.
              throw new IllegalArgumentException("Unsupported message type!");
            });
// Serve the greeter flow as a WebSocket endpoint under the "greeter" path.
final Route websocketRoute =
    path(
        "greeter",
        () -> handleWebSocketMessages(greeter));
// Drives websocketRoute through a complete WebSocket conversation using a test probe.
// create a testing probe representing the client-side
final WSProbe wsClient = WSProbe.create(system(), materializer());
// WS creates a WebSocket request for testing
// the route must answer the handshake with the Switching Protocols status
testRoute(websocketRoute)
.run(WS(Uri.create("/greeter"), wsClient.flow(), materializer()))
.assertStatusCode(StatusCodes.SWITCHING_PROTOCOLS);
// manually run a WS conversation
// a text message is answered with "Hello " + text + "!"
wsClient.sendMessage("Peter");
wsClient.expectMessage("Hello Peter!");
// a binary message is consumed by the greeter and yields no reply,
// so nothing may arrive within the 100 ms window
wsClient.sendMessage(BinaryMessage.create(ByteString.fromString("abcdef")));
wsClient.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
// a further text message shows the conversation still works after the binary message
wsClient.sendMessage("John");
wsClient.expectMessage("Hello John!");
// finish the conversation: signal completion from the probe and expect completion in return
wsClient.sendCompletion();
wsClient.expectCompletion();
1.1.0