Marshalling

Marshalling is the process of converting a higher-level (object) structure into some kind of lower-level representation, often a “wire format”. Other popular names for marshalling are “serialization” and “pickling”.

In Apache Pekko HTTP, marshalling means the conversion of an object of type T into a lower-level target type, e.g. a MessageEntity (which forms the “entity body” of an HTTP request or response) or a full HttpRequest or HttpResponse.

On the server side, for example, marshalling is used to convert an application-domain object to a response entity. Requests can contain an Accept header that lists the content types the client accepts, such as application/json and application/xml. A marshaller contains the logic to negotiate the resulting content type based on the Accept and Accept-Charset headers.

Basic Design

Marshalling of instances of type A into instances of type B is performed by a Marshaller[A, B] (Marshaller<A, B> in the Java DSL).

Contrary to what you might initially expect, Marshaller[A, B] is not a plain function A => B but rather essentially a function A => Future[List[Marshalling[B]]] (in the Java DSL, A => CompletionStage<List<Marshalling<B>>>). Let’s dissect this rather complicated-looking signature piece by piece to understand why marshallers are designed this way. Given an instance of type A, a Marshaller[A, B] produces:

  1. A Future (CompletionStage in the Java DSL): Marshallers are not required to produce a result synchronously, so they return a future instead, which allows the marshalling process to be asynchronous.

  2. of List: Rather than only a single target representation for A, marshallers can offer several. Which one is rendered onto the wire is decided by content negotiation. For example, a Marshaller[OrderConfirmation, MessageEntity] might offer a JSON as well as an XML representation. The client can indicate which one it prefers by adding an Accept request header. If the client doesn’t express a preference, the first representation is picked.

  3. of Marshalling[B]: Rather than returning an instance of B directly, marshallers first produce a Marshalling[B]. This allows for querying the MediaType and potentially the HttpCharset that the marshaller will produce before the actual marshalling is triggered. Apart from enabling content negotiation, this design allows for delaying the actual construction of the marshalling target instance to the very last moment when it is really needed.
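
The three layers described above can be illustrated with a small JDK-only sketch. It does not use the Pekko HTTP API: Offer, NegotiationSketch, marshalOrder, select and render are hypothetical stand-ins, and the selection logic is deliberately simplified (it ignores q-values and wildcards, which real content negotiation takes into account).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

// Hypothetical stand-in for Marshalling<B>: a content type plus a deferred producer.
final class Offer<B> {
    final String contentType;
    final Supplier<B> marshal; // not invoked until this offer has been selected

    Offer(String contentType, Supplier<B> marshal) {
        this.contentType = contentType;
        this.marshal = marshal;
    }
}

final class NegotiationSketch {
    // Hypothetical marshaller shape: A -> CompletionStage<List<Offer<B>>>.
    static CompletionStage<List<Offer<String>>> marshalOrder(int orderId) {
        List<Offer<String>> offers = Arrays.asList(
            new Offer<String>("application/json", () -> "{\"orderId\":" + orderId + "}"),
            new Offer<String>("application/xml", () -> "<order id=\"" + orderId + "\"/>"));
        return CompletableFuture.completedFuture(offers);
    }

    // Simplified negotiation: first offer the client accepts; no preference means first offer.
    static <B> Optional<Offer<B>> select(List<Offer<B>> offers, List<String> accepted) {
        if (accepted.isEmpty()) {
            return offers.stream().findFirst();
        }
        return offers.stream().filter(o -> accepted.contains(o.contentType)).findFirst();
    }

    static String render(int orderId, List<String> accepted) {
        List<Offer<String>> offers = marshalOrder(orderId).toCompletableFuture().join();
        return select(offers, accepted)
            .map(o -> o.marshal.get()) // only the selected representation is constructed
            .orElseThrow(() -> new IllegalStateException("no acceptable content type"));
    }

    public static void main(String[] args) {
        System.out.println(render(7, Arrays.asList("application/xml"))); // <order id="7"/>
        System.out.println(render(7, Collections.<String>emptyList())); // {"orderId":7}
    }
}
```

Because each offer carries a Supplier rather than a finished value, the representation that loses the negotiation is never built.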

This is how Marshalling is defined:

/**
 * Describes one possible option for marshalling a given value.
 */
sealed trait Marshalling[+A] {
  def map[B](f: A => B): Marshalling[B]

  /**
   * Converts this marshalling to an opaque marshalling, i.e. a marshalling result that
   * does not take part in content type negotiation. The given charset is used if this
   * instance is a `WithOpenCharset` marshalling.
   */
  def toOpaque(charset: HttpCharset): Marshalling[A]
}

object Marshalling {

  /**
   * A Marshalling to a specific [[pekko.http.scaladsl.model.ContentType]].
   */
  final case class WithFixedContentType[A](
      contentType: ContentType,
      marshal: () => A) extends Marshalling[A] {
    def map[B](f: A => B): WithFixedContentType[B] = copy(marshal = () => f(marshal()))
    def toOpaque(charset: HttpCharset): Marshalling[A] = Opaque(marshal)
  }

  /**
   * A Marshalling to a specific [[pekko.http.scaladsl.model.MediaType]] with a flexible charset.
   */
  final case class WithOpenCharset[A](
      mediaType: MediaType.WithOpenCharset,
      marshal: HttpCharset => A) extends Marshalling[A] {
    def map[B](f: A => B): WithOpenCharset[B] = copy(marshal = cs => f(marshal(cs)))
    def toOpaque(charset: HttpCharset): Marshalling[A] = Opaque(() => marshal(charset))
  }

  /**
   * A Marshalling to an unknown MediaType and charset.
   * Circumvents content negotiation.
   */
  final case class Opaque[A](marshal: () => A) extends Marshalling[A] {
    def map[B](f: A => B): Opaque[B] = copy(marshal = () => f(marshal()))
    def toOpaque(charset: HttpCharset): Marshalling[A] = this
  }
}
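
For readers coming from the Java side, the following is a rough JDK-only mirror of the three variants above. It is only a sketch under stated assumptions: MarshallingSketch, FixedSketch, OpenCharsetSketch, OpaqueSketch and MarshallingDemo are hypothetical names, content types are plain strings, and toOpaque is typed to return the opaque variant directly so the demo can read the result.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical JDK-only mirror of the Marshalling trait; these are not the Pekko HTTP types.
interface MarshallingSketch<A> {
    <B> MarshallingSketch<B> map(Function<A, B> f);
    OpaqueSketch<A> toOpaque(Charset charset);
}

// Content type is fixed; the value is produced on demand.
final class FixedSketch<A> implements MarshallingSketch<A> {
    final String contentType;
    final Supplier<A> marshal;
    FixedSketch(String contentType, Supplier<A> marshal) {
        this.contentType = contentType;
        this.marshal = marshal;
    }
    public <B> MarshallingSketch<B> map(Function<A, B> f) {
        return new FixedSketch<B>(contentType, () -> f.apply(marshal.get()));
    }
    public OpaqueSketch<A> toOpaque(Charset charset) {
        return new OpaqueSketch<A>(marshal); // the charset is irrelevant here
    }
}

// Media type is fixed, but the charset is chosen later.
final class OpenCharsetSketch<A> implements MarshallingSketch<A> {
    final String mediaType;
    final Function<Charset, A> marshal;
    OpenCharsetSketch(String mediaType, Function<Charset, A> marshal) {
        this.mediaType = mediaType;
        this.marshal = marshal;
    }
    public <B> MarshallingSketch<B> map(Function<A, B> f) {
        return new OpenCharsetSketch<B>(mediaType, cs -> f.apply(marshal.apply(cs)));
    }
    public OpaqueSketch<A> toOpaque(Charset charset) {
        return new OpaqueSketch<A>(() -> marshal.apply(charset)); // pins the given charset
    }
}

// No content type information at all, so it cannot take part in negotiation.
final class OpaqueSketch<A> implements MarshallingSketch<A> {
    final Supplier<A> marshal;
    OpaqueSketch(Supplier<A> marshal) {
        this.marshal = marshal;
    }
    public <B> MarshallingSketch<B> map(Function<A, B> f) {
        return new OpaqueSketch<B>(() -> f.apply(marshal.get()));
    }
    public OpaqueSketch<A> toOpaque(Charset charset) {
        return this;
    }
}

final class MarshallingDemo {
    // Encodes "h\u00e9llo" with the given charset and reports the number of bytes.
    static int encodedLength(Charset charset) {
        MarshallingSketch<byte[]> bytes =
            new OpenCharsetSketch<byte[]>("text/plain", cs -> "h\u00e9llo".getBytes(cs));
        MarshallingSketch<Integer> length = bytes.map(b -> b.length);
        return length.toOpaque(charset).marshal.get();
    }

    public static void main(String[] args) {
        System.out.println(encodedLength(StandardCharsets.UTF_8));      // 6
        System.out.println(encodedLength(StandardCharsets.ISO_8859_1)); // 5
    }
}
```

Note that map only wraps the deferred producer; nothing is encoded until the opaque marshalling is finally asked for its value.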

Apache Pekko HTTP also defines a number of helpful aliases for the types of marshallers that you’ll likely work with most:

type ToEntityMarshaller[T] = Marshaller[T, MessageEntity]
type ToByteStringMarshaller[T] = Marshaller[T, ByteString]
type ToHeadersAndEntityMarshaller[T] = Marshaller[T, (immutable.Seq[HttpHeader], MessageEntity)]
type ToResponseMarshaller[T] = Marshaller[T, HttpResponse]
type ToRequestMarshaller[T] = Marshaller[T, HttpRequest]

Predefined Marshallers

Apache Pekko HTTP already predefines a number of marshallers for the most common types.

All of them can be found in Marshaller.

Implicit Resolution

The marshalling infrastructure of Apache Pekko HTTP relies on a type-class based approach, which means that Marshaller instances from a certain type A to a certain type B have to be available implicitly.

The implicits for most of the predefined marshallers in Apache Pekko HTTP are provided through the companion object of the Marshaller trait. This means that they are always available and never need to be explicitly imported. Additionally, you can simply “override” them by bringing your own custom version into local scope.

Custom Marshallers

Apache Pekko HTTP gives you a few convenience tools for constructing marshallers for your own types. Before you do that, you need to think about what kind of marshaller you want to create. If all your marshaller needs to produce is a MessageEntity, then you should probably provide a ToEntityMarshaller[T] (Marshaller<T, MessageEntity> in the Java DSL). The advantage here is that it will work on both the client and the server side, since a ToResponseMarshaller[T] (Marshaller<T, HttpResponse>) as well as a ToRequestMarshaller[T] (Marshaller<T, HttpRequest>) can be created automatically if a ToEntityMarshaller[T] is available.

If, however, your marshaller also needs to set things like the response status code, the request method, the request URI or any headers, then a ToEntityMarshaller[T] won’t work. You’ll need to fall back to providing a ToResponseMarshaller[T] (Marshaller<T, HttpResponse> in the Java DSL) or a ToRequestMarshaller[T] (Marshaller<T, HttpRequest>) directly.

To write your own marshallers you won’t have to implement the Marshaller trait (a class in the Java DSL) by hand.

Rather, it should be possible to use one of the convenience construction helpers defined on the Marshaller companion:

object Marshaller
    extends GenericMarshallers
    with PredefinedToEntityMarshallers
    with PredefinedToResponseMarshallers
    with PredefinedToRequestMarshallers {

  /**
   * Creates a [[Marshaller]] from the given function.
   */
  def apply[A, B](f: ExecutionContext => A => Future[List[Marshalling[B]]]): Marshaller[A, B] =
    new Marshaller[A, B] {
      def apply(value: A)(implicit ec: ExecutionContext) =
        try f(ec)(value)
        catch { case NonFatal(e) => FastFuture.failed(e) }
    }

  /**
   * Helper for creating a [[Marshaller]] using the given function.
   */
  def strict[A, B](f: A => Marshalling[B]): Marshaller[A, B] =
    Marshaller { _ => a => FastFuture.successful(f(a) :: Nil) }

  /**
   * Helper for creating a "super-marshaller" from a number of "sub-marshallers".
   * Content-negotiation determines, which "sub-marshaller" eventually gets to do the job.
   *
   * Please note that all marshallers will actually be invoked in order to get the Marshalling object
   * out of them, and later decide which of the marshallings should be returned. This is by design;
   * however, as discussed in ticket https://github.com/akka/akka-http/issues/243, it MAY be
   * changed in later versions of Pekko HTTP.
   */
  def oneOf[A, B](marshallers: Marshaller[A, B]*): Marshaller[A, B] =
    Marshaller { implicit ec => a => FastFuture.sequence(marshallers.map(_(a))).fast.map(_.flatten.toList) }

  /**
   * Helper for creating a "super-marshaller" from a number of values and a function producing "sub-marshallers"
   * from these values. Content-negotiation determines, which "sub-marshaller" eventually gets to do the job.
   *
   * Please note that all marshallers will actually be invoked in order to get the Marshalling object
   * out of them, and later decide which of the marshallings should be returned. This is by design;
   * however, as discussed in ticket https://github.com/akka/akka-http/issues/243, it MAY be
   * changed in later versions of Pekko HTTP.
   */
  def oneOf[T, A, B](values: T*)(f: T => Marshaller[A, B]): Marshaller[A, B] =
    oneOf(values.map(f): _*)

  /**
   * Helper for creating a synchronous [[Marshaller]] to content with a fixed charset from the given function.
   */
  def withFixedContentType[A, B](contentType: ContentType)(marshal: A => B): Marshaller[A, B] =
    new Marshaller[A, B] {
      def apply(value: A)(implicit ec: ExecutionContext) =
        try FastFuture.successful {
            Marshalling.WithFixedContentType(contentType, () => marshal(value)) :: Nil
          }
        catch {
          case NonFatal(e) => FastFuture.failed(e)
        }

      override def compose[C](f: C => A): Marshaller[C, B] =
        Marshaller.withFixedContentType(contentType)(marshal.compose(f))
    }

  /**
   * Helper for creating a synchronous [[Marshaller]] to content with a negotiable charset from the given function.
   */
  def withOpenCharset[A, B](mediaType: MediaType.WithOpenCharset)(marshal: (A, HttpCharset) => B): Marshaller[A, B] =
    new Marshaller[A, B] {
      def apply(value: A)(implicit ec: ExecutionContext) =
        try FastFuture.successful {
            Marshalling.WithOpenCharset(mediaType, charset => marshal(value, charset)) :: Nil
          }
        catch {
          case NonFatal(e) => FastFuture.failed(e)
        }

      override def compose[C](f: C => A): Marshaller[C, B] =
        Marshaller.withOpenCharset(mediaType)((c: C, hc: HttpCharset) => marshal(f(c), hc))
    }

  /**
   * Helper for creating a synchronous [[Marshaller]] to non-negotiable content from the given function.
   */
  def opaque[A, B](marshal: A => B): Marshaller[A, B] =
    strict { value => Marshalling.Opaque(() => marshal(value)) }

  /**
   * Helper for creating a [[Marshaller]] combined of the provided `marshal` function
   * and an implicit Marshaller which is able to produce the required final type.
   */
  def combined[A, B, C](marshal: A => B)(implicit m2: Marshaller[B, C]): Marshaller[A, C] =
    Marshaller[A, C] { ec => a => m2.compose(marshal).apply(a)(ec) }
}
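
To make the behaviour of withFixedContentType and oneOf more tangible, here is a minimal JDK-only sketch of the same idea. It is not the Pekko HTTP implementation: Choice, SketchMarshaller and OneOfDemo are hypothetical names, content types are plain strings, and oneOf takes a List instead of varargs. As in the listing above, every sub-marshaller is invoked and the resulting lists are concatenated in order; only the deferred marshal functions stay unevaluated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for Marshalling.WithFixedContentType.
final class Choice<B> {
    final String contentType;
    final Supplier<B> marshal;
    Choice(String contentType, Supplier<B> marshal) {
        this.contentType = contentType;
        this.marshal = marshal;
    }
}

// Hypothetical stand-in for Marshaller<A, B>.
interface SketchMarshaller<A, B> {
    CompletionStage<List<Choice<B>>> apply(A value);

    // Synchronous marshaller offering exactly one representation.
    static <A, B> SketchMarshaller<A, B> withFixedContentType(String contentType, Function<A, B> marshal) {
        return value -> {
            List<Choice<B>> single =
                Collections.singletonList(new Choice<B>(contentType, () -> marshal.apply(value)));
            return CompletableFuture.completedFuture(single);
        };
    }

    // "Super-marshaller": invokes every sub-marshaller and concatenates their offers in order.
    static <A, B> SketchMarshaller<A, B> oneOf(List<SketchMarshaller<A, B>> marshallers) {
        return value -> {
            List<Choice<B>> none = Collections.emptyList();
            CompletionStage<List<Choice<B>>> all = CompletableFuture.completedFuture(none);
            for (SketchMarshaller<A, B> m : marshallers) {
                all = all.thenCombine(m.apply(value), (soFar, more) -> {
                    List<Choice<B>> merged = new ArrayList<>(soFar);
                    merged.addAll(more);
                    return merged;
                });
            }
            return all;
        };
    }
}

final class OneOfDemo {
    // Returns the content types offered for the given name, in offer order.
    static List<String> offeredTypes(String name) {
        SketchMarshaller<String, String> json =
            SketchMarshaller.withFixedContentType("application/json", n -> "{\"name\":\"" + n + "\"}");
        SketchMarshaller<String, String> xml =
            SketchMarshaller.withFixedContentType("application/xml", n -> "<name>" + n + "</name>");
        List<SketchMarshaller<String, String>> subs = Arrays.asList(json, xml);
        SketchMarshaller<String, String> both = SketchMarshaller.oneOf(subs);

        List<String> types = new ArrayList<>();
        for (Choice<String> c : both.apply(name).toCompletableFuture().join()) {
            types.add(c.contentType);
        }
        return types;
    }

    public static void main(String[] args) {
        System.out.println(offeredTypes("Ann")); // [application/json, application/xml]
    }
}
```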

Deriving Marshallers

Sometimes you can save yourself some work by reusing existing marshallers for your custom ones. The idea is to “wrap” an existing marshaller with some logic to “re-target” it to your type.

In this regard wrapping a marshaller can mean one or both of the following two things:

  • Transform the input before it reaches the wrapped marshaller
  • Transform the output of the wrapped marshaller

For the latter (transforming the output) you can use baseMarshaller.map, which works exactly as it does for functions. For the former (transforming the input) you have four alternatives:

  • baseMarshaller.compose
  • baseMarshaller.composeWithEC
  • baseMarshaller.wrap
  • baseMarshaller.wrapWithEC

compose works just like it does for functions. wrap is a compose that allows you to also change the ContentType that the marshaller marshals to. The ...WithEC variants allow you to receive an ExecutionContext internally if you need one, without having to depend on one being available implicitly at the usage site.
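
The difference between transforming the input (compose) and transforming the output (map) can be sketched without the library. The following JDK-only example is an assumption-laden simplification: MiniMarshaller, User and DerivingDemo are hypothetical, and the marshaller yields a single result instead of a list of marshallings, so wrap and the ...WithEC variants are not modelled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical, heavily simplified marshaller: one result, no content negotiation.
interface MiniMarshaller<A, B> {
    CompletionStage<B> apply(A value);

    // Transform the input before it reaches this marshaller.
    default <C> MiniMarshaller<C, B> compose(Function<C, A> f) {
        return c -> apply(f.apply(c));
    }

    // Transform the output of this marshaller.
    default <C> MiniMarshaller<A, C> map(Function<B, C> f) {
        return a -> apply(a).thenApply(f);
    }
}

// Hypothetical domain type used only for this illustration.
final class User {
    final String name;
    User(String name) {
        this.name = name;
    }
}

final class DerivingDemo {
    // Existing "base" marshaller: renders a string as a quoted JSON string (no escaping).
    static final MiniMarshaller<String, String> QUOTED =
        s -> CompletableFuture.completedFuture("\"" + s + "\"");

    // Re-targeted to User by extracting the name first.
    static final MiniMarshaller<User, String> USER_NAME =
        QUOTED.compose((User u) -> u.name);

    // Output wrapped in an enclosing JSON object afterwards.
    static final MiniMarshaller<User, String> USER_OBJECT =
        USER_NAME.map(body -> "{\"user\":" + body + "}");

    static String render(MiniMarshaller<User, String> m, String name) {
        return m.apply(new User(name)).toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(render(USER_NAME, "Ann"));   // "Ann"
        System.out.println(render(USER_OBJECT, "Ann")); // {"user":"Ann"}
    }
}
```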

Using Marshallers

In many places throughout Apache Pekko HTTP, marshallers are used implicitly, e.g. when you define how to complete a request using the Routing DSL.

However, you can also use the marshalling infrastructure directly if you wish, which can be useful for example in tests. The best entry point for this is the Marshal object, which you can use like this:

import scala.concurrent.Await
import scala.concurrent.duration._

import org.apache.pekko
import pekko.http.scaladsl.marshalling.Marshal
import pekko.http.scaladsl.model._

import system.dispatcher // ExecutionContext

val string = "Yeah"
val entityFuture = Marshal(string).to[MessageEntity]
val entity = Await.result(entityFuture, 1.second) // don't block in non-test code!
entity.contentType shouldEqual ContentTypes.`text/plain(UTF-8)`

val errorMsg = "Easy, pal!"
val responseFuture = Marshal(420 -> errorMsg).to[HttpResponse]
val response = Await.result(responseFuture, 1.second) // don't block in non-test code!
response.status shouldEqual StatusCodes.EnhanceYourCalm
response.entity.contentType shouldEqual ContentTypes.`text/plain(UTF-8)`

val request = HttpRequest(headers = List(headers.Accept(MediaTypes.`application/json`)))
val responseText = "Plaintext"
val respFuture = Marshal(responseText).toResponseFor(request) // with content negotiation!
a[Marshal.UnacceptableResponseContentTypeException] should be thrownBy {
  Await.result(respFuture, 1.second) // client requested JSON, we only have text/plain!
}

In the Java DSL, many directives dealing with marshalling require that you pass a marshaller explicitly. The following example shows how to marshal Java bean classes to JSON using the Jackson JSON support:

import org.apache.pekko.http.javadsl.marshallers.jackson.Jackson;
import org.apache.pekko.http.javadsl.model.StatusCodes;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import static org.apache.pekko.http.javadsl.server.Directives.*;
import static org.apache.pekko.http.javadsl.unmarshalling.StringUnmarshallers.INTEGER;

private static Route putPetHandler(Map<Integer, Pet> pets, Pet thePet) {
  pets.put(thePet.getId(), thePet);
  return complete(StatusCodes.OK, thePet, Jackson.<Pet>marshaller());
}

private static Route alternativeFuturePutPetHandler(Map<Integer, Pet> pets, Pet thePet) {
  pets.put(thePet.getId(), thePet);
  CompletableFuture<Pet> futurePet = CompletableFuture.supplyAsync(() -> thePet);
  return completeOKWithFuture(futurePet, Jackson.<Pet>marshaller());
}