RecordIO Framing

The codec parses a ByteString stream in the RecordIO format used by Apache Mesos into distinct frames.

For instance, the response body:

121\n
{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}20\n
{"type":"HEARTBEAT"}

is parsed into frames:

{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}
{"type":"HEARTBEAT"}
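
The framing shown above can be illustrated without any stream library. The following is a minimal sketch, assuming the whole body is already buffered in memory and that each record is preceded by its length in bytes as ASCII decimal digits followed by a newline. The class `RecordIoSketch` and the method `parseFrames` are hypothetical names used only for illustration; they are not part of the connector, which performs the same splitting incrementally on a stream.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Pekko Connectors.
final class RecordIoSketch {

    /** Splits a fully buffered RecordIO body into its records. */
    static List<byte[]> parseFrames(byte[] body) {
        List<byte[]> frames = new ArrayList<>();
        int pos = 0;
        while (pos < body.length) {
            // Find the '\n' that terminates the decimal length prefix.
            int newline = pos;
            while (newline < body.length && body[newline] != '\n') {
                newline++;
            }
            if (newline == body.length) {
                throw new IllegalArgumentException("Unterminated length prefix at offset " + pos);
            }
            int length = Integer.parseInt(
                new String(body, pos, newline - pos, StandardCharsets.US_ASCII).trim());
            int start = newline + 1;
            // Reject negative lengths and records that run past the end of the body.
            if (length < 0 || length > body.length - start) {
                throw new IllegalArgumentException("Truncated record at offset " + start);
            }
            frames.add(Arrays.copyOfRange(body, start, start + length));
            // The next length prefix begins directly after the record, with no separator.
            pos = start + length;
        }
        return frames;
    }

    public static void main(String[] args) {
        String body = "20\n{\"type\":\"HEARTBEAT\"}5\nhello";
        for (byte[] frame : parseFrames(body.getBytes(StandardCharsets.UTF_8))) {
            System.out.println(new String(frame, StandardCharsets.UTF_8));
        }
        // prints:
        // {"type":"HEARTBEAT"}
        // hello
    }
}
```

Because the prefix counts bytes rather than characters, the sketch works on `byte[]` and only decodes the prefix itself as text.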

Project Info: Apache Pekko Connectors Simple Codecs (RecordIO)

Artifact: org.apache.pekko : pekko-connectors-simple-codecs : 1.1.0
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21
Scala versions: 2.13.15, 2.12.20, 3.3.4
JPMS module name: pekko.stream.connectors.simplecodecs
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt

val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-simple-codecs" % "1.1.0",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)

Maven

<properties>
  <pekko.version>1.1.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-simple-codecs_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>

Gradle

def versions = [
  PekkoVersion: "1.1.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-simple-codecs_${versions.ScalaBinary}:1.1.0"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Usage

RecordIOFraming provides the factory method scanner(), which returns a Flow<ByteString, ByteString, ?> that splits an incoming RecordIO byte stream into one ByteString per record.

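Scala

import org.apache.pekko.NotUsed
import org.apache.pekko.stream.connectors.recordio.scaladsl.RecordIOFraming
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import org.apache.pekko.util.ByteString

import scala.concurrent.Future

// An implicit ActorSystem must be in scope to run the stream.
val FirstRecordData =
  """{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}"""
val SecondRecordData = """{"type":"HEARTBEAT"}"""

// Each record is prefixed with its length in bytes and a newline.
val FirstRecordWithPrefix = s"121\n$FirstRecordData"
val SecondRecordWithPrefix = s"20\n$SecondRecordData"

val basicSource: Source[ByteString, NotUsed] =
  Source.single(ByteString(FirstRecordWithPrefix + SecondRecordWithPrefix))

val result: Future[Seq[ByteString]] = basicSource
  .via(RecordIOFraming.scanner())
  .runWith(Sink.seq)

Java

import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.connectors.recordio.javadsl.RecordIOFraming;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.util.ByteString;

import java.util.List;
import java.util.concurrent.CompletionStage;

// `system` is an ActorSystem that must already exist in the surrounding code.
String firstRecordData =
    "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
String secondRecordData = "{\"type\":\"HEARTBEAT\"}";

// Each record is prefixed with its length in bytes and a newline.
String firstRecordWithPrefix = "121\n" + firstRecordData;
String secondRecordWithPrefix = "20\n" + secondRecordData;

Source<ByteString, NotUsed> basicSource =
    Source.single(ByteString.fromString(firstRecordWithPrefix + secondRecordWithPrefix));

CompletionStage<List<ByteString>> result =
    basicSource.via(RecordIOFraming.scanner()).runWith(Sink.seq(), system);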

We obtain:

Scala

// futureValue and shouldBe come from ScalaTest (ScalaFutures and Matchers).
val byteStrings = result.futureValue

byteStrings(0) shouldBe ByteString(FirstRecordData)
byteStrings(1) shouldBe ByteString(SecondRecordData)

Java

// assertThat and is come from Hamcrest; TimeUnit is java.util.concurrent.TimeUnit.
List<ByteString> byteStrings = result.toCompletableFuture().get(1, TimeUnit.SECONDS);

assertThat(byteStrings.get(0), is(ByteString.fromString(firstRecordData)));
assertThat(byteStrings.get(1), is(ByteString.fromString(secondRecordData)));
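
The prefixes 121 and 20 used in the example are the byte lengths of the two records. As a minimal sketch of how such input could be produced, assuming records are UTF-8 text, the helper below prepends the length prefix to a record. `RecordIoEncoderSketch` and `encodeRecord` are hypothetical names for illustration only; the usage example above needs only the `scanner()` flow.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Pekko Connectors.
final class RecordIoEncoderSketch {

    /** Prefixes one record with its UTF-8 byte length and a newline. */
    static byte[] encodeRecord(String record) {
        byte[] payload = record.getBytes(StandardCharsets.UTF_8);
        // The prefix counts bytes, so it is derived from the encoded payload,
        // not from String.length().
        byte[] prefix = (payload.length + "\n").getBytes(StandardCharsets.US_ASCII);
        byte[] framed = new byte[prefix.length + payload.length];
        System.arraycopy(prefix, 0, framed, 0, prefix.length);
        System.arraycopy(payload, 0, framed, prefix.length, payload.length);
        return framed;
    }

    public static void main(String[] args) {
        String first =
            "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
        String second = "{\"type\":\"HEARTBEAT\"}";
        System.out.println(first.getBytes(StandardCharsets.UTF_8).length);  // 121
        System.out.println(second.getBytes(StandardCharsets.UTF_8).length); // 20
    }
}
```

Concatenating the output of `encodeRecord` for several records yields a body in the shape the scanner expects, since records follow each other without any separator.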

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala (sbt)
> simpleCodecs/testOnly *.RecordIOFramingSpec