Request-Level Client-Side API

The request-level API is the recommended and most convenient way of using Apache Pekko HTTP’s client-side functionality. It internally builds upon the Host-Level Client-Side API to provide you with a simple and easy-to-use way of retrieving HTTP responses from remote servers. Depending on your preference you can pick the Future-based variant or Flow-based variant.

Note

It is recommended to first read the Implications of the streaming nature of Request/Response Entities section, as it explains the underlying full-stack streaming concepts, which may be unexpected when coming from a background with non-“streaming first” HTTP Clients.

Note

The request-level API is implemented on top of a connection pool that is shared inside the actor system. A consequence of using a pool is that long-running requests block a connection while running and starve other requests. Make sure not to use the request-level API for long-running requests like long-polling GET requests. Use the Connection-Level Client-Side API or an extra pool just for the long-running connection instead.

Future-Based Variant

Most often, your HTTP client needs are very basic. You need the HTTP response for a certain request and don’t want to bother with setting up a full-blown streaming infrastructure.

For these cases Apache Pekko HTTP offers the Http().singleRequest(...)Http.get(system).singleRequest(...) method, which turns an HttpRequestHttpRequest instance into Future[HttpResponse]CompletionStage<HttpResponse>. Internally the request is dispatched across the (cached) host connection pool for the request’s effective URI.

The request must have either an absolute URI or a valid Host header, otherwise the returned future will be completed with an error.

Example

Scala
source/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * license agreements; and to You under the Apache License, version 2.0:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * This file is part of the Apache Pekko project, which was derived from Akka.
 */

/*
 * Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
 */

package docs.http.scaladsl

import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.Behaviors
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model._

import scala.concurrent.Future
import scala.util.{ Failure, Success }

object HttpClientSingleRequest {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem(Behaviors.empty, "SingleRequest")
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.executionContext

    val responseFuture: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://pekko.apache.org"))

    responseFuture
      .onComplete {
        case Success(res) => println(res)
        case Failure(_)   => sys.error("something wrong")
      }
  }
}
Java
sourcefinal ActorSystem system = ActorSystem.create();

final CompletionStage<HttpResponse> responseFuture =
    Http.get(system).singleRequest(HttpRequest.create("http://pekko.apache.org"));

Using the Future-Based API in Actors

When using the FutureCompletionStage based API from inside a classic Apache Pekko ActorActor, all the usual caveats apply to how one should deal with the futures completion. For example, you should not access the actor’s state from within the FutureCompletionStage’s callbacks (such as map, onComplete, …) and, instead, you should use the pipeTopipe pattern to pipe the result back to the actor as a message:

Scala
sourceimport org.apache.pekko
import pekko.actor.{ Actor, ActorLogging, ActorSystem }
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model._
import pekko.util.ByteString

class Myself extends Actor
    with ActorLogging {

  import pekko.pattern.pipe
  import context.dispatcher

  implicit val system: ActorSystem = context.system
  val http = Http(system)

  override def preStart() = {
    http.singleRequest(HttpRequest(uri = "http://pekko.apache.org"))
      .pipeTo(self)
  }

  def receive = {
    case HttpResponse(StatusCodes.OK, headers, entity, _) =>
      entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
        log.info("Got response, body: " + body.utf8String)
      }
    case resp @ HttpResponse(code, _, _, _) =>
      log.info("Request failed, response code: " + code)
      resp.discardEntityBytes()
  }

}
Java
sourceimport org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.http.javadsl.model.HttpRequest;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import static org.apache.pekko.pattern.PatternsCS.pipe;

class SingleRequestInActorExample extends AbstractActor {
  final Http http = Http.get(context().system());
  final ExecutionContextExecutor dispatcher = context().dispatcher();

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(String.class, url -> pipe(fetch(url), dispatcher).to(self()))
        .build();
  }

  CompletionStage<HttpResponse> fetch(String url) {
    return http.singleRequest(HttpRequest.create(url));
  }
}
Warning

Always make sure you consume the response entity streams (of type Source<ByteString,Unit>Source[ByteString,Unit]Source<ByteString, Object>Source[ByteString, Object]). Connect the response entity Source to a SinkSink, or call response.discardEntityBytes()response.discardEntityBytes(Materializer) if you don’t care about the response entity.

Read the Implications of the streaming nature of Request/Response Entities section for more details.

If the application doesn’t subscribe to the response entity within pekko.http.host-connection-pool.response-entity-subscription-timeout, the stream will fail with a TimeoutException: Response entity was not subscribed after ....

Flow-Based Variant

The Flow-based variant of the request-level client-side API is presented by the Http().superPool(...)Http.get(system).superPool(...) method. It creates a new “super connection pool flow”, which routes incoming requests to a (cached) host connection pool depending on their respective effective URIs.

The FlowFlow returned by Http().superPool(...)Http.get(system).superPool(...) is very similar to the one from the Host-Level Client-Side API, so the section on Using a Host Connection Pool also applies here.

However, there is one notable difference between a “host connection pool client flow” for the Host-Level API and a “super-pool flow” from the Request-Level API:

  • In a “host connection pool client flow” the flow has an implicit target host context. Therefore, the requests it takes don’t need to have absolute URIs or a valid Host header because the host connection pool will automatically add a Host header if required.

For a “super-pool flow” in the Request-Level API this is not the case. All requests to a super-pool must either have an absolute URI or a valid Host header, because otherwise it’d be impossible to find out which target endpoint to direct the request to.

Collecting headers from a server response

Sometimes we would like to get only headers of specific type which are sent from a server. In order to collect headers in a type safe way Apache Pekko HTTP API provides a type for each HTTP header. Here is an example for getting all cookies set by a server (Set-Cookie header):

Scala
source/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * license agreements; and to You under the Apache License, version 2.0:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * This file is part of the Apache Pekko project, which was derived from Akka.
 */

/*
 * Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
 */

package docs.http.scaladsl

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model.headers.`Set-Cookie`
import pekko.http.scaladsl.model._

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.Future

class HttpClientCollectingHeaders {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val executionContext: ExecutionContextExecutor = system.dispatcher

    val responseFuture: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://pekko.apache.org"))

    responseFuture.map {
      case response @ HttpResponse(StatusCodes.OK, _, _, _) =>
        val setCookies = response.headers[`Set-Cookie`]
        println(s"Cookies set by a server: $setCookies")
        response.discardEntityBytes()
      case _ => sys.error("something wrong")
    }
  }
}
Java
sourcefinal HttpResponse response = responseFromSomewhere();

final Iterable<SetCookie> setCookies = response.getHeaders(SetCookie.class);

System.out.println("Cookies set by a server: " + setCookies);
response.discardEntityBytes(system);