JSON
JSON Framing
Use Apache Pekko Stream JsonFraming to split a stream of ByteString elements into ByteString snippets of valid JSON objects.
JsonFraming.objectScanner(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed]
Returns a Flow that implements a “brace counting” based framing stage for emitting valid JSON chunks.
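To make the "brace counting" idea concrete, here is a minimal, library-free sketch of the technique. It is not the Pekko implementation: the object name `BraceCountingSketch` and the method `frames` are hypothetical, it works on a complete `String` rather than on streamed `ByteString` chunks, and it does not model `maximumObjectLength`.

```scala
object BraceCountingSketch {

  /** Splits `input` into its top-level JSON objects by counting braces. */
  def frames(input: String): Vector[String] = {
    val out = Vector.newBuilder[String]
    val current = new StringBuilder
    var depth = 0        // number of currently open '{'
    var inString = false // true while inside a JSON string literal
    var escaped = false  // true if the previous character was a backslash in a string

    input.foreach { c =>
      // Inside an object every character belongs to the current frame.
      if (depth > 0) current.append(c)

      if (inString) {
        // Braces inside string literals must not be counted.
        if (escaped) escaped = false
        else if (c == '\\') escaped = true
        else if (c == '"') inString = false
      } else c match {
        case '"' if depth > 0 => inString = true
        case '{' =>
          if (depth == 0) current.append(c) // opening brace of a new frame
          depth += 1
        case '}' if depth > 0 =>
          depth -= 1
          if (depth == 0) { // the top-level object is complete: emit it
            out += current.toString
            current.clear()
          }
        case _ => // '[', ']', ',' and whitespace between objects are skipped
      }
    }
    out.result()
  }
}
```

Calling `BraceCountingSketch.frames` on an array of objects or on concatenated objects yields one string per top-level object, which is the behavior the framing stage provides for streamed input.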
Typical examples of data that one may want to frame using this stage include:
Very large arrays:
[{"id": 1}, {"id": 2}, [...], {"id": 999}]
Multiple concatenated JSON objects (with or without commas between them):
{"id": 1}, {"id": 2}, [...], {"id": 999}
The framing works independently of formatting: it still emits valid JSON elements even if two elements are separated by multiple newlines or other whitespace characters. It is likewise insensitive to each JSON object's internal formatting, which is left unchanged in the emitted frame.
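A small usage sketch may help here. It feeds an array with irregular whitespace through `JsonFraming.objectScanner` in arbitrarily cut chunks. The object name `JsonFramingSketch`, the helper `frame`, the 1024-byte limit, the 7-character chunk size, and the 5-second timeout are illustrative assumptions, not values from the documentation.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ JsonFraming, Sink, Source }
import org.apache.pekko.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object JsonFramingSketch {

  // Frames the given chunks and returns every emitted JSON object as a String.
  def frame(chunks: List[String])(implicit system: ActorSystem): Seq[String] = {
    val framed = Source(chunks)
      .map(ByteString(_))
      // Assumed limit: objects longer than 1024 bytes fail the stream.
      .via(JsonFraming.objectScanner(maximumObjectLength = 1024))
      .map(_.utf8String)
      .runWith(Sink.seq)
    Await.result(framed, 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("json-framing-sketch")
    try {
      // Elements separated by newlines, spaces and a tab.
      val input = "[{\"id\": 1},\n\n   {\"id\": 2},\t{\"id\": 3}]"
      // Cut the input at arbitrary positions to mimic network chunks.
      frame(input.grouped(7).toList).foreach(println)
    } finally system.terminate()
  }
}
```

Because the stage buffers incomplete objects, the chunk boundaries do not need to line up with object boundaries.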
Streaming of nested structures
The method above is great for a stream of “flat” JSON objects (an array or just a stream of objects) but doesn’t work for the many use-cases that involve a nested structure. A common example is the response of a database, which might look more like this:
{
  "size": 100,
  "rows": [
    {"id": 1, "doc": {}},
    {"id": 2, "doc": {}},
    ...
  ]
}
The JSON reading module offers a flow that streams specific parts of that JSON structure. In this particular example, only the rows array is interesting for the application, and more specifically only the doc inside each element of the array.
Project Info: Apache Pekko Connectors JSON Streaming

| | |
|---|---|
| Artifact | org.apache.pekko : pekko-connectors-json-streaming : 1.1.0-M1+154-6981eaa8-SNAPSHOT |
| JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21 |
| Scala versions | 2.13.15, 2.12.20, 3.3.4 |
| JPMS module name | pekko.stream.connectors.json.streaming |
| License | |
| API documentation | |
| Forums | |
| Release notes | GitHub releases |
| Issues | GitHub issues |
| Sources | https://github.com/apache/pekko-connectors |
Artifacts
- sbt
  val PekkoVersion = "1.1.2"
  libraryDependencies ++= Seq(
    "org.apache.pekko" %% "pekko-connectors-json-streaming" % "1.1.0-M1+154-6981eaa8-SNAPSHOT",
    "org.apache.pekko" %% "pekko-stream" % PekkoVersion
  )
- Maven
  <properties>
    <pekko.version>1.1.2</pekko.version>
    <scala.binary.version>2.13</scala.binary.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-connectors-json-streaming_${scala.binary.version}</artifactId>
      <version>1.1.0-M1+154-6981eaa8-SNAPSHOT</version>
    </dependency>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-stream_${scala.binary.version}</artifactId>
      <version>${pekko.version}</version>
    </dependency>
  </dependencies>
- Gradle
  def versions = [
    PekkoVersion: "1.1.2",
    ScalaBinary: "2.13"
  ]
  dependencies {
    implementation "org.apache.pekko:pekko-connectors-json-streaming_${versions.ScalaBinary}:1.1.0-M1+154-6981eaa8-SNAPSHOT"
    implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
  }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Example
To define which parts of the structure you want to stream, the module supports JsonPath notation. For example:
- Stream all elements of the nested rows array: $.rows[*]
- Stream the value of doc of each element in the array: $.rows[*].doc
To extract the information needed, run a stream through the JsonReader.select flow.
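The following sketch shows how that could look for the database-style response above. It assumes that `JsonReader` lives in the package `org.apache.pekko.stream.connectors.json.scaladsl` and that `JsonReader.select` takes the JsonPath as a `String` and returns a `Flow[ByteString, ByteString, NotUsed]`. The object name `JsonReaderSketch`, the helper `select`, the sample document, and the timeout are hypothetical.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.json.scaladsl.JsonReader // assumed package
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import org.apache.pekko.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object JsonReaderSketch {

  // Hypothetical sample response with two rows.
  val document: String =
    """{"size": 2, "rows": [{"id": 1, "doc": {"name": "a"}}, {"id": 2, "doc": {"name": "b"}}]}"""

  // Streams the parts of `document` matched by `path`, one String per match.
  def select(path: String)(implicit system: ActorSystem): Seq[String] = {
    val selected = Source
      .single(ByteString(document))
      .via(JsonReader.select(path))
      .map(_.utf8String)
      .runWith(Sink.seq)
    Await.result(selected, 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("json-reader-sketch")
    try {
      // One element per entry of the rows array.
      select("$.rows[*]").foreach(println)
      // Only the doc value of each entry.
      select("$.rows[*].doc").foreach(println)
    } finally system.terminate()
  }
}
```

With the sample document, each path should yield two elements; the exact whitespace of the emitted JSON depends on the underlying parser and is not asserted here.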