Source Streaming
Apache Pekko HTTP supports completing a request with an Apache Pekko Streams Source[T, _] (Source<T, ?> in Java), which makes it possible to easily build and consume streaming end-to-end APIs that apply back pressure throughout the entire stack.
It is possible to complete requests with a raw Source[ByteString, _] (Source<ByteString, ?> in Java). Often, however, it is more convenient to stream on an element-by-element basis and let Apache Pekko HTTP handle the rendering internally, for example as a JSON array or as a CSV stream (where each element is followed by a newline).
In the following sections we investigate how to make use of the JSON Streaming infrastructure; the general hints, however, apply to any kind of element-by-element streaming you could imagine.
JSON Streaming
JSON Streaming is a term referring to streaming a (possibly infinite) stream of elements as independent JSON objects over a continuous HTTP request or response. The elements are most often separated using newlines, but they do not have to be. Concatenating elements side-by-side or emitting a “very long” JSON array are other use cases.
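The three framings mentioned above can be compared with a small plain-Java sketch. It uses no Pekko APIs: the class JsonFramingSketch and its methods are hypothetical names introduced only for illustration, and plain strings stand in for already-marshalled JSON objects.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: strings stand in for already-marshalled JSON objects.
// JsonFramingSketch is a hypothetical helper, not part of Apache Pekko HTTP.
final class JsonFramingSketch {

    /** One valid JSON document: elements wrapped in brackets, separated by commas. */
    static String asJsonArray(List<String> elements) {
        return "[" + String.join(",", elements) + "]";
    }

    /** Newline-delimited: every element is followed by a single '\n'. */
    static String asLines(List<String> elements) {
        StringBuilder out = new StringBuilder();
        for (String element : elements) {
            out.append(element).append('\n');
        }
        return out.toString();
    }

    /** Concatenated: elements are written side by side with no separator. */
    static String concatenated(List<String> elements) {
        return String.join("", elements);
    }

    public static void main(String[] args) {
        List<String> elements = Arrays.asList("{\"uid\":1}", "{\"uid\":2}");
        System.out.println(asJsonArray(elements));
        System.out.print(asLines(elements));
        System.out.println(concatenated(elements));
    }
}
```

Only the array form is a single valid JSON document; the other two trade that property for simpler incremental parsing on the receiving side.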
In the examples below, we’ll be referring to the Tweet case class (the JavaTweet class in the Java examples) as our model, which is defined as:
- Scala
case class Tweet(uid: Int, txt: String)
- Java
private static final class JavaTweet {
  private int id;
  private String message;

  public JavaTweet(int id, String message) {
    this.id = id;
    this.message = message;
  }

  public int getId() {
    return id;
  }

  public void setId(int id) {
    this.id = id;
  }

  public void setMessage(String message) {
    this.message = message;
  }

  public String getMessage() {
    return message;
  }
}
And as always with spray-json, we provide our marshaller and unmarshaller instances as implicit values, using the jsonFormat## methods (jsonFormat2 here, since Tweet has two fields) to generate them statically:
- Scala
// needed so that the relative `pekko.` package references below resolve
import org.apache.pekko
import spray.json.RootJsonFormat

object MyTweetJsonProtocol
    extends pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
    with spray.json.DefaultJsonProtocol {

  implicit val tweetFormat: RootJsonFormat[Tweet] = jsonFormat2(Tweet.apply)
}
Responding with JSON Streams
In this example we implement an API representing an infinite stream of tweets, very much like Twitter’s Streaming API.
In the Scala DSL, we first need to set up some additional marshalling infrastructure that is able to marshal to and from an Apache Pekko Streams Source[T, _]. One such trait containing the needed marshallers is SprayJsonSupport, which uses spray-json (a high performance JSON parser library) and is shipped as part of Apache Pekko HTTP in the pekko-http-spray-json module.
Once the general infrastructure is prepared, we import our model’s marshallers, generated by spray-json (Step 1), and enable JSON Streaming by making an implicit EntityStreamingSupport instance available (Step 2). Apache Pekko HTTP pre-packages JSON and CSV entity streaming support; however, it is simple to add your own in case you’d like to stream a different content type (for example plists or protobuf).
In the Java DSL, we likewise need some additional marshalling infrastructure that is able to marshal to and from an Apache Pekko Streams Source<T, ?>. Here we’ll use the Jackson helper class from pekko-http-jackson (a separate library that you should add as a dependency if you want to use Jackson with Apache Pekko HTTP).
First we enable JSON Streaming by creating an EntityStreamingSupport instance (Step 1).
The default mode of rendering a Source is to represent it as a JSON array. If you want to change this representation, for example to use Twitter-style new-line separated JSON objects, you can do so by configuring the support trait accordingly.
Step 1.1 demonstrates how to customise the rendering by passing a framing renderer flow to the support object; the Java sample below uses it to render a compact JSON array. A series of new-line separated JSON objects can be rendered the same way, simply by appending a ByteString consisting of a single new-line character to each ByteString in the stream. Although that format is not valid JSON, it is pretty popular since parsing it is relatively simple: clients only need to find the new-lines and apply JSON unmarshalling to each entire line of JSON.
The final step is simply completing a request using a Source of tweets:
- Scala
// [1] import "my protocol", for marshalling Tweet objects:
import MyTweetJsonProtocol._

// [2] pick a Source rendering support trait:
// Note that the default support renders the Source as JSON Array
implicit val jsonStreamingSupport: JsonEntityStreamingSupport =
  EntityStreamingSupport.json()

val route =
  path("tweets") {
    // [3] simply complete a request with a source of tweets:
    val tweets: Source[Tweet, NotUsed] = getTweets
    complete(tweets)
  }

// tests ------------------------------------------------------------
val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`))
val AcceptXml = Accept(MediaRange(MediaTypes.`text/xml`))

Get("/tweets").withHeaders(AcceptJson) ~> route ~> check {
  responseAs[String] shouldEqual
  """[""" +
  """{"txt":"#Pekko rocks!","uid":1},""" +
  """{"txt":"Streaming is so hot right now!","uid":2},""" +
  """{"txt":"You cannot enter the same river twice.","uid":3}""" +
  """]"""
}

// endpoint can only marshal Json, so it will *reject* requests for text/xml:
Get("/tweets").withHeaders(AcceptXml) ~> route ~> check {
  handled should ===(false)
  rejection should ===(
    UnacceptedResponseContentTypeRejection(Set(ContentTypes.`application/json`)))
}
- Java
import static org.apache.pekko.http.javadsl.server.Directives.completeOKWithSource;
import static org.apache.pekko.http.javadsl.server.Directives.get;
import static org.apache.pekko.http.javadsl.server.Directives.parameter;
import static org.apache.pekko.http.javadsl.server.Directives.path;

// Step 1: Enable JSON streaming
// we're not using this in the example, but it's the simplest way to start:
// The default rendering is a JSON array: `[el, el, el , ...]`
final JsonEntityStreamingSupport jsonStreaming = EntityStreamingSupport.json();

// Step 1.1: Enable and customise how we'll render the JSON, as a compact array:
final ByteString start = ByteString.fromString("[");
final ByteString between = ByteString.fromString(",");
final ByteString end = ByteString.fromString("]");
final Flow<ByteString, ByteString, NotUsed> compactArrayRendering =
    Flow.of(ByteString.class).intersperse(start, between, end);

final JsonEntityStreamingSupport compactJsonSupport =
    EntityStreamingSupport.json().withFramingRendererFlow(compactArrayRendering);

// Step 2: implement the route
final Route responseStreaming =
    path(
        "tweets",
        () ->
            get(
                () ->
                    parameter(
                        StringUnmarshallers.INTEGER,
                        "n",
                        n -> {
                          final Source<JavaTweet, NotUsed> tws =
                              Source.repeat(new JavaTweet(12, "Hello World!")).take(n);

                          // Step 3: call complete* with your source, marshaller, and stream
                          // rendering mode
                          return completeOKWithSource(
                              tws, Jackson.marshaller(), compactJsonSupport);
                        })));

// tests:
final TestRoute routes = testRoute(tweets());

// test happy path
final Accept acceptApplication =
    Accept.create(MediaRanges.create(MediaTypes.APPLICATION_JSON));
routes
    .run(HttpRequest.GET("/tweets?n=2").addHeader(acceptApplication))
    .assertStatusCode(200)
    .assertEntity(
        "[{\"id\":12,\"message\":\"Hello World!\"},{\"id\":12,\"message\":\"Hello World!\"}]");

// test responses to potential errors
final Accept acceptText = Accept.create(MediaRanges.ALL_TEXT);
routes
    .run(HttpRequest.GET("/tweets?n=3").addHeader(acceptText))
    .assertStatusCode(StatusCodes.NOT_ACCEPTABLE) // 406
    .assertEntity(
        "Resource representation is only available with these types:\napplication/json");

// tests for the CSV route (csvTweets) --------------------------------
final TestRoute routes = testRoute(csvTweets());

// test happy path
final Accept acceptCsv = Accept.create(MediaRanges.create(MediaTypes.TEXT_CSV));
routes
    .run(HttpRequest.GET("/tweets?n=2").addHeader(acceptCsv))
    .assertStatusCode(200)
    .assertEntity("12,Hello World!\n" + "12,Hello World!\n");

// test responses to potential errors
final Accept acceptText = Accept.create(MediaRanges.ALL_APPLICATION);
routes
    .run(HttpRequest.GET("/tweets?n=3").addHeader(acceptText))
    .assertStatusCode(StatusCodes.NOT_ACCEPTABLE) // 406
    .assertEntity(
        "Resource representation is only available with these types:\ntext/csv; charset=UTF-8");
The reason EntityStreamingSupport has to be enabled explicitly is that one might want to configure how the stream should be rendered. We’ll discuss this in depth in the next section.
Customising response rendering mode
Since it is not always possible to directly and confidently answer the question of how a stream of T should look on the wire, the EntityStreamingSupport traits come into play and allow fine-tuning the stream’s rendered representation.
For example, in case of JSON Streaming, there isn’t really one standard about rendering the response. Some APIs prefer to render multiple JSON objects in a line-by-line fashion (Twitter’s streaming APIs for example), while others simply return very large arrays, which could be streamed as well.
Apache Pekko HTTP defaults to the second one (streaming a JSON array), as it is valid JSON, and clients not expecting a streaming API are still able to consume it in a naive way if they want to.
The line-by-line approach, however, is also pretty popular even though it is not valid JSON. Its simplicity for client-side parsing is a strong argument for picking this format for your streaming APIs. Below we demonstrate how to reconfigure the support trait to render the JSON line-by-line.
- Scala
import MyTweetJsonProtocol._

// Configure the EntityStreamingSupport to render the elements as:
// {"example":42}
// {"example":43}
// ...
// {"example":1000}
val newline = ByteString("\n")

implicit val jsonStreamingSupport: EntityStreamingSupport =
  EntityStreamingSupport.json()
    .withFramingRenderer(Flow[ByteString].map(bs => bs ++ newline))

val route =
  path("tweets") {
    // [3] simply complete a request with a source of tweets:
    val tweets: Source[Tweet, NotUsed] = getTweets
    complete(tweets)
  }

// tests ------------------------------------------------------------
val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`))

Get("/tweets").withHeaders(AcceptJson) ~> route ~> check {
  responseAs[String] shouldEqual
  """{"txt":"#Pekko rocks!","uid":1}""" + "\n" +
  """{"txt":"Streaming is so hot right now!","uid":2}""" + "\n" +
  """{"txt":"You cannot enter the same river twice.","uid":3}""" + "\n"
}
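To illustrate why the line-by-line format is simple to consume, here is a minimal plain-Java sketch of a client-side line framer. LineFramer is a hypothetical helper, not part of Apache Pekko HTTP. It assumes UTF-8 input, skips empty lines, and leaves the actual JSON parsing of each returned line to whatever library the client uses.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical client-side helper: collects bytes until a '\n' is seen.
// Chunk boundaries may fall anywhere, even in the middle of a JSON object.
final class LineFramer {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Feeds one received chunk and returns every line completed by it. */
    List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                if (pending.size() > 0) { // ignore empty lines
                    lines.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
                }
                pending.reset();
            } else {
                pending.write(b);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        LineFramer framer = new LineFramer();
        // The second object is split across two chunks on purpose.
        System.out.println(framer.feed("{\"uid\":1}\n{\"ui".getBytes(StandardCharsets.UTF_8)));
        System.out.println(framer.feed("d\":2}\n".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Buffering bytes rather than decoded characters keeps the sketch safe when a chunk boundary splits a multi-byte UTF-8 character, because the newline byte never occurs inside such a sequence.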
Another interesting feature is parallel marshalling. Since marshalling can potentially take a long time, it is possible to marshal multiple elements of the stream in parallel. This is simply a configuration option on EntityStreamingSupport and is configurable like this:
- Scala
import MyTweetJsonProtocol._

implicit val jsonStreamingSupport: JsonEntityStreamingSupport =
  EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = false)

path("tweets") {
  val tweets: Source[Tweet, NotUsed] = getTweets
  complete(tweets)
}
The mode shown above preserves the ordering of the Source’s elements, which may sometimes be a required property, for example when streaming a strictly ordered dataset. Sometimes, though, the concept of strict order does not apply to the data being streamed, which allows us to exploit this property and use an unordered rendering.
This unordered rendering can be enabled via a configuration option as shown below. Effectively, this allows Apache Pekko HTTP’s marshalling infrastructure to concurrently marshal up to as many elements as defined in parallelism and to emit into the HttpResponse whichever one finishes marshalling first:
- Scala
import MyTweetJsonProtocol._

implicit val jsonStreamingSupport: JsonEntityStreamingSupport =
  EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = true)

path("tweets" / "unordered") {
  val tweets: Source[Tweet, NotUsed] = getTweets
  complete(tweets)
}
This allows us to potentially render elements into the HttpResponse faster, since it avoids “head of line blocking” in case one element at the front of the stream takes a long time to marshal while others after it are very quick to marshal.
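The difference between the two emission orders can be modelled in plain Java. The sketch below is not how Apache Pekko HTTP implements parallel marshalling. MarshalOrderSketch and its methods are hypothetical names, and manually completed CompletableFuture instances stand in for marshalling work that finishes at different times.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-Java model of ordered vs unordered emission; an illustration only.
final class MarshalOrderSketch {

    /** Emits each result as soon as it completes (models unordered = true). */
    static List<String> emitUnordered(List<CompletableFuture<String>> inFlight) {
        List<String> emitted = new ArrayList<>();
        for (CompletableFuture<String> marshalling : inFlight) {
            marshalling.thenAccept(emitted::add);
        }
        return emitted;
    }

    /** Emits in source order: an element waits until all earlier ones were emitted. */
    static List<String> emitOrdered(List<CompletableFuture<String>> inFlight) {
        List<String> emitted = new ArrayList<>();
        CompletableFuture<Void> previous = CompletableFuture.completedFuture(null);
        for (CompletableFuture<String> marshalling : inFlight) {
            previous = previous.thenAcceptBoth(marshalling, (ignored, json) -> emitted.add(json));
        }
        return emitted;
    }

    /** Three elements in flight; the first one is slow and finishes last. */
    static List<String> demo(boolean unordered) {
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fastA = new CompletableFuture<>();
        CompletableFuture<String> fastB = new CompletableFuture<>();
        List<CompletableFuture<String>> inFlight = Arrays.asList(slow, fastA, fastB);
        List<String> emitted = unordered ? emitUnordered(inFlight) : emitOrdered(inFlight);
        fastA.complete("{\"uid\":2}");
        fastB.complete("{\"uid\":3}");
        slow.complete("{\"uid\":1}");
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("ordered:   " + demo(false));
        System.out.println("unordered: " + demo(true));
    }
}
```

In the ordered case nothing is emitted until the slow first element is done, which is the head-of-line blocking described above. In the unordered case the two quick elements are emitted first and the slow one last.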
Consuming JSON Streaming uploads
Sometimes a client sends a streaming request. For example, an embedded device has initiated a connection with the server and is feeding it measurement data, one line at a time.
In this example, we want to consume this data in a streaming fashion from the request entity and also apply back pressure to the underlying TCP connection should the server be unable to cope with the rate of incoming data. Back pressure is automatically applied thanks to Apache Pekko Streams.
- Scala
case class Measurement(id: String, value: Int)

// needed so that the relative `pekko.` package references below resolve
import org.apache.pekko
import spray.json.RootJsonFormat

object MyMeasurementJsonProtocol
    extends pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
    with spray.json.DefaultJsonProtocol {

  implicit val measurementFormat: RootJsonFormat[Measurement] = jsonFormat2(Measurement.apply)
}
- Java
private static final class Measurement {
  private String id;
  private int value;

  public Measurement(String id, int value) {
    this.id = id;
    this.value = value;
  }

  public String getId() {
    return id;
  }

  public void setId(String id) {
    this.id = id;
  }

  public void setValue(int value) {
    this.value = value;
  }

  public int getValue() {
    return value;
  }
}

final Unmarshaller<ByteString, Measurement> Measurements =
    Jackson.byteStringUnmarshaller(Measurement.class);
- Scala
// [1] import "my protocol", for unmarshalling Measurement objects:
import MyMeasurementJsonProtocol._

// [2] enable Json Streaming
implicit val jsonStreamingSupport: EntityStreamingSupport =
  EntityStreamingSupport.json()

// prepare your persisting logic here
val persistMetrics = Flow[Measurement]

val route =
  path("metrics") {
    // [3] extract Source[Measurement, _]
    entity(asSourceOf[Measurement]) { measurements =>
      // alternative syntax:
      // entity(as[Source[Measurement, NotUsed]]) { measurements =>
      val measurementsSubmitted: Future[Int] =
        measurements
          .via(persistMetrics)
          .runFold(0) { (cnt, _) => cnt + 1 }

      complete {
        measurementsSubmitted.map(n => s"""Total metrics received: $n""")
      }
    }
  }

// tests ------------------------------------------------------------
// uploading an array or newline separated values works out of the box
val data = HttpEntity(
  ContentTypes.`application/json`,
  """
    |{"id":"temp","value":32}
    |{"id":"temp","value":31}
    |
  """.stripMargin)

Post("/metrics", entity = data) ~> route ~> check {
  status should ===(StatusCodes.OK)
  responseAs[String] should ===("Total metrics received: 2")
}

// the FramingWithContentType will reject any content type that it does not understand:
val xmlData = HttpEntity(
  ContentTypes.`text/xml(UTF-8)`,
  """|<data id="temp" value="32"/>
     |<data id="temp" value="31"/>""".stripMargin)

Post("/metrics", entity = xmlData) ~> route ~> check {
  handled should ===(false)
  rejection should ===(
    UnsupportedRequestContentTypeRejection(
      Set(ContentTypes.`application/json`),
      Some(ContentTypes.`text/xml(UTF-8)`)))
}
- Java
import static org.apache.pekko.http.javadsl.server.Directives.complete;
import static org.apache.pekko.http.javadsl.server.Directives.entityAsSourceOf;
import static org.apache.pekko.http.javadsl.server.Directives.extractMaterializer;
import static org.apache.pekko.http.javadsl.server.Directives.onComplete;
import static org.apache.pekko.http.javadsl.server.Directives.post;

final Route incomingStreaming =
    path(
        "metrics",
        () ->
            post(
                () ->
                    extractMaterializer(
                        mat -> {
                          final JsonEntityStreamingSupport jsonSupport =
                              EntityStreamingSupport.json();

                          return entityAsSourceOf(
                              Measurements,
                              jsonSupport,
                              sourceOfMeasurements -> {
                                final CompletionStage<Integer> measurementCount =
                                    sourceOfMeasurements.runFold(
                                        0, (acc, measurement) -> acc + 1, mat);
                                return onComplete(
                                    measurementCount,
                                    c -> complete("Total number of measurements: " + c));
                              });
                        })));
Simple CSV streaming example
Apache Pekko HTTP provides another EntityStreamingSupport out of the box, namely csv (comma-separated values). For completeness, we demonstrate its usage in the snippet below. As you’ll notice, switching between streaming modes is fairly simple: you only have to make sure that an implicit Marshaller of the requested type is available and that the streaming support operates on the same Content-Type as the rendered values. Otherwise, you’ll see an error at runtime stating that the marshaller did not expose the expected content type and thus the streaming response cannot be rendered.
- Scala
// [1] provide a marshaller to ByteString
implicit val tweetAsCsv = Marshaller.strict[Tweet, ByteString] { t =>
  Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`,
    () => {
      val txt = t.txt.replaceAll(",", ".")
      val uid = t.uid.toString
      ByteString(List(uid, txt).mkString(","))
    })
}

// [2] enable csv streaming:
implicit val csvStreaming: EntityStreamingSupport = EntityStreamingSupport.csv()

val route =
  path("tweets") {
    val tweets: Source[Tweet, NotUsed] = getTweets
    complete(tweets)
  }

// tests ------------------------------------------------------------
val AcceptCsv = Accept(MediaRange(MediaTypes.`text/csv`))

Get("/tweets").withHeaders(AcceptCsv) ~> route ~> check {
  responseAs[String] shouldEqual
  "1,#Pekko rocks!" + "\n" +
  "2,Streaming is so hot right now!" + "\n" +
  "3,You cannot enter the same river twice." + "\n"
}
- Java
import static org.apache.pekko.http.javadsl.server.Directives.get;
import static org.apache.pekko.http.javadsl.server.Directives.parameter;
import static org.apache.pekko.http.javadsl.server.Directives.path;
import static org.apache.pekko.http.javadsl.server.Directives.completeWithSource;

// marshal each tweet as one CSV row; note that the message is written as-is
final Marshaller<JavaTweet, ByteString> renderAsCsv =
    Marshaller.withFixedContentType(
        ContentTypes.TEXT_CSV_UTF8,
        t -> ByteString.fromString(t.getId() + "," + t.getMessage()));

final CsvEntityStreamingSupport csvSupport = EntityStreamingSupport.csv();

final Route responseStreaming =
    path(
        "tweets",
        () ->
            get(
                () ->
                    parameter(
                        StringUnmarshallers.INTEGER,
                        "n",
                        n -> {
                          final Source<JavaTweet, NotUsed> tws =
                              Source.repeat(new JavaTweet(12, "Hello World!")).take(n);
                          return completeWithSource(tws, renderAsCsv, csvSupport);
                        })));
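The CSV marshallers above write the message text more or less as-is (the Scala one replaces commas with dots), so a message containing a comma or a quote could produce an ambiguous row. As one possible alternative, the sketch below quotes fields following the conventions of RFC 4180. CsvFieldSketch and its methods are hypothetical names and not part of Apache Pekko HTTP. Fields containing line breaks would also need care, since the CSV streaming support separates elements with newlines.

```java
// Hypothetical helper for rendering one CSV row; an illustration only.
final class CsvFieldSketch {

    /** Quotes a field if it contains a comma, double quote, CR or LF; doubles embedded quotes. */
    static String escape(String field) {
        boolean needsQuoting =
            field.contains(",") || field.contains("\"")
                || field.contains("\n") || field.contains("\r");
        if (!needsQuoting) {
            return field;
        }
        return "\"" + field.replace("\"", "\"\"") + "\"";
    }

    /** Renders an (id, message) pair as a single CSV row without a trailing newline. */
    static String row(int id, String message) {
        return id + "," + escape(message);
    }

    public static void main(String[] args) {
        System.out.println(row(12, "Hello World!"));
        System.out.println(row(12, "Hello, World!"));
    }
}
```

Such a helper could be called from inside the marshaller function in place of the plain string concatenation.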
Implementing custom EntityStreamingSupport traits
The EntityStreamingSupport infrastructure is open for extension and not bound to any single format, content type, or marshalling library. The provided JSON support does not rely on spray-json directly, but uses Marshaller[T, ByteString] (Marshaller<T, ByteString> in Java) instances, which can be provided using any JSON marshalling library (such as Circe, Jawn or Play JSON).
When implementing a custom support trait, one should simply extend the EntityStreamingSupport abstract class and implement all of its methods. It’s best to use the existing implementations as a guideline.
Supporting custom content types
In order to marshal into custom content types, both a Marshaller that can handle that content type and an EntityStreamingSupport of matching content type are required.
Refer to the complete example below, showcasing how to configure a custom marshaller and change the entity streaming support’s content type to be compatible. This is an area that would benefit from additional type safety, which we hope to add in a future release.
- Scala
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ActorSystem
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.common.{ EntityStreamingSupport, JsonEntityStreamingSupport }
import pekko.http.scaladsl.model.{ HttpEntity, _ }
import pekko.http.scaladsl.server.Directives._
import pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import pekko.http.scaladsl.marshalling.{ Marshaller, ToEntityMarshaller }
import pekko.stream.scaladsl.Source
import spray.json.DefaultJsonProtocol

import scala.concurrent.ExecutionContext
import scala.io.StdIn
import scala.util.Random

final case class User(name: String, id: String)

trait UserProtocol extends DefaultJsonProtocol {

  import spray.json._

  implicit val userFormat: JsonFormat[User] = jsonFormat2(User.apply)

  val `vnd.example.api.v1+json` =
    MediaType.applicationWithFixedCharset("vnd.example.api.v1+json", HttpCharsets.`UTF-8`)
  val ct = ContentType.apply(`vnd.example.api.v1+json`)

  implicit def userMarshaller: ToEntityMarshaller[User] = Marshaller.oneOf(
    Marshaller.withFixedContentType(`vnd.example.api.v1+json`) { (user: User) =>
      HttpEntity(`vnd.example.api.v1+json`, user.toJson.compactPrint)
    })
}

object ApiServer extends App with UserProtocol {
  implicit val system: ActorSystem = ActorSystem("api")
  implicit val executionContext: ExecutionContext = system.dispatcher

  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
    .withContentType(ct)
    .withParallelMarshalling(parallelism = 10, unordered = false)

  // (fake) async database query api
  def dummyUser(id: String) = User(s"User $id", id.toString)

  def fetchUsers(): Source[User, NotUsed] = Source.fromIterator(() =>
    Iterator.fill(10000) {
      val id = Random.nextInt()
      dummyUser(id.toString)
    })

  val route =
    pathPrefix("users") {
      get {
        complete(fetchUsers())
      }
    }

  val bindingFuture = Http().newServerAt("localhost", 8080).bind(route)

  println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
  StdIn.readLine()

  bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate())
}
- Java
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.http.javadsl.common.EntityStreamingSupport;
import org.apache.pekko.http.javadsl.marshalling.Marshaller;
import org.apache.pekko.http.javadsl.model.*;
import org.apache.pekko.http.javadsl.server.AllDirectives;
import org.apache.pekko.http.javadsl.server.Route;
import org.apache.pekko.stream.javadsl.Source;

import java.util.Random;
import java.util.stream.Stream;

public class JsonStreamingFullExample extends AllDirectives {

  public Route createRoute() {
    final MediaType.WithFixedCharset mediaType =
        MediaTypes.applicationWithFixedCharset("vnd.example.api.v1+json", HttpCharsets.UTF_8);

    final ContentType.WithFixedCharset contentType = ContentTypes.create(mediaType);

    final Marshaller<User, RequestEntity> userMarshaller =
        Marshaller.withFixedContentType(
            contentType, (User user) -> HttpEntities.create(contentType, user.toJson()));

    final EntityStreamingSupport jsonStreamingSupport =
        EntityStreamingSupport.json()
            .withContentType(contentType)
            .withParallelMarshalling(10, false);

    return get(
        () ->
            pathPrefix(
                "users",
                () -> completeOKWithSource(fetchUsers(), userMarshaller, jsonStreamingSupport)));
  }

  private Source<User, NotUsed> fetchUsers() {
    final Random rnd = new Random();
    return Source.fromIterator(
        () -> Stream.generate(rnd::nextInt).map(this::dummyUser).limit(10000).iterator());
  }

  private User dummyUser(int id) {
    return new User(id, "User " + id);
  }

  static final class User {
    int id;
    String name;

    User(int id, String name) {
      this.id = id;
      this.name = name;
    }

    String toJson() {
      return "{\"id\":\"" + id + "\", \"name\":\"" + name + "\"}";
    }
  }

  public static void main(String[] args) {
    ActorSystem system = ActorSystem.create();
    final JsonStreamingFullExample app = new JsonStreamingFullExample();
    final Http http = Http.get(system);

    http.newServerAt("localhost", 8080).bind(app.createRoute());
  }
}
Consuming streaming JSON on client-side
For consuming such streaming APIs with, for example, JSON responses, refer to the “Consuming JSON Streaming style APIs” documentation in the JSON support section.