RecordIO Framing
The codec parses a ByteString stream in the RecordIO format used by Apache Mesos into distinct frames.
For instance, the response body:
128\n
{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}20\n
{"type":"HEARTBEAT"}
is parsed into frames:
{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}
{"type":"HEARTBEAT"}
Project Info: Apache Pekko Connectors Simple Codecs (RecordIO) | |
---|---|
Artifact | org.apache.pekko
pekko-connectors-simple-codecs
1.1.0
|
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 OpenJDK 21 |
Scala versions | 2.13.15, 2.12.20, 3.3.4 |
JPMS module name | pekko.stream.connectors.simplecodecs |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts¶
val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-connectors-simple-codecs" % "1.1.0",
"org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
<properties>
<pekko.version>1.1.3</pekko.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-connectors-simple-codecs_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
</dependencies>
def versions = [
PekkoVersion: "1.1.3",
ScalaBinary: "2.13"
]
dependencies {
implementation "org.apache.pekko:pekko-connectors-simple-codecs_${versions.ScalaBinary}:1.1.0"
implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Usage¶
The flow factory RecordIOFraming
provides a scanner
factory method for a Flow<ByteString, ByteString, ?>
which parses out RecordIO frames.
sourceimport org.apache.pekko.stream.connectors.recordio.scaladsl.RecordIOFraming
val FirstRecordData =
"""{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}"""
val SecondRecordData = """{"type":"HEARTBEAT"}"""
val FirstRecordWithPrefix = s"121\n$FirstRecordData"
val SecondRecordWithPrefix = s"20\n$SecondRecordData"
val basicSource: Source[ByteString, NotUsed] =
Source.single(ByteString(FirstRecordWithPrefix + SecondRecordWithPrefix))
val result: Future[Seq[ByteString]] = basicSource
.via(RecordIOFraming.scanner())
.runWith(Sink.seq)
sourceString firstRecordData =
"{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
String secondRecordData = "{\"type\":\"HEARTBEAT\"}";
String firstRecordWithPrefix = "121\n" + firstRecordData;
String secondRecordWithPrefix = "20\n" + secondRecordData;
Source<ByteString, NotUsed> basicSource =
Source.single(ByteString.fromString(firstRecordWithPrefix + secondRecordWithPrefix));
CompletionStage<List<ByteString>> result =
basicSource.via(RecordIOFraming.scanner()).runWith(Sink.seq(), system);
We obtain:
sourceval byteStrings = result.futureValue
byteStrings(0) shouldBe ByteString(FirstRecordData)
byteStrings(1) shouldBe ByteString(SecondRecordData)
sourceList<ByteString> byteStrings = result.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertThat(byteStrings.get(0), is(ByteString.fromString(firstRecordData)));
assertThat(byteStrings.get(1), is(ByteString.fromString(secondRecordData)));
Running the example code¶
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
sbt
> simpleCodecs/testOnly *.RecordIOFramingSpec
1.1.0