Text and charsets

The text flows translate a stream of text data between character sets. They support conversion between ByteString and String, as well as conversion of the character set of binary text data held in ByteStrings.

The main use case for these flows is transcoding text read from a source in a character set that other flows or sinks cannot handle. For example, CSV data may arrive in UTF-16 encoding, but the Apache Pekko Connectors CSV parser only supports UTF-8.

Project Info: Apache Pekko Connectors Text
Artifact: org.apache.pekko : pekko-connectors-text : 1.2.0+3-e195cec2-SNAPSHOT
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21
Scala versions: 2.13.17, 2.12.20, 3.3.6
JPMS module name: pekko.stream.connectors.text
License
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.1.5"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-text" % "1.2.0+3-e195cec2-SNAPSHOT",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven
<properties>
  <pekko.version>1.1.5</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-text_${scala.binary.version}</artifactId>
    <version>1.2.0+3-e195cec2-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.1.5",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-text_${versions.ScalaBinary}:1.2.0+3-e195cec2-SNAPSHOT"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Text transcoding

The text transcoding flow converts incoming binary text data (ByteString) to binary text data of another character encoding.

The flow fails with an UnmappableCharacterException if a character cannot be represented in the target character set.

Scala
import java.nio.charset.StandardCharsets

import org.apache.pekko
import pekko.stream.connectors.text.scaladsl.TextFlow
import pekko.stream.scaladsl.{ FileIO, Source }
import pekko.util.ByteString

val byteStringSource: Source[ByteString, _] = // ...

// Re-encode UTF-16 input as UTF-8 and write it to targetFile
byteStringSource
  .via(TextFlow.transcoding(StandardCharsets.UTF_16, StandardCharsets.UTF_8))
  .runWith(FileIO.toPath(targetFile))
Java
Source<ByteString, ?> byteStringSource = // ...

// Re-encode UTF-16 input as UTF-8 and write it to targetFile
byteStringSource
    .via(TextFlow.transcoding(StandardCharsets.UTF_16, StandardCharsets.UTF_8))
    .runWith(FileIO.toPath(targetFile), system);
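
To make concrete what transcoding does to the bytes, the following minimal sketch re-encodes one complete chunk of text using only the JDK. It is an illustration under stated assumptions, not the connector's implementation: the class and method names are hypothetical, the input is assumed to hold whole characters only, and String.getBytes silently replaces unmappable characters instead of failing the way the flow does.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of Apache Pekko Connectors.
final class TranscodeSketch {

    /** Re-encodes one complete chunk of text bytes from one charset to another. */
    static byte[] transcode(byte[] input, Charset from, Charset to) {
        // Decode into a String, then encode with the target charset.
        // Unlike the flow, getBytes substitutes unmappable characters.
        return new String(input, from).getBytes(to);
    }

    public static void main(String[] args) {
        // "naïve": the JDK's UTF-16 encoder writes a 2-byte byte order mark
        // followed by 2 bytes for each of the 5 characters.
        byte[] utf16 = "na\u00efve".getBytes(StandardCharsets.UTF_16);
        byte[] utf8 = transcode(utf16, StandardCharsets.UTF_16, StandardCharsets.UTF_8);
        // prints "12 bytes in UTF-16, 6 bytes in UTF-8"
        System.out.println(utf16.length + " bytes in UTF-16, " + utf8.length + " bytes in UTF-8");
    }
}
```

The same text shrinks from 12 to 6 bytes here because UTF-8 needs one byte for each ASCII letter and two for the accented one.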

Text encoding

The text encoding flow converts incoming Strings to binary text data (ByteString) with the given character encoding.

The flow fails with an UnmappableCharacterException if a character cannot be represented in the target character set.

Scala
import java.nio.charset.StandardCharsets

import org.apache.pekko
import pekko.stream.connectors.text.scaladsl.TextFlow
import pekko.stream.scaladsl.{ FileIO, Source }
import pekko.util.ByteString

val stringSource: Source[String, _] = // ...

// Encode each String as US-ASCII bytes and separate the elements with newlines
stringSource
  .via(TextFlow.encoding(StandardCharsets.US_ASCII))
  .intersperse(ByteString("\n"))
  .runWith(FileIO.toPath(targetFile))
Java
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.connectors.text.javadsl.TextFlow;
import org.apache.pekko.stream.IOResult;
import org.apache.pekko.stream.javadsl.FileIO;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.util.ByteString;

import java.nio.charset.StandardCharsets;

Source<String, ?> stringSource = // ...

// Encode each String as US-ASCII bytes and separate the elements with newlines
stringSource
    .via(TextFlow.encoding(StandardCharsets.US_ASCII))
    .intersperse(ByteString.fromString("\n"))
    .runWith(FileIO.toPath(targetFile), system);
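
The failure mode described above can be reproduced with the JDK alone. The sketch below is an assumption-laden illustration, not the connector's code: the class and method names are hypothetical, and it relies only on the documented behaviour that a freshly created CharsetEncoder reports unmappable characters by throwing.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of Apache Pekko Connectors.
final class StrictEncodingSketch {

    /** Encodes text strictly and describes the outcome instead of throwing. */
    static String describeEncoding(String text, Charset charset) {
        try {
            // A new encoder uses CodingErrorAction.REPORT, so an unmappable
            // character raises UnmappableCharacterException here.
            ByteBuffer bytes = charset.newEncoder().encode(CharBuffer.wrap(text));
            return "ok: " + bytes.remaining() + " bytes";
        } catch (CharacterCodingException e) {
            return "failed: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Plain ASCII text encodes without problems.
        System.out.println(describeEncoding("cafe", StandardCharsets.US_ASCII));
        // U+00E9 has no US-ASCII representation, so strict encoding fails.
        System.out.println(describeEncoding("caf\u00e9", StandardCharsets.US_ASCII));
    }
}
```

Checking input like this before it enters the stream, or choosing a target character set such as UTF-8 that can represent every character, avoids failing the whole stream on a single element.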

Text decoding

The text decoding flow converts incoming ByteStrings to Strings using the given character encoding.

Scala
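import java.nio.charset.StandardCharsets

import scala.collection.immutable
import scala.concurrent.Future

import org.apache.pekko
import pekko.stream.connectors.text.scaladsl.TextFlow
import pekko.stream.scaladsl.{ Sink, Source }
import pekko.util.ByteString

val byteStringSource: Source[ByteString, _] = // ...

// Decode UTF-16 bytes to Strings and collect them
val result: Future[immutable.Seq[String]] =
  byteStringSource
    .via(TextFlow.decoding(StandardCharsets.UTF_16))
    .runWith(Sink.seq)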
Java
Source<ByteString, ?> byteStringSource = // ...

// Decode UTF-16 bytes to Strings and collect them
CompletionStage<List<String>> result =
    byteStringSource
        .via(TextFlow.decoding(StandardCharsets.UTF_16))
        .runWith(Sink.seq(), system);
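
When binary text arrives in chunks, a multi-byte character may be split across two chunks, so decoding each chunk in isolation can fail or corrupt text. The sketch below shows one way to handle this with the JDK's CharsetDecoder by carrying incomplete trailing bytes over to the next chunk. It is not taken from the connector's implementation, and the class and method names are hypothetical.

```java
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.MalformedInputException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of Apache Pekko Connectors.
final class ChunkedDecodingSketch {

    /** Decodes chunks in order, one output String per input chunk. */
    static List<String> decodeChunks(List<byte[]> chunks, Charset charset) {
        CharsetDecoder decoder = charset.newDecoder(); // reports malformed input
        List<String> decoded = new ArrayList<>();
        byte[] leftover = new byte[0]; // bytes of a character not yet complete
        try {
            for (byte[] chunk : chunks) {
                // Prepend the bytes left over from the previous chunk.
                ByteBuffer in = ByteBuffer.allocate(leftover.length + chunk.length);
                in.put(leftover).put(chunk);
                in.flip();
                CharBuffer out = CharBuffer.allocate(
                    (int) Math.ceil(in.remaining() * (double) decoder.maxCharsPerByte()));
                // endOfInput = false: an incomplete trailing sequence is left
                // unconsumed in the input buffer instead of being an error.
                CoderResult result = decoder.decode(in, out, false);
                if (result.isError()) {
                    result.throwException();
                }
                leftover = new byte[in.remaining()];
                in.get(leftover);
                out.flip();
                decoded.add(out.toString());
            }
            if (leftover.length > 0) {
                // The input ended in the middle of a character.
                throw new MalformedInputException(leftover.length);
            }
        } catch (CharacterCodingException e) {
            throw new UncheckedIOException(e);
        }
        return decoded;
    }

    public static void main(String[] args) {
        // U+00E9 is 0xC3 0xA9 in UTF-8; here it is split across two chunks.
        List<byte[]> chunks = Arrays.asList(
            new byte[] {0x63, (byte) 0xC3},
            new byte[] {(byte) 0xA9});
        System.out.println(String.join("", decodeChunks(chunks, StandardCharsets.UTF_8)));
    }
}
```

Reusing one decoder for the whole sequence also matters for character sets with state, such as UTF-16, where the byte order mark at the start determines how all later bytes are read.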