1. Introduction

Project Info: Pekko Http
Artifact
org.apache.pekko
pekko-http
1.0.1
JDK versions
OpenJDK 8
OpenJDK 11
OpenJDK 17
Scala versions2.13.14, 2.12.20, 3.3.3
JPMS module namepekko.http
License
API documentation
Forums
Release notesReleases Notes
IssuesGithub issues
Sourceshttps://github.com/apache/pekko-http

The Apache Pekko HTTP modules implement a full server- and client-side HTTP stack on top of pekko-actor and pekko-stream. It’s not a web-framework but rather a more general toolkit for providing and consuming HTTP-based services. While interaction with a browser is of course also in scope it is not the primary focus of Apache Pekko HTTP.

Apache Pekko HTTP follows a rather open design and many times offers several different API levels for “doing the same thing”. You get to pick the API level of abstraction that is most suitable for your application. This means that, if you have trouble achieving something using a high-level API, there’s a good chance that you can get it done with a low-level API, which offers more flexibility but might require you to write more application code.

Philosophy

Apache Pekko HTTP has been driven with a clear focus on providing tools for building integration layers rather than application cores. As such it regards itself as a suite of libraries rather than a framework.

A framework, as we’d like to think of the term, gives you a “frame”, in which you build your application. It comes with a lot of decisions already pre-made and provides a foundation including support structures that lets you get started and deliver results quickly. In a way a framework is like a skeleton onto which you put the “flesh” of your application in order to have it come alive. As such frameworks work best if you choose them before you start application development and try to stick to the framework’s “way of doing things” as you go along.

For example, if you are building a browser-facing web application it makes sense to choose a web framework and build your application on top of it because the “core” of the application is the interaction of a browser with your code on the web-server. The framework makers have chosen one “proven” way of designing such applications and let you “fill in the blanks” of a more or less flexible “application-template”. Being able to rely on best-practice architecture like this can be a great asset for getting things done quickly.

However, if your application is not primarily a web application because its core is not browser-interaction but some specialized maybe complex business service and you are merely trying to connect it to the world via a REST/HTTP interface a web-framework might not be what you need. In this case the application architecture should be dictated by what makes sense for the core not the interface layer. Also, you probably won’t benefit from the possibly existing browser-specific framework components like view templating, asset management, JavaScript- and CSS generation/manipulation/minification, localization support, AJAX support, etc.

Apache Pekko HTTP was designed specifically as “not-a-framework”, not because we don’t like frameworks, but for use cases where a framework is not the right choice. Apache Pekko HTTP is made for building integration layers based on HTTP and as such tries to “stay on the sidelines”. Therefore you normally don’t build your application “on top of” Apache Pekko HTTP, but you build your application on top of whatever makes sense and use Apache Pekko HTTP merely for the HTTP integration needs.

On the other hand, if you prefer to build your applications with the guidance of a framework, you should give Play Framework a try, which is planning to use Apache Pekko internally. If you come from Play and want to try Apache Pekko HTTP, we collected a side-by-side comparison to show how some Play routing features map to the Apache Pekko HTTP routing DSL.

Using Apache Pekko HTTP

Apache Pekko HTTP is provided as independent modules from Apache Pekko itself under its own release cycle. The modules, however, do not depend on pekko-actor or pekko-stream, so the user is required to choose an Apache Pekko version to run against and add a manual dependency to pekko-stream of the chosen version.

sbt
val PekkoVersion = "1.0.3"
val PekkoHttpVersion = "1.0.1"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion,
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
  "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion
)
Gradle
def versions = [
  PekkoVersion: "1.0.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-http-bom_${versions.ScalaBinary}:1.0.1")

  implementation "org.apache.pekko:pekko-actor-typed_${versions.ScalaBinary}:${versions.PekkoVersion}"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
  implementation "org.apache.pekko:pekko-http_${versions.ScalaBinary}"
}
Maven
<properties>
  <pekko.version>1.0.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-http-bom_${scala.binary.version}</artifactId>
      <version>1.0.1</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-actor-typed_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-http_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>

You can bootstrap a new project with Apache Pekko HTTP already configured using the Giter8 template directly via sbt:

For Scala (sbt)
sbt new apache/pekko-http-quickstart-scala.g8
For Java (Maven or Gradle)
sbt new apache/pekko-http-quickstart-java.g8
From there on the prepared project can be built using Gradle or Maven.

More instructions can be found on the template projecttemplate project.

Routing DSL for HTTP servers

The high-level, routing API of Apache Pekko HTTP provides a DSL to describe HTTP “routes” and how they should be handled. Each route is composed of one or more level of DirectivesDirectives that narrows down to handling one specific type of request.

For example one route might start with matching the path of the request, only matching if it is “/hello”, then narrowing it down to only handle HTTP get requests and then complete those with a string literal, which will be sent back as an HTTP OK with the string as response body.

The Route Route created using the Route DSL is then “bound” to a port to start serving HTTP requests:

Scala
source/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * license agreements; and to You under the Apache License, version 2.0:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * This file is part of the Apache Pekko project, which was derived from Akka.
 */

/*
 * Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
 */

package docs.http.scaladsl

import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.Behaviors
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model._
import pekko.http.scaladsl.server.Directives._
import scala.io.StdIn

object HttpServerRoutingMinimal {

  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem(Behaviors.empty, "my-system")
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.executionContext

    val route =
      path("hello") {
        get {
          complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to Pekko HTTP</h1>"))
        }
      }

    val bindingFuture = Http().newServerAt("localhost", 8080).bind(route)

    println(s"Server now online. Please navigate to http://localhost:8080/hello\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}
Java
sourceimport org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.http.javadsl.ServerBinding;
import org.apache.pekko.http.javadsl.server.AllDirectives;
import org.apache.pekko.http.javadsl.server.Route;

import java.util.concurrent.CompletionStage;

public class HttpServerMinimalExampleTest extends AllDirectives {

  public static void main(String[] args) throws Exception {
    // boot up server using the route as defined below
    ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "routes");

    final Http http = Http.get(system);

    // In order to access all directives we need an instance where the routes are define.
    HttpServerMinimalExampleTest app = new HttpServerMinimalExampleTest();

    final CompletionStage<ServerBinding> binding =
        http.newServerAt("localhost", 8080).bind(app.createRoute());

    System.out.println("Server online at http://localhost:8080/\nPress RETURN to stop...");
    System.in.read(); // let it run until user presses return

    binding
        .thenCompose(ServerBinding::unbind) // trigger unbinding from the port
        .thenAccept(unbound -> system.terminate()); // and shutdown when done
  }

  private Route createRoute() {
    return concat(path("hello", () -> get(() -> complete("<h1>Say hello to pekko-http</h1>"))));
  }
}

When you run this server, you can either open the page in a browser, at the following url: http://localhost:8080/hello, or call it in your terminal, via curl http://localhost:8080/hello.

Marshalling

Transforming request and response bodies between over-the-wire formats and objects to be used in your application is done separately from the route declarations, in marshallers, which are pulled in implicitly using the “magnet” pattern. This means that you can complete a request with any kind of object as long as there is an implicit marshaller available in scope.

Default marshallers are provided for simple objects like String or ByteString, and you can define your own for example for JSON. An additional module provides JSON serialization using the spray-json library (see JSON Support for details):

sbt
val PekkoHttpVersion = "1.0.1"
libraryDependencies += "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-http-bom_${versions.ScalaBinary}:1.0.1")

  implementation "org.apache.pekko:pekko-http-spray-json_${versions.ScalaBinary}"
}
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-http-bom_${scala.binary.version}</artifactId>
      <version>1.0.1</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-http-spray-json_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>

JSON support is possible in pekko-http by the use of Jackson, an external artifact (see JSON Support for details):

sbt
val PekkoHttpVersion = "1.0.1"
libraryDependencies += "org.apache.pekko" %% "pekko-http-jackson" % PekkoHttpVersion
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-http-bom_${versions.ScalaBinary}:1.0.1")

  implementation "org.apache.pekko:pekko-http-jackson_${versions.ScalaBinary}"
}
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-http-bom_${scala.binary.version}</artifactId>
      <version>1.0.1</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-http-jackson_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>

A common use case is to reply to a request using a model object having the marshaller transform it into JSON. In this case shown by two separate routes. The first route queries an asynchronous database and marshals the Future[Option[Item]]CompletionStage<Optional<Item>> result into a JSON response. The second unmarshals an Order from the incoming request, saves it to the database and replies with an OK when done.

Scala
source/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * license agreements; and to You under the Apache License, version 2.0:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * This file is part of the Apache Pekko project, which was derived from Akka.
 */

/*
 * Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
 */

package docs.http.scaladsl

import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.Behaviors
import pekko.http.scaladsl.Http
import pekko.Done
import pekko.http.scaladsl.server.Route
import pekko.http.scaladsl.server.Directives._
import pekko.http.scaladsl.model.StatusCodes
// for JSON serialization/deserialization following dependency is required:
// "org.apache.pekko" %% "pekko-http-spray-json" % "<latest version>"
import pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

import scala.io.StdIn

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

object SprayJsonExample {

  // needed to run the route
  implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty, "SprayExample")
  // needed for the future map/flatmap in the end and future in fetchItem and saveOrder
  implicit val executionContext: ExecutionContext = system.executionContext

  var orders: List[Item] = Nil

  // domain model
  final case class Item(name: String, id: Long)
  final case class Order(items: List[Item])

  // formats for unmarshalling and marshalling
  implicit val itemFormat: RootJsonFormat[Item] = jsonFormat2(Item.apply)
  implicit val orderFormat: RootJsonFormat[Order] = jsonFormat1(Order.apply)

  // (fake) async database query api
  def fetchItem(itemId: Long): Future[Option[Item]] = Future {
    orders.find(o => o.id == itemId)
  }
  def saveOrder(order: Order): Future[Done] = {
    orders = order.items ::: orders
    Future { Done }
  }

  def main(args: Array[String]): Unit = {
    val route: Route =
      concat(
        get {
          pathPrefix("item" / LongNumber) { id =>
            // there might be no item for a given id
            val maybeItem: Future[Option[Item]] = fetchItem(id)

            onSuccess(maybeItem) {
              case Some(item) => complete(item)
              case None       => complete(StatusCodes.NotFound)
            }
          }
        },
        post {
          path("create-order") {
            entity(as[Order]) { order =>
              val saved: Future[Done] = saveOrder(order)
              onSuccess(saved) { _ => // we are not interested in the result value `Done` but only in the fact that it was successful
                complete("order created")
              }
            }
          }
        })

    val bindingFuture = Http().newServerAt("localhost", 8080).bind(route)
    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}
Java
sourceimport org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.http.javadsl.ConnectHttp;
import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.http.javadsl.ServerBinding;
import org.apache.pekko.http.javadsl.marshallers.jackson.Jackson;
import org.apache.pekko.http.javadsl.model.HttpRequest;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.http.javadsl.model.StatusCodes;
import org.apache.pekko.http.javadsl.server.AllDirectives;
import org.apache.pekko.http.javadsl.server.Route;
import org.apache.pekko.stream.javadsl.Flow;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import static org.apache.pekko.http.javadsl.server.PathMatchers.longSegment;

public class JacksonExampleTest extends AllDirectives {

  public static void main(String[] args) throws Exception {
    // boot up server using the route as defined below
    ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "routes");

    final Http http = Http.get(system);

    // In order to access all directives we need an instance where the routes are define.
    JacksonExampleTest app = new JacksonExampleTest();

    final CompletionStage<ServerBinding> binding =
        http.newServerAt("localhost", 8080).bind(app.createRoute());

    System.out.println("Server online at http://localhost:8080/\nPress RETURN to stop...");
    System.in.read(); // let it run until user presses return

    binding
        .thenCompose(ServerBinding::unbind) // trigger unbinding from the port
        .thenAccept(unbound -> system.terminate()); // and shutdown when done
  }

  // (fake) async database query api
  private CompletionStage<Optional<Item>> fetchItem(long itemId) {
    return CompletableFuture.completedFuture(Optional.of(new Item("foo", itemId)));
  }

  // (fake) async database query api
  private CompletionStage<Done> saveOrder(final Order order) {
    return CompletableFuture.completedFuture(Done.getInstance());
  }

  private Route createRoute() {

    return concat(
        get(
            () ->
                pathPrefix(
                    "item",
                    () ->
                        path(
                            longSegment(),
                            (Long id) -> {
                              final CompletionStage<Optional<Item>> futureMaybeItem = fetchItem(id);
                              return onSuccess(
                                  futureMaybeItem,
                                  maybeItem ->
                                      maybeItem
                                          .map(item -> completeOK(item, Jackson.marshaller()))
                                          .orElseGet(
                                              () -> complete(StatusCodes.NOT_FOUND, "Not Found")));
                            }))),
        post(
            () ->
                path(
                    "create-order",
                    () ->
                        entity(
                            Jackson.unmarshaller(Order.class),
                            order -> {
                              CompletionStage<Done> futureSaved = saveOrder(order);
                              return onSuccess(futureSaved, done -> complete("order created"));
                            }))));
  }

  private static class Item {

    final String name;
    final long id;

    @JsonCreator
    Item(@JsonProperty("name") String name, @JsonProperty("id") long id) {
      this.name = name;
      this.id = id;
    }

    public String getName() {
      return name;
    }

    public long getId() {
      return id;
    }
  }

  private static class Order {

    final List<Item> items;

    @JsonCreator
    Order(@JsonProperty("items") List<Item> items) {
      this.items = items;
    }

    public List<Item> getItems() {
      return items;
    }
  }
}

When you run this server, you can update the inventory via curl -H "Content-Type: application/json" -X POST -d '{"items":[{"name":"hhgtg","id":42}]}' http://localhost:8080/create-order on your terminal - adding an item named "hhgtg" and having an id=42; and then view the inventory either in a browser, at a url like: http://localhost:8080/item/42 - or on the terminal, via curl http://localhost:8080/item/42.

The logic for the marshalling and unmarshalling JSON in this example is provided by the “spray-json”“Jackson” library. See JSON Support)JSON Support) for more information about integration with this library.

Streaming

One of the strengths of Apache Pekko HTTP is that streaming data is at its heart meaning that both request and response bodies can be streamed through the server achieving constant memory usage even for very large requests or responses. Streaming responses will be backpressured by the remote client so that the server will not push data faster than the client can handle, streaming requests means that the server decides how fast the remote client can push the data of the request body.

Example that streams random numbers as long as the client accepts them:

Scala
source/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * license agreements; and to You under the Apache License, version 2.0:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * This file is part of the Apache Pekko project, which was derived from Akka.
 */

/*
 * Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
 */

package docs.http.scaladsl

import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.Behaviors
import pekko.stream.scaladsl._
import pekko.util.ByteString
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model.{ ContentTypes, HttpEntity }
import pekko.http.scaladsl.server.Directives._
import scala.util.Random
import scala.io.StdIn

object HttpServerStreamingRandomNumbers {

  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem(Behaviors.empty, "RandomNumbers")
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.executionContext

    // streams are re-usable so we can define it here
    // and use it for every request
    val numbers = Source.fromIterator(() =>
      Iterator.continually(Random.nextInt()))

    val route =
      path("random") {
        get {
          complete(
            HttpEntity(
              ContentTypes.`text/plain(UTF-8)`,
              // transform each number to a chunk of bytes
              numbers.map(n => ByteString(s"$n\n"))))
        }
      }

    val bindingFuture = Http().newServerAt("localhost", 8080).bind(route)
    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}
Java
sourceimport org.apache.pekko.NotUsed;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.http.javadsl.ConnectHttp;
import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.http.javadsl.ServerBinding;
import org.apache.pekko.http.javadsl.model.*;
import org.apache.pekko.http.javadsl.server.AllDirectives;
import org.apache.pekko.http.javadsl.server.Route;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.util.ByteString;

import java.util.Random;
import java.util.concurrent.CompletionStage;
import java.util.stream.Stream;

public class HttpServerStreamRandomNumbersTest extends AllDirectives {

  public static void main(String[] args) throws Exception {
    // boot up server using the route as defined below
    ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "routes");

    final Http http = Http.get(system);

    // In order to access all directives we need an instance where the routes are define.
    HttpServerStreamRandomNumbersTest app = new HttpServerStreamRandomNumbersTest();

    final CompletionStage<ServerBinding> binding =
        http.newServerAt("localhost", 8080).bind(app.createRoute());

    System.out.println("Server online at http://localhost:8080/\nPress RETURN to stop...");
    System.in.read(); // let it run until user presses return

    binding
        .thenCompose(ServerBinding::unbind) // trigger unbinding from the port
        .thenAccept(unbound -> system.terminate()); // and shutdown when done
  }

  private Route createRoute() {
    final Random rnd = new Random();
    // streams are re-usable so we can define it here
    // and use it for every request
    Source<Integer, NotUsed> numbers =
        Source.fromIterator(() -> Stream.generate(rnd::nextInt).iterator());

    return concat(
        path(
            "random",
            () ->
                get(
                    () ->
                        complete(
                            HttpEntities.create(
                                ContentTypes.TEXT_PLAIN_UTF8,
                                // transform each number to a chunk of bytes
                                numbers.map(x -> ByteString.fromString(x + "\n")))))));
  }
}

Connecting to this service with a slow HTTP client would backpressure so that the next random number is produced on demand with constant memory usage on the server. This can be seen using curl and limiting the rate curl --limit-rate 50b 127.0.0.1:8080/random

Apache Pekko HTTP routes easily interact with actors. In this example one route allows for placing bids in a fire-and-forget style while the second route contains a request-response interaction with an actor. The resulting response is rendered as JSON and returned when the response arrives from the actor.

Scala
source/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * license agreements; and to You under the Apache License, version 2.0:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * This file is part of the Apache Pekko project, which was derived from Akka.
 */

/*
 * Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
 */

package docs.http.scaladsl

import org.apache.pekko
import pekko.actor.typed.scaladsl.AskPattern._
import pekko.actor.typed.scaladsl.Behaviors
import pekko.actor.typed.{ ActorRef, ActorSystem }
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import pekko.http.scaladsl.model.StatusCodes
import pekko.http.scaladsl.server.Directives._
import pekko.util.Timeout
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future }
import scala.io.StdIn

object HttpServerWithActorInteraction {

  object Auction {

    sealed trait Message

    case class Bid(userId: String, offer: Int) extends Message

    case class GetBids(replyTo: ActorRef[Bids]) extends Message

    case class Bids(bids: List[Bid])

    def apply: Behaviors.Receive[Message] = apply(List.empty)

    def apply(bids: List[Bid]): Behaviors.Receive[Message] = Behaviors.receive {
      case (ctx, bid @ Bid(userId, offer)) =>
        ctx.log.info(s"Bid complete: $userId, $offer")
        apply(bids :+ bid)
      case (_, GetBids(replyTo)) =>
        replyTo ! Bids(bids)
        Behaviors.same
    }

  }

  // these are from spray-json
  implicit val bidFormat: RootJsonFormat[Auction.Bid] = jsonFormat2(Auction.Bid.apply)
  implicit val bidsFormat: RootJsonFormat[Auction.Bids] = jsonFormat1(Auction.Bids.apply)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem[Auction.Message] = ActorSystem(Auction.apply, "auction")
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext: ExecutionContext = system.executionContext

    val auction: ActorRef[Auction.Message] = system
    import Auction._

    val route =
      path("auction") {
        concat(
          put {
            parameters("bid".as[Int], "user") { (bid, user) =>
              // place a bid, fire-and-forget
              auction ! Bid(user, bid)
              complete(StatusCodes.Accepted, "bid placed")
            }
          },
          get {
            implicit val timeout: Timeout = 5.seconds

            // query the actor for the current auction state
            val bids: Future[Bids] = auction.ask(GetBids(_))
            complete(bids)
          })
      }

    val bindingFuture = Http().newServerAt("localhost", 8080).bind(route)
    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done

  }
}
Java
source
import org.apache.pekko.NotUsed; import org.apache.pekko.actor.typed.ActorRef; import org.apache.pekko.actor.typed.ActorSystem; import org.apache.pekko.actor.typed.Behavior; import org.apache.pekko.actor.typed.javadsl.AbstractBehavior; import org.apache.pekko.actor.typed.javadsl.ActorContext; import org.apache.pekko.actor.typed.javadsl.Behaviors; import org.apache.pekko.actor.typed.javadsl.Receive; import org.apache.pekko.http.javadsl.ConnectHttp; import org.apache.pekko.http.javadsl.Http; import org.apache.pekko.http.javadsl.ServerBinding; import org.apache.pekko.http.javadsl.marshallers.jackson.Jackson; import org.apache.pekko.http.javadsl.model.HttpRequest; import org.apache.pekko.http.javadsl.model.HttpResponse; import org.apache.pekko.http.javadsl.model.StatusCodes; import org.apache.pekko.http.javadsl.server.AllDirectives; import org.apache.pekko.http.javadsl.server.Route; import org.apache.pekko.http.javadsl.unmarshalling.StringUnmarshallers; import org.apache.pekko.stream.javadsl.Flow; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletionStage; import static org.apache.pekko.actor.typed.javadsl.AskPattern.ask; public class HttpServerActorInteractionExample extends AllDirectives { private final ActorSystem<Auction.Message> system; private final ActorRef<Auction.Message> auction; public static void main(String[] args) throws Exception { // boot up server using the route as defined below ActorSystem<Auction.Message> system = ActorSystem.create(Auction.create(), "routes"); final Http http = Http.get(system); // In order to access all directives we need an instance where the routes are define. HttpServerActorInteractionExample app = new HttpServerActorInteractionExample(system); final CompletionStage<ServerBinding> binding = http.newServerAt("localhost", 8080).bind(app.createRoute()); System.out.println("Server online at http://localhost:8080/\nPress RETURN to stop..."); System.in.read(); // let it run until user presses return binding .thenCompose(ServerBinding::unbind) // trigger unbinding from the port .thenAccept(unbound -> system.terminate()); // and shutdown when done } private HttpServerActorInteractionExample(final ActorSystem<Auction.Message> system) { this.system = system; this.auction = system; } private Route createRoute() { return concat( path( "auction", () -> concat( put( () -> parameter( StringUnmarshallers.INTEGER, "bid", bid -> parameter( "user", user -> { // place a bid, fire-and-forget auction.tell(new Auction.Bid(user, bid)); return complete(StatusCodes.ACCEPTED, "bid placed"); }))), get( () -> { // query the actor for the current auction state CompletionStage<Auction.Bids> bids = ask( auction, Auction.GetBids::new, Duration.ofSeconds(5), system.scheduler()); return completeOKWithFuture(bids, Jackson.marshaller()); })))); } static class Auction extends AbstractBehavior<Auction.Message> { interface Message {} static class Bid implements Message { public final String userId; public final int offer; Bid(String userId, int offer) { this.userId = userId; this.offer = offer; } } static class GetBids implements Message { final ActorRef<Bids> replyTo; GetBids(ActorRef<Bids> replyTo) { this.replyTo = replyTo; } } static class Bids { public final List<Bid> bids; Bids(List<Bid> bids) { this.bids = bids; } } public Auction(ActorContext<Message> context) { super(context); } private List<Bid> bids = new ArrayList<>(); public static Behavior<Message> create() { return Behaviors.setup(Auction::new); } @Override public Receive<Message> createReceive() { return newReceiveBuilder() .onMessage(Bid.class, this::onBid) .onMessage(GetBids.class, this::onGetBids) .build(); } private Behavior<Message> onBid(Bid bid) { bids.add(bid); getContext().getLog().info("Bid complete: {}, {}", bid.userId, bid.offer); return this; } private Behavior<Message> onGetBids(GetBids getBids) { getBids.replyTo.tell(new Bids(bids)); return this; } } }

When you run this server, you can add an auction bid via curl -X PUT "http://localhost:8080/auction?bid=22&user=MartinO" on the terminal; and then you can view the auction status either in a browser, at the url http://localhost:8080/auction, or, on the terminal, via curl http://localhost:8080/auction.

More details on how JSON marshalling and unmarshalling works can be found in the JSON Support section.

Read more about the details of the high level APIs in the section High-level Server-Side API.

Low-level HTTP server APIs

The low-level Apache Pekko HTTP server APIs allows for handling connections or individual requests by accepting HttpRequestHttpRequest s and answering them by producing HttpResponseHttpResponse s. This is provided by the pekko-http-core module, which is included automatically when you depend on pekko-http but can also be used on its own. APIs for handling such request-responses as function calls and as a Flow<HttpRequest, HttpResponse, ?>Flow[HttpRequest, HttpResponse, _] are available.

Scala
source/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * license agreements; and to You under the Apache License, version 2.0:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * This file is part of the Apache Pekko project, which was derived from Akka.
 */

/*
 * Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
 */

package docs.http.scaladsl

import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.Behaviors
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model.HttpMethods._
import pekko.http.scaladsl.model._

import scala.concurrent.ExecutionContext
import scala.io.StdIn

object HttpServerLowLevel {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem(Behaviors.empty, "lowlevel")
    // needed for the future map/flatmap in the end
    implicit val executionContext: ExecutionContext = system.executionContext

    val requestHandler: HttpRequest => HttpResponse = {
      case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
        HttpResponse(entity = HttpEntity(
          ContentTypes.`text/html(UTF-8)`,
          "<html><body>Hello world!</body></html>"))

      case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
        HttpResponse(entity = "PONG!")

      case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
        sys.error("BOOM!")

      case r: HttpRequest =>
        r.discardEntityBytes() // important to drain incoming HTTP Entity stream
        HttpResponse(404, entity = "Unknown resource!")
    }

    val bindingFuture = Http().newServerAt("localhost", 8080).bindSync(requestHandler)
    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done

  }
}
Java
sourceimport org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.http.javadsl.ConnectHttp;
import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.http.javadsl.ServerBinding;
import org.apache.pekko.http.javadsl.model.ContentTypes;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.http.javadsl.model.StatusCodes;
import org.apache.pekko.stream.SystemMaterializer;
import org.apache.pekko.util.ByteString;

import java.util.concurrent.CompletionStage;

public class HttpServerLowLevelExample {

  public static void main(String[] args) throws Exception {
    ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "lowlevel");

    try {
      CompletionStage<ServerBinding> serverBindingFuture =
          Http.get(system)
              .newServerAt("localhost", 8080)
              .bindSync(
                  request -> {
                    if (request.getUri().path().equals("/"))
                      return HttpResponse.create()
                          .withEntity(
                              ContentTypes.TEXT_HTML_UTF8,
                              ByteString.fromString("<html><body>Hello world!</body></html>"));
                    else if (request.getUri().path().equals("/ping"))
                      return HttpResponse.create().withEntity(ByteString.fromString("PONG!"));
                    else if (request.getUri().path().equals("/crash"))
                      throw new RuntimeException("BOOM!");
                    else {
                      request.discardEntityBytes(system);
                      return HttpResponse.create()
                          .withStatus(StatusCodes.NOT_FOUND)
                          .withEntity("Unknown resource!");
                    }
                  });

      System.out.println("Server online at http://localhost:8080/\nPress RETURN to stop...");
      System.in.read(); // let it run until user presses return

      serverBindingFuture
          .thenCompose(ServerBinding::unbind) // trigger unbinding from the port
          .thenAccept(unbound -> system.terminate()); // and shutdown when done

    } catch (RuntimeException e) {
      system.terminate();
    }
  }
}

Read more details about the low level APIs in the section Core Server API.

HTTP Client API

The client APIs provide methods for calling an HTTP server using the same HttpRequestHttpRequest and HttpResponseHttpResponse abstractions that Apache Pekko HTTP server uses but adds the concept of connection pools to allow multiple requests to the same server to be handled more performantly by re-using TCP connections to the server.

Example simple request:

Scala
source/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * license agreements; and to You under the Apache License, version 2.0:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * This file is part of the Apache Pekko project, which was derived from Akka.
 */

/*
 * Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
 */

package docs.http.scaladsl

import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.Behaviors
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model._

import scala.concurrent.Future
import scala.util.{ Failure, Success }

object HttpClientSingleRequest {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem(Behaviors.empty, "SingleRequest")
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.executionContext

    val responseFuture: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://pekko.apache.org"))

    responseFuture
      .onComplete {
        case Success(res) => println(res)
        case Failure(_)   => sys.error("something wrong")
      }
  }
}
Java
sourceimport org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.http.javadsl.model.*;
import org.apache.pekko.http.javadsl.server.examples.petstore.Pet;
import org.apache.pekko.stream.SystemMaterializer;

import java.util.concurrent.CompletionStage;

public class ClientSingleRequestExample {

  public static void main(String[] args) {
    final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "SingleRequest");

    final CompletionStage<HttpResponse> responseFuture =
        Http.get(system).singleRequest(HttpRequest.create("https://pekko.apache.org"));
  }
}

Read more about the details of the client APIs in the section Consuming HTTP-based Services (Client-Side).

The modules that make up Apache Pekko HTTP

Apache Pekko HTTP is structured into several modules:

pekko-http
Higher-level functionality, like (un)marshalling, (de)compression as well as a powerful DSL for defining HTTP-based APIs on the server-side, this is the recommended way to write HTTP servers with Apache Pekko HTTP. Details can be found in the section High-level Server-Side API
pekko-http-core
A complete, mostly low-level, server- and client-side implementation of HTTP (incl. WebSockets) Details can be found in sections Core Server API and Consuming HTTP-based Services (Client-Side)
pekko-http-testkit
A test harness and set of utilities for verifying server-side service implementations
pekko-http2-support
The HTTP/2 implementation to be included only if HTTP/2 support is needed.
pekko-http-spray-json
Predefined glue-code for (de)serializing custom types from/to JSON with spray-json Details can be found here: JSON Support
pekko-http-xml
Predefined glue-code for (de)serializing custom types from/to XML with scala-xml Details can be found here: XML Support
pekko-http-jackson
Predefined glue-code for (de)serializing custom types from/to JSON with jackson