Client-Side WebSocket Support
Client-side WebSocket support is available through Http().singleWebSocketRequest, Http().webSocketClientFlow and Http().webSocketClientLayer (in the Java API: Http.get(system).singleWebSocketRequest, Http.get(system).webSocketClientFlow and Http.get(system).webSocketClientLayer).
A WebSocket consists of two streams of messages, incoming messages (a Sink) and outgoing messages (a Source), where either may be signalled first, or may even be the only direction in which messages flow during the lifetime of the connection. Therefore a WebSocket connection is modelled as either something you connect a Flow[Message, Message, Mat] to, or a Flow[Message, Message, Mat] that you connect a Source[Message, Mat] and a Sink[Message, Mat] to. In the Java API these types are written Flow<Message, Message, Mat>, Source<Message, Mat> and Sink<Message, Mat>.
A WebSocket request starts with a regular HTTP request which contains an Upgrade header (and possibly other regular HTTP request properties). In addition to the flow of messages there is therefore also an initial response from the server, which is modelled with WebSocketUpgradeResponse.
The methods of the WebSocket client API handle the upgrade to WebSocket on connection success and materialize the connected WebSocket stream. If the connection fails, for example with a 404 NotFound error, this regular HTTP result can be found in WebSocketUpgradeResponse.response.
Make sure to read and understand the section about Half-Closed WebSockets as the behavior when using WebSockets for one-way communication may not be what you would expect.
Message
Messages sent and received over a WebSocket can be either TextMessages or BinaryMessages, and each of those has two subtypes: Strict (all data in one chunk) or Streamed. In typical applications messages will be Strict, as WebSockets are usually deployed to communicate using small messages rather than to stream data. The protocol does, however, allow streaming (by not marking the first fragment as final, as described in RFC 6455 section 5.2).
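The fragmentation mentioned above hinges on the FIN bit in the first byte of each frame header. The following is a minimal, standalone sketch in plain Java of how that byte encodes whether a frame is final, following the frame layout in RFC 6455 section 5.2. It is not part of the Pekko HTTP API, and the class and method names are made up for illustration. Pekko HTTP handles this framing internally, so application code normally only sees the resulting Strict or Streamed messages.

```java
final class FrameHeaderSketch {
    // Data-frame opcodes defined in RFC 6455 section 5.2
    static final int OP_CONTINUATION = 0x0;
    static final int OP_TEXT = 0x1;
    static final int OP_BINARY = 0x2;

    /** True when the FIN bit (most significant bit of the first header byte) is set. */
    static boolean isFinal(byte firstHeaderByte) {
        return (firstHeaderByte & 0x80) != 0;
    }

    /** The opcode is carried in the low four bits of the first header byte. */
    static int opcode(byte firstHeaderByte) {
        return firstHeaderByte & 0x0F;
    }

    /** A message is fragmented when its first frame is a text or binary frame without FIN. */
    static boolean startsFragmentedMessage(byte firstHeaderByte) {
        int op = opcode(firstHeaderByte);
        return !isFinal(firstHeaderByte) && (op == OP_TEXT || op == OP_BINARY);
    }

    public static void main(String[] args) {
        byte singleFrameText = (byte) 0x81; // FIN=1, opcode=text: a complete message in one frame
        byte firstFragment = (byte) 0x01;   // FIN=0, opcode=text: more fragments follow
        byte lastFragment = (byte) 0x80;    // FIN=1, opcode=continuation: closes a fragmented message

        System.out.println(startsFragmentedMessage(singleFrameText)); // false
        System.out.println(startsFragmentedMessage(firstFragment));   // true
        System.out.println(startsFragmentedMessage(lastFragment));    // false
    }
}
```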
The strict text is available from TextMessage.Strict (TextMessage.getStrictText in Java) and strict binary data from BinaryMessage.Strict (BinaryMessage.getStrictData in Java).
For streamed messages BinaryMessage.Streamed (BinaryMessage.getStreamedData in Java) and TextMessage.Streamed (TextMessage.getStreamedText in Java) will be used. In these cases the data is provided as a Source[ByteString, _] (Source<ByteString, ?> in Java) for binary and a Source[String, _] (Source<String, ?> in Java) for text messages.
singleWebSocketRequest
singleWebSocketRequest takes a WebSocketRequest and a flow that it will connect to the source and sink of the WebSocket connection. It triggers the request right away and returns a tuple containing the Future[WebSocketUpgradeResponse] (CompletionStage<WebSocketUpgradeResponse> in Java) and the materialized value of the flow passed to the method.
The future will succeed when the WebSocket connection has been established or the server returned a regular HTTP response, or fail if the connection fails with an exception.
Simple example sending a message and printing any incoming message:
- Scala
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * license agreements; and to You under the Apache License, version 2.0:
     *
     *   https://www.apache.org/licenses/LICENSE-2.0
     *
     * This file is part of the Apache Pekko project, which was derived from Akka.
     */

    /*
     * Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
     */

    package docs.http.scaladsl

    import org.apache.pekko
    import pekko.actor.ActorSystem
    import pekko.{ Done, NotUsed }
    import pekko.http.scaladsl.Http
    import pekko.stream.scaladsl._
    import pekko.http.scaladsl.model._
    import pekko.http.scaladsl.model.ws._

    import scala.concurrent.Future

    object SingleWebSocketRequest {
      def main(args: Array[String]): Unit = {
        implicit val system = ActorSystem()
        import system.dispatcher

        // print each incoming strict text message
        val printSink: Sink[Message, Future[Done]] =
          Sink.foreach {
            case message: TextMessage.Strict =>
              println(message.text)
            case _ =>
            // ignore other message types
          }

        val helloSource: Source[Message, NotUsed] =
          Source.single(TextMessage("hello world!"))

        // the Future[Done] is the materialized value of Sink.foreach
        // and it is completed when the stream completes
        val flow: Flow[Message, Message, Future[Done]] =
          Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)

        // upgradeResponse is a Future[WebSocketUpgradeResponse] that
        // completes or fails when the connection succeeds or fails
        // and closed is a Future[Done] representing the stream completion from above
        val (upgradeResponse, closed) =
          Http().singleWebSocketRequest(WebSocketRequest("ws://echo.websocket.org"), flow)

        val connected = upgradeResponse.map { upgrade =>
          // just like a regular http request we can access response status which is available via upgrade.response.status
          // status code 101 (Switching Protocols) indicates that the server supports WebSockets
          if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
            Done
          } else {
            throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
          }
        }

        // in a real application you would not side effect here
        // and handle errors more carefully
        connected.onComplete(println)
        closed.foreach(_ => println("closed"))
      }
    }
- Java
    ActorSystem system = ActorSystem.create();
    Materializer materializer = Materializer.createMaterializer(system);
    Http http = Http.get(system);

    // print each incoming text message
    // would throw exception on non strict or binary message
    final Sink<Message, CompletionStage<Done>> printSink =
        Sink.foreach(
            (message) ->
                System.out.println("Got message: " + message.asTextMessage().getStrictText()));

    // send this as a message over the WebSocket
    final Source<Message, NotUsed> helloSource =
        Source.single(TextMessage.create("hello world"));

    // the CompletionStage<Done> is the materialized value of Sink.foreach
    // and it is completed when the stream completes
    final Flow<Message, Message, CompletionStage<Done>> flow =
        Flow.fromSinkAndSourceMat(printSink, helloSource, Keep.left());

    final Pair<CompletionStage<WebSocketUpgradeResponse>, CompletionStage<Done>> pair =
        http.singleWebSocketRequest(
            WebSocketRequest.create("ws://echo.websocket.org"), flow, materializer);

    // The first value in the pair is a CompletionStage<WebSocketUpgradeResponse> that
    // completes when the WebSocket request has connected successfully (or failed)
    final CompletionStage<Done> connected =
        pair.first()
            .thenApply(
                upgrade -> {
                  // just like a regular http request we can access response status which is
                  // available via upgrade.response.status
                  // status code 101 (Switching Protocols) indicates that the server supports WebSockets
                  if (upgrade.response().status().equals(StatusCodes.SWITCHING_PROTOCOLS)) {
                    return Done.getInstance();
                  } else {
                    throw new RuntimeException(
                        "Connection failed: " + upgrade.response().status());
                  }
                });

    // the second value is the completion of the sink from above
    // in other words, it completes when the WebSocket disconnects
    final CompletionStage<Done> closed = pair.second();

    // in a real application you would not side effect here
    // and handle errors more carefully
    connected.thenAccept(done -> System.out.println("Connected"));
    closed.thenAccept(done -> System.out.println("Connection closed"));
The WebSocket request may also include additional headers, such as HTTP Basic Auth in this example:
- Scala
    val (upgradeResponse, _) =
      Http().singleWebSocketRequest(
        WebSocketRequest(
          "ws://example.com:8080/some/path",
          extraHeaders = Seq(Authorization(
            BasicHttpCredentials("johan", "correcthorsebatterystaple")))),
        flow)
- Java
    http.singleWebSocketRequest(
        WebSocketRequest.create("ws://example.com:8080/some/path")
            .addHeader(Authorization.basic("johan", "correcthorsebatterystaple")),
        flow,
        materializer);
webSocketClientFlow
webSocketClientFlow takes a request and returns a Flow[Message, Message, Future[WebSocketUpgradeResponse]] (Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> in Java).
The future that is materialized from the flow will succeed when the WebSocket connection has been established or the server returned a regular HTTP response, or fail if the connection fails with an exception.
The Flow that is returned by this method can only be materialized once. For each request a new flow must be acquired by calling the method again.
Simple example sending a message and printing any incoming message:
- Scala
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * license agreements; and to You under the Apache License, version 2.0:
     *
     *   https://www.apache.org/licenses/LICENSE-2.0
     *
     * This file is part of the Apache Pekko project, which was derived from Akka.
     */

    /*
     * Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
     */

    package docs.http.scaladsl

    import org.apache.pekko
    import pekko.actor.ActorSystem
    import pekko.Done
    import pekko.http.scaladsl.Http
    import pekko.stream.scaladsl._
    import pekko.http.scaladsl.model._
    import pekko.http.scaladsl.model.ws._

    import scala.concurrent.Future

    object WebSocketClientFlow {
      def main(args: Array[String]): Unit = {
        implicit val system = ActorSystem()
        import system.dispatcher

        // Future[Done] is the materialized value of Sink.foreach,
        // emitted when the stream completes
        val incoming: Sink[Message, Future[Done]] =
          Sink.foreach[Message] {
            case message: TextMessage.Strict =>
              println(message.text)
            case _ =>
            // ignore other message types
          }

        // send this as a message over the WebSocket
        val outgoing = Source.single(TextMessage("hello world!"))

        // flow to use (note: not re-usable!)
        val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))

        // the materialized value is a tuple with
        // upgradeResponse is a Future[WebSocketUpgradeResponse] that
        // completes or fails when the connection succeeds or fails
        // and closed is a Future[Done] with the stream completion from the incoming sink
        val (upgradeResponse, closed) =
          outgoing
            .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
            .toMat(incoming)(Keep.both) // also keep the Future[Done]
            .run()

        // just like a regular http request we can access response status which is available via upgrade.response.status
        // status code 101 (Switching Protocols) indicates that the server supports WebSockets
        val connected = upgradeResponse.flatMap { upgrade =>
          if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
            Future.successful(Done)
          } else {
            throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
          }
        }

        // in a real application you would not side effect here
        connected.onComplete(println)
        closed.foreach(_ => println("closed"))
      }
    }
- Java
    ActorSystem system = ActorSystem.create();
    Materializer materializer = Materializer.createMaterializer(system);
    Http http = Http.get(system);

    // print each incoming text message
    // would throw exception on non strict or binary message
    Sink<Message, CompletionStage<Done>> printSink =
        Sink.foreach(
            (message) ->
                System.out.println("Got message: " + message.asTextMessage().getStrictText()));

    // send this as a message over the WebSocket
    Source<Message, NotUsed> helloSource = Source.single(TextMessage.create("hello world"));

    Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> webSocketFlow =
        http.webSocketClientFlow(WebSocketRequest.create("ws://echo.websocket.org"));

    Pair<CompletionStage<WebSocketUpgradeResponse>, CompletionStage<Done>> pair =
        helloSource
            .viaMat(webSocketFlow, Keep.right())
            .toMat(printSink, Keep.both())
            .run(materializer);

    // The first value in the pair is a CompletionStage<WebSocketUpgradeResponse> that
    // completes when the WebSocket request has connected successfully (or failed)
    CompletionStage<WebSocketUpgradeResponse> upgradeCompletion = pair.first();

    // the second value is the completion of the sink from above
    // in other words, it completes when the WebSocket disconnects
    CompletionStage<Done> closed = pair.second();

    CompletionStage<Done> connected =
        upgradeCompletion.thenApply(
            upgrade -> {
              // just like a regular http request we can access response status which is available
              // via upgrade.response.status
              // status code 101 (Switching Protocols) indicates that the server supports WebSockets
              if (upgrade.response().status().equals(StatusCodes.SWITCHING_PROTOCOLS)) {
                return Done.getInstance();
              } else {
                throw new RuntimeException("Connection failed: " + upgrade.response().status());
              }
            });

    // in a real application you would not side effect here
    // and handle errors more carefully
    connected.thenAccept(done -> System.out.println("Connected"));
    closed.thenAccept(done -> System.out.println("Connection closed"));
webSocketClientLayer
Just like the Stand-Alone HTTP Layer Usage for regular HTTP requests, the WebSocket layer can be used fully detached from the underlying TCP interface. The same scenarios as described for regular HTTP requests apply here.
The returned layer forms a BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, Future[WebSocketUpgradeResponse]] (BidiFlow<Message, SslTlsOutbound, SslTlsInbound, Message, CompletionStage<WebSocketUpgradeResponse>> in Java).
Half-Closed WebSockets
The Apache Pekko HTTP WebSocket API does not support half-closed connections, which means that if either stream completes, the entire connection is closed (after a “Closing Handshake” has been exchanged or a timeout of 3 seconds has passed). This may lead to unexpected behavior, for example when trying to only consume messages coming from the server, like this:
- Scala
    // we may expect to be able to just tail
    // the server websocket output like this
    val flow: Flow[Message, Message, NotUsed] =
      Flow.fromSinkAndSource(
        Sink.foreach(println),
        Source.empty)

    Http().singleWebSocketRequest(
      WebSocketRequest("ws://example.com:8080/some/path"),
      flow)
- Java
    // we may expect to be able to just tail
    // the server websocket output like this
    final Flow<Message, Message, NotUsed> flow =
        Flow.fromSinkAndSource(Sink.foreach(System.out::println), Source.empty());

    http.singleWebSocketRequest(
        WebSocketRequest.create("ws://example.com:8080/some/path"), flow, materializer);
This will in fact quickly close the connection, because Source.empty (Source.empty() in Java) completes immediately when the stream is materialized. To solve this, make sure the outgoing source does not complete, for example by using Source.maybe (Source.maybe() in Java), like this:
- Scala
    // using Source.maybe materializes into a promise
    // which will allow us to complete the source later
    val flow: Flow[Message, Message, Promise[Option[Message]]] =
      Flow.fromSinkAndSourceMat(
        Sink.foreach[Message](println),
        Source.maybe[Message])(Keep.right)

    val (upgradeResponse, promise) =
      Http().singleWebSocketRequest(
        WebSocketRequest("ws://example.com:8080/some/path"),
        flow)

    // at some later time we want to disconnect
    promise.success(None)
- Java
    // using Source.maybe materializes into a completable future
    // which will allow us to complete the source later
    final Flow<Message, Message, CompletableFuture<Optional<Message>>> flow =
        Flow.fromSinkAndSourceMat(Sink.foreach(System.out::println), Source.maybe(), Keep.right());

    final Pair<CompletionStage<WebSocketUpgradeResponse>, CompletableFuture<Optional<Message>>>
        pair =
            http.singleWebSocketRequest(
                WebSocketRequest.create("ws://example.com:8080/some/path"), flow, materializer);

    // at some later time we want to disconnect
    pair.second().complete(Optional.empty());
This will keep the outgoing source from completing, without emitting any elements, until the Promise (CompletableFuture in Java) is manually completed, which makes the Source complete and the connection close.
The same problem occurs when emitting a finite number of elements: as soon as the last element is reached, the Source will complete and cause the connection to close. To avoid that, you can concatenate Source.maybe (Source.maybe() in Java) to the finite stream:
- Scala
    // emit "one" and "two" and then keep the connection open
    val flow: Flow[Message, Message, Promise[Option[Message]]] =
      Flow.fromSinkAndSourceMat(
        Sink.foreach[Message](println),
        Source(List(TextMessage("one"), TextMessage("two")))
          .concatMat(Source.maybe[Message])(Keep.right))(Keep.right)

    val (upgradeResponse, promise) =
      Http().singleWebSocketRequest(
        WebSocketRequest("ws://example.com:8080/some/path"),
        flow)

    // at some later time we want to disconnect
    promise.success(None)
- Java
    // emit "one" and then "two" and then keep the source from completing
    final Source<Message, CompletableFuture<Optional<Message>>> source =
        Source.from(Arrays.<Message>asList(TextMessage.create("one"), TextMessage.create("two")))
            .concatMat(Source.maybe(), Keep.right());

    final Flow<Message, Message, CompletableFuture<Optional<Message>>> flow =
        Flow.fromSinkAndSourceMat(Sink.foreach(System.out::println), source, Keep.right());

    final Pair<CompletionStage<WebSocketUpgradeResponse>, CompletableFuture<Optional<Message>>>
        pair =
            http.singleWebSocketRequest(
                WebSocketRequest.create("ws://example.com:8080/some/path"), flow, materializer);

    // at some later time we want to disconnect
    pair.second().complete(Optional.empty());
Scenarios that exist with the two streams in a WebSocket, and possible ways to deal with them:
Scenario | Possible solution |
---|---|
Two-way communication | Flow.fromSinkAndSource, or Flow.map for a request-response protocol |
Infinite incoming stream, no outgoing | Flow.fromSinkAndSource(someSink, Source.maybe) (Scala), Flow.fromSinkAndSource(someSink, Source.maybe()) (Java) |
Infinite outgoing stream, no incoming | Flow.fromSinkAndSource(Sink.ignore, yourSource) (Scala), Flow.fromSinkAndSource(Sink.ignore(), yourSource) (Java) |
Automatic keep-alive Ping support
Similar to the server-side keep-alive Ping support, it is possible to configure the client side to perform automatic keep-alive using Ping (or Pong) frames.
This is supported transparently via configuration, by setting pekko.http.client.websocket.periodic-keep-alive-max-idle to the desired maximum idle timeout, for example pekko.http.client.websocket.periodic-keep-alive-max-idle = 1 second. The keep-alive triggers when no other messages are in flight during the configured period. Apache Pekko HTTP will then automatically send a Ping frame for each such idle interval.
By default, the automatic keep-alive feature is disabled.
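As a sketch, the setting described above could be written in nested form in the application's configuration file (assumed here to be the usual application.conf). The 1 second value is only the illustrative value used in this section, not a recommendation.

```hocon
pekko.http.client.websocket {
  # send a keep-alive frame once the connection has been idle for this long
  periodic-keep-alive-max-idle = 1 second
}
```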
Custom keep-alive data payloads
By default, pings do not carry any payload, as it is often enough to simply push any frame over the connection to ensure the connection stays healthy (or to detect that it was severed). You may, however, configure them to carry a custom payload. To do this, provide a function that will be asked to emit the payload for each of the ping messages generated:
- Scala
    val defaultSettings = ClientConnectionSettings(system)

    val pingCounter = new AtomicInteger()

    val customWebsocketSettings =
      defaultSettings.websocketSettings
        .withPeriodicKeepAliveData(() => ByteString(s"debug-${pingCounter.incrementAndGet()}"))

    val customSettings =
      defaultSettings.withWebsocketSettings(customWebsocketSettings)

    val request = WebSocketRequest("ws://127.0.0.1")

    Http().singleWebSocketRequest(
      request,
      Flow[Message],
      Http().defaultClientHttpsContext,
      None,
      customSettings,
      system.log)
- Java
    ClientConnectionSettings defaultSettings = ClientConnectionSettings.create(system);

    AtomicInteger pingCounter = new AtomicInteger();

    WebSocketSettings customWebsocketSettings =
        defaultSettings
            .getWebsocketSettings()
            .withPeriodicKeepAliveData(
                () ->
                    ByteString.fromString(
                        String.format("debug-%d", pingCounter.incrementAndGet())));

    ClientConnectionSettings customSettings =
        defaultSettings.withWebsocketSettings(customWebsocketSettings);

    Http http = Http.get(system);

    http.singleWebSocketRequest(
        WebSocketRequest.create("ws://127.0.0.1"),
        clientFlow,
        ConnectionContext.noEncryption(),
        Optional.empty(),
        customSettings,
        system.log(),
        materializer);
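One detail worth keeping in mind when choosing a custom payload: RFC 6455 section 5.5 limits the payload of control frames such as Ping and Pong to 125 bytes. The following standalone sketch in plain Java shows one way a payload function could guard against that limit. The class and the helper countingPayload are hypothetical names, not part of Pekko HTTP, and whether Pekko HTTP itself validates the length is not assumed here.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class KeepAlivePayloadSketch {
    // RFC 6455 section 5.5: control frames carry at most 125 payload bytes
    static final int MAX_CONTROL_PAYLOAD_BYTES = 125;

    /** Hypothetical helper: supplies "prefix-N" payloads and rejects oversized ones. */
    static Supplier<byte[]> countingPayload(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return () -> {
            byte[] payload =
                (prefix + "-" + counter.incrementAndGet()).getBytes(StandardCharsets.UTF_8);
            if (payload.length > MAX_CONTROL_PAYLOAD_BYTES) {
                throw new IllegalStateException(
                    "keep-alive payload is " + payload.length + " bytes, limit is "
                        + MAX_CONTROL_PAYLOAD_BYTES);
            }
            return payload;
        };
    }

    public static void main(String[] args) {
        Supplier<byte[]> payloads = countingPayload("debug");
        System.out.println(new String(payloads.get(), StandardCharsets.UTF_8)); // debug-1
        System.out.println(new String(payloads.get(), StandardCharsets.UTF_8)); // debug-2
    }
}
```

In an actual client the resulting bytes would still need to be wrapped in a ByteString before being returned from the function passed to withPeriodicKeepAliveData.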
Uni-directional Pong keep-alive
A Ping frame will always be replied to by the client side with an appropriate Pong reply carrying the same payload. It is also possible to configure the keep-alive mechanism to send Pong frames instead of Ping frames, which enables a uni-directional heartbeat mechanism (in which case the receiving side is not expected to reply to the heartbeat). You can configure this mode by setting pekko.http.client.websocket.periodic-keep-alive-mode = pong.
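Since the keep-alive feature is disabled by default, the mode setting presumably only has a visible effect when a max idle timeout is configured as well. A sketch combining the two settings named in this section (the 1 second value is again purely illustrative):

```hocon
pekko.http.client.websocket {
  # send unsolicited Pong frames instead of Ping frames
  periodic-keep-alive-mode = pong
  # enable the keep-alive by giving it an idle timeout
  periodic-keep-alive-max-idle = 1 second
}
```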