RecordIO Framing

The codec parses a ByteString stream in the RecordIO format used by Apache Mesos into distinct frames.

For instance, the response body:

121\n
{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}20\n
{"type":"HEARTBEAT"}

is parsed into frames:

{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}
{"type":"HEARTBEAT"}

Project Info: Apache Pekko Connectors Simple Codecs (RecordIO)
Artifact: org.apache.pekko : pekko-connectors-simple-codecs : 1.0.2
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17
Scala versions: 2.13.14, 2.12.20, 3.3.3
JPMS module name: pekko.stream.connectors.simplecodecs
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.0.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-simple-codecs" % "1.0.2",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven
<properties>
  <pekko.version>1.0.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-simple-codecs_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.0.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-simple-codecs_${versions.ScalaBinary}:1.0.2"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Usage

The flow factory RecordIOFraming provides a scanner factory method for a Flow[ByteString, ByteString, _] (Scala) or Flow<ByteString, ByteString, ?> (Java), which parses out RecordIO frames.

Scala
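import org.apache.pekko.NotUsed
import org.apache.pekko.stream.connectors.recordio.scaladsl.RecordIOFraming
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import org.apache.pekko.util.ByteString

import scala.concurrent.Future

val FirstRecordData =
  """{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}"""
val SecondRecordData = """{"type":"HEARTBEAT"}"""

// Each record is prefixed with its length in bytes, followed by a newline.
val FirstRecordWithPrefix = s"121\n$FirstRecordData"
val SecondRecordWithPrefix = s"20\n$SecondRecordData"

val basicSource: Source[ByteString, NotUsed] =
  Source.single(ByteString(FirstRecordWithPrefix + SecondRecordWithPrefix))

// Running the stream requires an implicit ActorSystem (or Materializer) in scope.
val result: Future[Seq[ByteString]] = basicSource
  .via(RecordIOFraming.scanner())
  .runWith(Sink.seq)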
Java
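import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.connectors.recordio.javadsl.RecordIOFraming;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.util.ByteString;

import java.util.List;
import java.util.concurrent.CompletionStage;

String firstRecordData =
    "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
String secondRecordData = "{\"type\":\"HEARTBEAT\"}";

// Each record is prefixed with its length in bytes, followed by a newline.
String firstRecordWithPrefix = "121\n" + firstRecordData;
String secondRecordWithPrefix = "20\n" + secondRecordData;

Source<ByteString, NotUsed> basicSource =
    Source.single(ByteString.fromString(firstRecordWithPrefix + secondRecordWithPrefix));

// `system` is the ActorSystem used to run the stream.
CompletionStage<List<ByteString>> result =
    basicSource.via(RecordIOFraming.scanner()).runWith(Sink.seq(), system);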

We obtain:

Scala
val byteStrings = result.futureValue

byteStrings(0) shouldBe ByteString(FirstRecordData)
byteStrings(1) shouldBe ByteString(SecondRecordData)
Java
List<ByteString> byteStrings = result.toCompletableFuture().get(1, TimeUnit.SECONDS);

assertThat(byteStrings.get(0), is(ByteString.fromString(firstRecordData)));
assertThat(byteStrings.get(1), is(ByteString.fromString(secondRecordData)));
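
The length prefixes in the usage example ("121\n" and "20\n") are written by hand. For test data it can be less error-prone to derive the prefix from the record itself. The following plain-Java sketch does that; the class `RecordIoEncodeSketch` and the method `frame` are hypothetical helpers and not part of the connector, which only provides the scanner for decoding.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for building RecordIO test input; not part of Pekko Connectors.
class RecordIoEncodeSketch {

    /** Prepends the record's UTF-8 byte length and a newline to the record. */
    static byte[] frame(String record) {
        byte[] payload = record.getBytes(StandardCharsets.UTF_8);
        // The prefix is the payload size in bytes (not characters), in decimal.
        byte[] prefix = (payload.length + "\n").getBytes(StandardCharsets.US_ASCII);
        byte[] framed = new byte[prefix.length + payload.length];
        System.arraycopy(prefix, 0, framed, 0, prefix.length);
        System.arraycopy(payload, 0, framed, prefix.length, payload.length);
        return framed;
    }

    public static void main(String[] args) {
        String heartbeat = "{\"type\":\"HEARTBEAT\"}";
        // Prints "20" on the first line and the record on the second.
        System.out.println(new String(frame(heartbeat), StandardCharsets.UTF_8));
    }
}
```

The resulting bytes could be wrapped with ByteString.fromArray and fed to the scanner in place of the hand-written strings.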

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> simpleCodecs/testOnly *.RecordIOFramingSpec