Client-Side HTTP/2 (Preview)
Client-Side HTTP/2 support in pekko-http is currently available as a preview. This means it is ready to be evaluated, but the APIs and behavior are likely to change.
It is recommended to first read the Implications of the streaming nature of Request/Response Entities and Host-Level Client-Side API sections, as they explain the underlying full-stack streaming concepts, tuning the client settings and HTTPS context and how to handle the Request-Response Cycle, which may be unexpected when coming from a background with non-“streaming first” HTTP Clients.
Create the client¶
There are three mechanisms for a client to establish an HTTP/2 connection. Apache Pekko HTTP supports:
- HTTP/2 over TLS
- HTTP/2 over a plain TCP connection (“h2c with prior knowledge”)
Apache Pekko HTTP doesn’t support:
- HTTP
Upgrade
mechanism
HTTP/2 over TLS¶
To create a client, use the Http()
fluent API to connect and use the http2()
creator:
sourceimport org.apache.pekko.http.scaladsl.Http
Http().connectionTo("localhost").toPort(8443).http2()
sourceimport org.apache.pekko.http.javadsl.Http;
Http.get(system).connectionTo("127.0.0.1").toPort(8443).http2();
HTTP/2 over TLS needs Application-Layer Protocol Negotiation (ALPN) to negotiate whether both client and server support HTTP/2. The JVM provides ALPN support starting from JDK 8u252. Make sure to use at least that version.
Apache Pekko HTTP does not currently support protocol negotiation to fall back to HTTP/1.1 for this API. When the server does not support HTTP/2, the stream will fail.
h2c with prior knowledge¶
The other option is to connect and start communicating in HTTP/2 immediately. You must know beforehand the target server supports HTTP/2 over a plain TCP connection. For this reason this approach is known as h2c with Prior Knowledge of HTTP/2 support.
To create a client, use the Http()
fluent API to connect and use the http2WithPriorKnowledge()
creator:
sourceimport org.apache.pekko.http.scaladsl.Http
Http().connectionTo("localhost").toPort(8080).http2WithPriorKnowledge()
sourceimport org.apache.pekko.http.javadsl.Http;
Http.get(system).connectionTo("127.0.0.1").toPort(8080).http2WithPriorKnowledge();
HTTP Upgrade mechanism¶
The Apache Pekko HTTP client doesn’t support HTTP/1 to HTTP/2 negotiation over plaintext using the Upgrade
mechanism.
Request-response ordering¶
For HTTP/2 connections the responses are not guaranteed to arrive in the same order that the requests were emitted to the server, for example a request with a quickly available response may outrun a previous request that the server is slower to respond to. For HTTP/2 it is therefore important to have a way to correlate the response with the request it was made for. This can be achieved through a RequestResponseAssociation
set on the request, Apache Pekko HTTP will pass such association objects on to the response.
In this sample the built-in org.apache.pekko.http.scaladsl.model.ResponsePromise
RequestResponseAssociation
is used to return a Future
for the response:
sourceval dispatch = singleRequest(Http().connectionTo("pekko.apache.org").http2())
dispatch(
HttpRequest(
uri = "https://pekko.apache.org/api/pekko/current/org/apache/pekko/actor/typed/scaladsl/index.html",
headers = headers.`Accept-Encoding`(HttpEncodings.gzip) :: Nil)).onComplete { res =>
println(s"[1] Got index.html: $res")
res.get.entity.dataBytes.runWith(Sink.ignore).onComplete(res => println(s"Finished reading [1] $res"))
}
def singleRequest(
connection: Flow[HttpRequest, HttpResponse, Any], bufferSize: Int = 100): HttpRequest => Future[HttpResponse] = {
val queue =
Source.queue(bufferSize)
.via(connection)
.to(Sink.foreach { response =>
// complete the response promise with the response when it arrives
val responseAssociation = response.attribute(ResponsePromise.Key).get
responseAssociation.promise.trySuccess(response)
})
.run()
req => {
// create a promise of the response for each request and set it as an attribute on the request
val p = Promise[HttpResponse]()
queue.offer(req.addAttribute(ResponsePromise.Key, ResponsePromise(p))) match {
// return the future response
case QueueOfferResult.Enqueued => p.future
case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed."))
case QueueOfferResult.Failure(ex) => Future.failed(ex)
case QueueOfferResult.QueueClosed => Future.failed(
new RuntimeException("Queue was closed (pool shut down)."))
}
}
}
source Function<HttpRequest, CompletionStage<HttpResponse>> dispatch =
singleRequest(system, Http.get(system).connectionTo("pekko.apache.org").http2());
dispatch
.apply(
HttpRequest.create(
"https://pekko.apache.org/api/pekko/current/org/apache/pekko/actor/typed/scaladsl/index.html")
.withHeaders(Arrays.asList(AcceptEncoding.create(HttpEncodings.GZIP))))
.thenAccept(
res -> {
System.out.println("[1] Got index.html: " + res);
res.entity()
.getDataBytes()
.runWith(Sink.ignore(), mat)
.thenAccept(
consumedRes -> System.out.println("Finished reading [1] " + consumedRes));
});
private static Function<HttpRequest, CompletionStage<HttpResponse>> singleRequest(
ActorSystem system, Flow<HttpRequest, HttpResponse, ?> connection) {
BoundedSourceQueue<HttpRequest> queue =
Source.<HttpRequest>queue(100)
.via(connection)
.to(
Sink.foreach(
res -> {
try {
// complete the future with the response when it arrives
ResponseFuture responseFuture =
res.getAttribute(ResponseFuture.KEY()).get();
responseFuture.future().complete(res);
} catch (Exception ex) {
ex.printStackTrace();
}
}))
.run(SystemMaterializer.get(system).materializer());
return (HttpRequest req) -> {
// create a future of the response for each request and set it as an attribute on the request
CompletableFuture<HttpResponse> future = new CompletableFuture<>();
ResponseFuture attribute = new ResponseFuture(future);
queue.offer(req.addAttribute(ResponseFuture.KEY(), attribute));
// return the future response
return attribute.future();
};
}