Implications of the streaming nature of Request/Response Entities

Apache Pekko HTTP is streaming all the way through, which means that the back-pressure mechanisms enabled by Apache Pekko Streams are exposed through all layers, from the TCP layer, through the HTTP server, all the way up to the user-facing HttpRequest and HttpResponse and their HttpEntity APIs.

This has surprising implications if you are used to non-streaming / non-reactive HTTP clients. Specifically, it means that “lack of consumption of the HTTP Entity is signaled as back-pressure to the other side of the connection”. This is a feature: it lets you consume the entity only when you are ready for it, and it back-pressures servers and clients so that they cannot overwhelm the application, which would otherwise lead to unnecessary buffering of the entity in memory.

Put another way: Streaming all the way through is a feature of Apache Pekko HTTP that allows consuming entities (and pulling them through the network) in a streaming fashion, and only on demand when the client is ready to consume the bytes. Therefore, you have to explicitly consume or discard the entity.

On a client, for example, if the application doesn’t subscribe to the response entity within pekko.http.host-connection-pool.response-entity-subscription-timeout, the stream will fail with a TimeoutException: Response entity was not subscribed after ....
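As a minimal sketch of where that timeout lives, the setting can also be adjusted programmatically for a single call. The value of 3 seconds and the helper name `send` are illustrative assumptions, not recommendations. A longer timeout only buys more time to subscribe; the entity still has to be consumed or discarded.

```scala
import scala.concurrent.Future
import scala.concurrent.duration._

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model._
import pekko.http.scaladsl.settings.ConnectionPoolSettings

implicit val system: ActorSystem = ActorSystem()

// Start from the configured defaults and override only the subscription timeout.
// The 3 second value is an arbitrary example.
val poolSettings: ConnectionPoolSettings =
  ConnectionPoolSettings(system)
    .withResponseEntitySubscriptionTimeout(3.seconds)

// Hypothetical helper: sends a request using the adjusted pool settings.
def send(request: HttpRequest): Future[HttpResponse] =
  Http().singleRequest(request, settings = poolSettings)
```

The same value can be set globally through the `pekko.http.host-connection-pool.response-entity-subscription-timeout` configuration key named above.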

Warning

Consuming (or discarding) the Entity of a request is mandatory! If it is accidentally left neither consumed nor discarded, Apache Pekko HTTP will assume the incoming data should remain back-pressured, and will stall the incoming data via TCP back-pressure mechanisms. A client should consume the Entity regardless of the status of the HttpResponse.
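The rule that a client handles the entity regardless of the response status could look roughly like the sketch below. The helper name `bodyIfOk` and the choice to treat only `200 OK` as a success are assumptions made for illustration.

```scala
import scala.concurrent.{ ExecutionContext, Future }

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.model._
import pekko.http.scaladsl.unmarshalling.Unmarshal

implicit val system: ActorSystem = ActorSystem()
implicit val dispatcher: ExecutionContext = system.dispatcher

// Hypothetical helper: returns the body for 200 OK and drains the entity otherwise.
def bodyIfOk(response: HttpResponse): Future[Option[String]] =
  response.status match {
    case StatusCodes.OK =>
      // Unmarshalling reads the entity to completion, which consumes it.
      Unmarshal(response.entity).to[String].map(Some(_))
    case _ =>
      // The body of a non-OK response is not needed here, but it still has to be drained.
      response.discardEntityBytes()
      Future.successful(None)
  }
```

Both branches touch the entity, so no response is left back-pressuring its connection.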

Client-Side handling of streaming HTTP Entities

Consuming the HTTP Response Entity (Client)

There are two use-cases to consume the entity of a response:

  1. process the bytes as the response arrives from the network buffer
  2. load all the bytes in memory first, and process them afterwards

The most common, and recommended, use-case is consuming the response entity as a stream, which is done by running the underlying dataBytes Source.

It is encouraged to use various streaming techniques to utilise the underlying infrastructure to its fullest, for example by framing the incoming chunks, parsing them line-by-line and then connecting the flow into another destination Sink, such as a File or other Apache Pekko Streams connector:

Scala
import java.io.File

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.model._
import pekko.stream.scaladsl.{ FileIO, Framing }
import pekko.util.ByteString

implicit val system: ActorSystem = ActorSystem()

val response: HttpResponse = ???

response.entity.dataBytes
  .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256))
  .map(transformEachLine)
  .runWith(FileIO.toPath(new File("/tmp/example.out").toPath))

def transformEachLine(line: ByteString): ByteString = ???
Java
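import java.io.File;

import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.http.javadsl.model.*;
import org.apache.pekko.stream.javadsl.FileIO;
import org.apache.pekko.stream.javadsl.Framing;
import org.apache.pekko.stream.javadsl.FramingTruncation;
import org.apache.pekko.util.ByteString;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.duration.FiniteDuration;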

final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();

final HttpResponse response = responseFromSomewhere();

final Function<ByteString, ByteString> transformEachLine =
    line -> line /* some transformation here */;

final int maximumFrameLength = 256;

response
    .entity()
    .getDataBytes()
    .via(
        Framing.delimiter(
            ByteString.fromString("\n"), maximumFrameLength, FramingTruncation.ALLOW))
    .map(transformEachLine::apply)
    .runWith(FileIO.toPath(new File("/tmp/example.out").toPath()), system);

However, sometimes the need may arise to consume the entire entity as a Strict entity (which means that it is completely loaded into memory). Apache Pekko HTTP provides a special toStrict(timeout) (Scala) or toStrict(timeout, materializer) (Java) method which can be used to eagerly consume the entity and make it available in memory. Once in memory, data can be consumed as a ByteString or as a Source:

Scala
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.model._
import pekko.util.ByteString

implicit val system: ActorSystem = ActorSystem()
implicit val dispatcher: ExecutionContext = system.dispatcher

case class ExamplePerson(name: String)
def parse(line: ByteString): ExamplePerson = ???

val response: HttpResponse = ???

// toStrict to enforce all data be loaded into memory from the connection
val strictEntity: Future[HttpEntity.Strict] = response.entity.toStrict(3.seconds)

// You can now use the `data` directly...
val person1: Future[ExamplePerson] = strictEntity.map(e => parse(e.data))

// Though it is also still possible to use the streaming API to consume dataBytes,
// even though now they're in memory:
val person2: Future[ExamplePerson] =
  strictEntity.flatMap { e =>
    e.dataBytes
      .runFold(ByteString.empty) { case (acc, b) => acc ++ b }
      .map(parse)
  }
Java
final class ExamplePerson {
  final String name;

  public ExamplePerson(String name) {
    this.name = name;
  }
}

public ExamplePerson parse(ByteString line) {
  return new ExamplePerson(line.utf8String());
}

final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();

final HttpResponse response = responseFromSomewhere();

// toStrict to enforce all data be loaded into memory from the connection
final CompletionStage<HttpEntity.Strict> strictEntity =
    response.entity().toStrict(FiniteDuration.create(3, TimeUnit.SECONDS).toMillis(), system);

// You can now use `getData` to get the data directly...
final CompletionStage<ExamplePerson> person1 =
    strictEntity.thenApply(strict -> parse(strict.getData()));

// Though it is also still possible to use the streaming API to consume dataBytes,
// even though now they're in memory:
final CompletionStage<ExamplePerson> person2 =
    strictEntity.thenCompose(
        strict ->
            strict
                .getDataBytes()
                .runFold(ByteString.emptyByteString(), (acc, b) -> acc.concat(b), system)
                .thenApply(this::parse));

Integrating with Apache Pekko Streams

In some cases, it is necessary to process the results of a series of Apache Pekko HTTP calls as Apache Pekko Streams. In order to ensure that the HTTP Response Entity is consumed in a timely manner, the Apache Pekko HTTP stream for each request must be executed and completely consumed, then sent along for further processing.

Failing to account for this behavior can result in seemingly non-deterministic failures due to complex interactions between HTTP and stream buffering. This manifests as errors such as the following:

Response entity was not subscribed after 1 second. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`.

This error indicates that the HTTP response has been available for too long without being consumed. It can be partially worked around by increasing the subscription timeout, but you still risk running into network-level timeouts, and you could still exceed the timeout under load. It is best to resolve the issue properly, as in the examples below:

Scala
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ActorSystem
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model._
import pekko.util.ByteString
import pekko.stream.scaladsl.{ Flow, Sink, Source }

implicit val system: ActorSystem = ActorSystem()
implicit val dispatcher: ExecutionContext = system.dispatcher

case class ExamplePerson(name: String)

def parse(line: ByteString): Option[ExamplePerson] =
  line.utf8String.split(" ").headOption.map(ExamplePerson.apply)

val requests: Source[HttpRequest, NotUsed] = Source
  .fromIterator(() =>
    Range(0, 10).map(i => HttpRequest(uri = Uri(s"https://localhost/people/$i"))).iterator)

val processorFlow: Flow[Option[ExamplePerson], Int, NotUsed] =
  Flow[Option[ExamplePerson]].map(_.map(_.name.length).getOrElse(0))

// Run and completely consume a single pekko http request
def runRequest(req: HttpRequest): Future[Option[ExamplePerson]] =
  Http()
    .singleRequest(req)
    .flatMap { response =>
      response.entity.dataBytes
        .runReduce(_ ++ _)
        .map(parse)
    }

// Run each pekko http flow to completion, then continue processing. You'll want to tune the `parallelism`
// parameter to mapAsync -- higher values will create more cpu and memory load which may or may not positively
// impact performance.
requests
  .mapAsync(2)(runRequest)
  .via(processorFlow)
  .runWith(Sink.ignore)
Java
final class ExamplePerson {
  final String name;

  public ExamplePerson(String name) {
    this.name = name;
  }
}

public ExamplePerson parse(ByteString line) {
  return new ExamplePerson(line.utf8String());
}

final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();

// run a single request, consuming it completely in a single stream
public CompletionStage<ExamplePerson> runRequest(HttpRequest request) {
  return Http.get(system)
      .singleRequest(request)
      .thenCompose(
          response ->
              response
                  .entity()
                  .getDataBytes()
                  .runReduce((a, b) -> a.concat(b), system)
                  .thenApply(this::parse));
}

final List<HttpRequest> requests = new ArrayList<>();

final Flow<ExamplePerson, Integer, NotUsed> exampleProcessingFlow =
    Flow.fromFunction(person -> person.toString().length());

final CompletionStage<Done> stream =
    Source.from(requests)
        .mapAsync(1, this::runRequest)
        .via(exampleProcessingFlow)
        .runWith(Sink.ignore(), system);

Discarding the HTTP Response Entity (Client)

Sometimes when calling HTTP services we do not care about their response payload (e.g. all we care about is the response code), yet as explained above the entity still has to be consumed in some way, otherwise we’ll be exerting back-pressure on the underlying TCP connection.

The discardEntityBytes convenience method serves the purpose of easily discarding the entity if it has no purpose for us. It does so by piping the incoming bytes directly into a Sink.ignore.

The two snippets below are equivalent, and work the same way on the server-side for incoming HTTP Requests:

Scala
import scala.concurrent.ExecutionContext

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.model.HttpMessage.DiscardedEntity
import pekko.http.scaladsl.model._

implicit val system: ActorSystem = ActorSystem()
implicit val dispatcher: ExecutionContext = system.dispatcher

val response1: HttpResponse = ??? // obtained from an HTTP call (see examples below)

val discarded: DiscardedEntity = response1.discardEntityBytes()
discarded.future.onComplete { done => println("Entity discarded completely!") }
Java
final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();

final HttpResponse response = responseFromSomewhere();

final HttpMessage.DiscardedEntity discarded = response.discardEntityBytes(system);

discarded
    .completionStage()
    .whenComplete(
        (done, ex) -> {
          System.out.println("Entity discarded completely!");
        });

Or the equivalent low-level code achieving the same result:

Scala
val response1: HttpResponse = ??? // obtained from an HTTP call (see examples below)

val discardingComplete: Future[Done] = response1.entity.dataBytes.runWith(Sink.ignore)
discardingComplete.onComplete(done => println("Entity discarded completely!"))
Java
final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();

final HttpResponse response = responseFromSomewhere();

final CompletionStage<Done> discardingComplete =
    response.entity().getDataBytes().runWith(Sink.ignore(), system);

discardingComplete.whenComplete(
    (done, ex) -> {
      System.out.println("Entity discarded completely!");
    });

Server-Side handling of streaming HTTP Entities

HTTP Entities of a request are directly linked to Streams fed by the underlying TCP connection. Thus, if request entities remain unconsumed, the server will back-pressure the connection, expecting the user code to eventually decide what to do with the incoming data.

The most common use-case is to consume the request entity using directives such as BasicDirectives.extractDataBytes. Some directives force an implicit toStrict operation, such as entity(as[String]) (Scala) or entity(exampleUnmarshaller, example -> {}) (Java).

Consuming the HTTP Request Entity (Server)

The simplest way of consuming the incoming request entity is to transform it into an actual domain object, for example by using the entity directive:

Scala
import scala.concurrent.ExecutionContext

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.server.Directives._
import pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

implicit val system: ActorSystem = ActorSystem()
// needed for the future flatMap/onComplete in the end
implicit val executionContext: ExecutionContext = system.dispatcher

final case class Bid(userId: String, bid: Int)

// these are from spray-json
implicit val bidFormat: RootJsonFormat[Bid] = jsonFormat2(Bid.apply)

val route =
  path("bid") {
    put {
      entity(as[Bid]) { (bid: Bid) =>
        // incoming entity is fully consumed and converted into a Bid
        complete("The bid was: " + bid)
      }
    }
  }
Java
class Bid {
  final String userId;
  final int bid;

  Bid(String userId, int bid) {
    this.userId = userId;
    this.bid = bid;
  }
}

final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();
final ActorMaterializer materializer = ActorMaterializer.create(system);

final Unmarshaller<HttpEntity, Bid> asBid = Jackson.unmarshaller(Bid.class);

final Route s =
    path(
        "bid",
        () ->
            put(
                () ->
                    entity(
                        asBid,
                        bid ->
                            // incoming entity is fully consumed and converted into a Bid
                            complete("The bid was: " + bid))));

You can also access the raw dataBytes and run the underlying stream. For example, you could pipe the raw dataBytes into a FileIO Sink. The FileIO Sink signals completion via a Future[IOResult] (Scala) or CompletionStage<IOResult> (Java) once all the data has been written into the file:

Scala
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.scaladsl.FileIO
import pekko.http.scaladsl.server.Directives._
import java.io.File

implicit val system = ActorSystem()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher

val route =
  (put & path("lines")) {
    withoutSizeLimit {
      extractDataBytes { bytes =>
        val finishedWriting = bytes.runWith(FileIO.toPath(new File("/tmp/example.out").toPath))

        // we only want to respond once the incoming data has been handled:
        onComplete(finishedWriting) { ioResult =>
          complete("Finished writing data: " + ioResult)
        }
      }
    }
  }
Java
final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();
final ActorMaterializer materializer = ActorMaterializer.create(system);

final Route s =
    put(
        () ->
            path(
                "lines",
                () ->
                    withoutSizeLimit(
                        () ->
                            extractDataBytes(
                                bytes -> {
                                  final CompletionStage<IOResult> res =
                                      bytes.runWith(
                                          FileIO.toPath(new File("/tmp/example.out").toPath()),
                                          materializer);

                                  return onComplete(
                                      () -> res,
                                      ioResult ->
                                          // we only want to respond once the incoming data has
                                          // been handled:
                                          complete("Finished writing data: " + ioResult));
                                }))));

Discarding the HTTP Request Entity (Server)

You may want to discard the uploaded entity, for example depending on some validation (e.g. “is user authorized to upload files?”).

Please note that “discarding the HTTP Request Entity” means that the entire upload will proceed, even though you are not interested in the data being streamed to the server. This is useful if you are simply not interested in the entity.

In order to discard the dataBytes explicitly you can invoke discardEntityBytes on the incoming HttpRequest:

Scala
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.server.Directives._
import pekko.http.scaladsl.model.HttpRequest

implicit val system = ActorSystem()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher

val route =
  (put & path("lines")) {
    withoutSizeLimit {
      extractRequest { (r: HttpRequest) =>
        val finishedWriting = r.discardEntityBytes().future

        // we only want to respond once the incoming data has been handled:
        onComplete(finishedWriting) { done =>
          complete("Drained all data from connection... (" + done + ")")
        }
      }
    }
  }
Java
final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();

final Route s =
    put(
        () ->
            path(
                "lines",
                () ->
                    withoutSizeLimit(
                        () ->
                            extractRequest(
                                r -> {
                                  final CompletionStage<Done> res =
                                      r.discardEntityBytes(system).completionStage();

                                  return onComplete(
                                      () -> res,
                                      done ->
                                          // we only want to respond once the incoming data has
                                          // been handled:
                                          complete("Drained all data from connection... (" + done + ")"));
                                }))));

A related concept is cancelling the incoming entity.dataBytes (Scala) or entity.getDataBytes() (Java) stream. Cancellation results in Apache Pekko HTTP abruptly closing the connection from the client. This may be useful when you detect that the given user should not be allowed to make any uploads at all, and you want to drop the connection instead of reading and ignoring the incoming data. To do this, attach the incoming data bytes stream to Sink.cancelled (Sink.cancelled() in Java). This cancels the entity stream, which in turn causes the server to shut down the underlying connection, effectively hard-aborting the incoming request:

Scala
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.scaladsl.Sink
import pekko.http.scaladsl.server.Directives._
import pekko.http.scaladsl.model.StatusCodes
import pekko.http.scaladsl.model.headers.Connection

implicit val system = ActorSystem()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher

val route =
  (put & path("lines")) {
    withoutSizeLimit {
      extractDataBytes { data =>
        // Closing connections, method 1 (eager):
        // we deem this request as illegal, and close the connection right away:
        data.runWith(Sink.cancelled) // "brutally" closes the connection

        // Closing connections, method 2 (graceful):
        // consider draining connection and replying with `Connection: Close` header
        // if you want the client to close after this request/reply cycle instead:
        // the directive has to wrap the inner route, otherwise the header is never applied
        respondWithHeader(Connection("close")) {
          complete(StatusCodes.Forbidden -> "Not allowed!")
        }
      }
    }
  }
Java
final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();
final ActorMaterializer materializer = ActorMaterializer.create(system);

final Route s =
    put(
        () ->
            path(
                "lines",
                () ->
                    withoutSizeLimit(
                        () ->
                            extractDataBytes(
                                bytes -> {
                                  // Closing connections, method 1 (eager):
                                  // we deem this request as illegal, and close the connection
                                  // right away:
                                  bytes.runWith(
                                      Sink.cancelled(),
                                      materializer); // "brutally" closes the connection

                                  // Closing connections, method 2 (graceful):
                                  // consider draining connection and replying with `Connection:
                                  // Close` header
                                  // if you want the client to close after this request/reply
                                  // cycle instead:
                                  return respondWithHeader(
                                      Connection.create("close"),
                                      () -> complete(StatusCodes.FORBIDDEN, "Not allowed!"));
                                }))));

See also the Closing a connection section for an in-depth explanation of closing connections.

Pending: Automatic discarding of not used entities

Under certain conditions it is possible to detect that an entity is very unlikely to be used by the user for a given request, and to issue warnings or discard the entity automatically. This advanced feature has not been implemented yet; see the note and issues below for further discussion and ideas.

Note

An advanced feature code named “auto draining” has been discussed and proposed for Apache Pekko HTTP, and we’re hoping to implement or help the community implement it.

You can read more about it in issue #183 as well as issue #117; as always, contributions are very welcome!