Extensible Markup Language - XML

The XML parsing module offers Flows for parsing, processing, and writing XML documents.

Project Info: Apache Pekko Connectors XML
Artifact: org.apache.pekko:pekko-connectors-xml:1.1.0-M1+122-2a90cd8c-SNAPSHOT
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21
Scala versions: 2.13.15, 2.12.20, 3.3.4
JPMS module name: pekko.stream.connectors.xml
License
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.1.2"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-xml" % "1.1.0-M1+122-2a90cd8c-SNAPSHOT",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven
<properties>
  <pekko.version>1.1.2</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-xml_${scala.binary.version}</artifactId>
    <version>1.1.0-M1+122-2a90cd8c-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.1.2",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-xml_${versions.ScalaBinary}:1.1.0-M1+122-2a90cd8c-SNAPSHOT"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

XML parsing

An XML processing pipeline starts with an XmlParsing.parser flow, which parses a stream of ByteStrings into XML parser events.

Scala
val parse = Flow[String]
  .map(ByteString(_))
  .via(XmlParsing.parser)
  .toMat(Sink.seq)(Keep.right)
Java
final Sink<String, CompletionStage<List<ParseEvent>>> parse =
    Flow.<String>create()
        .map(ByteString::fromString)
        .via(XmlParsing.parser())
        .toMat(Sink.seq(), Keep.right());

To parse an XML document, run a source containing the document with this parser.

Scala
val doc = "<doc><elem>elem1</elem><elem>elem2</elem></doc>"
val resultFuture = Source.single(doc).runWith(parse)
Java
final String doc = "<doc><elem>elem1</elem><elem>elem2</elem></doc>";
final CompletionStage<List<ParseEvent>> resultStage = Source.single(doc).runWith(parse, system);
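
To get a feel for the kind of event sequence such a parser produces, the following is a minimal JDK-only sketch. It uses the standard StAX pull parser (javax.xml.stream) as a stand-in rather than the connector itself, so the labels only approximate the connector's ParseEvent types. The class name EventSequenceSketch and the method describeEvents are hypothetical names chosen for this illustration.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical helper: lists StAX events using labels that resemble parse events.
class EventSequenceSketch {

    static List<String> describeEvents(String xml) {
        List<String> events = new ArrayList<>();
        try {
            XMLInputFactory factory = XMLInputFactory.newInstance();
            // Merge adjacent text chunks so each text run is reported once.
            factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.TRUE);
            XMLStreamReader reader = factory.createXMLStreamReader(new StringReader(xml));
            // A new reader is positioned on START_DOCUMENT before the first next() call.
            events.add("StartDocument");
            while (reader.hasNext()) {
                switch (reader.next()) {
                    case XMLStreamConstants.START_ELEMENT:
                        events.add("StartElement(" + reader.getLocalName() + ")");
                        break;
                    case XMLStreamConstants.CHARACTERS:
                        events.add("Characters(" + reader.getText() + ")");
                        break;
                    case XMLStreamConstants.END_ELEMENT:
                        events.add("EndElement(" + reader.getLocalName() + ")");
                        break;
                    case XMLStreamConstants.END_DOCUMENT:
                        events.add("EndDocument");
                        break;
                    default:
                        break; // comments, processing instructions, etc. are ignored here
                }
            }
        } catch (XMLStreamException e) {
            throw new IllegalStateException("Malformed XML", e);
        }
        return events;
    }

    public static void main(String[] args) {
        describeEvents("<doc><elem>elem1</elem><elem>elem2</elem></doc>")
            .forEach(System.out::println);
    }
}
```

Unlike this blocking sketch, the connector's parser flow consumes ByteString chunks as they arrive, so a document does not have to be available in full before the first events are emitted.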

To make sense of the parser events, statefulMap may be used to aggregate consecutive events and emit the relevant data. For more complex uses, a state machine will be required.

Scala
val doc = ByteString("<doc><elem>elem1</elem><elem>elem2</elem></doc>")
val result: Future[immutable.Seq[String]] = Source
  .single(doc)
  .via(XmlParsing.parser)
  .statefulMap(() => new StringBuilder())((textBuffer, parseEvent) => {
      parseEvent match {
        case _: StartElement =>
          textBuffer.clear()
          (textBuffer, None)
        case s: EndElement if s.localName == "elem" =>
          val text = textBuffer.toString
          (textBuffer, Some(text))
        case t: TextEvent =>
          textBuffer.append(t.text)
          (textBuffer, None)
        case _ =>
          (textBuffer, None)
      }
    }, textBuffer => Some(Some(textBuffer.toString)))
  .collect {
    case Some(txt) => txt
  }
  .runWith(Sink.seq)

result.futureValue should contain.inOrderOnly("elem1", "elem2")
Java
ByteString doc = ByteString.fromString("<doc><elem>elem1</elem><elem>elem2</elem></doc>");
CompletionStage<List<String>> stage =
    Source.single(doc)
        .via(XmlParsing.parser())
        .statefulMap(StringBuilder::new, (textBuffer, parseEvent) -> {
            // aggregation function
            switch (parseEvent.marker()) {
                case XMLStartElement:
                    textBuffer.delete(0, textBuffer.length());
                    return Pair.create(textBuffer, Optional.<String>empty());
                case XMLEndElement:
                    EndElement s = (EndElement) parseEvent;
                    switch (s.localName()) {
                        case "elem":
                            String text = textBuffer.toString();
                            return Pair.create(textBuffer, Optional.of(text));
                        default:
                            return Pair.create(textBuffer, Optional.<String>empty());
                    }
                case XMLCharacters:
                case XMLCData:
                    TextEvent t = (TextEvent) parseEvent;
                    textBuffer.append(t.text());
                    return Pair.create(textBuffer, Optional.<String>empty());
                default:
                    return Pair.create(textBuffer, Optional.<String>empty());
            }
        }, textBuffer -> Optional.of(Optional.of(textBuffer.toString())))
        .via(Flow.flattenOptional())
        .runWith(Sink.seq(), system);

List<String> list = stage.toCompletableFuture().get(5, TimeUnit.SECONDS);
assertThat(list, hasItems("elem1", "elem2"));
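
For the more complex case mentioned above, an explicit state machine might look like the following sketch. It is written against the JDK's StAX reader instead of the connector's ParseEvent stream, purely to keep the example self-contained. The names ElemTextStateMachine, State and collectText are hypothetical, and the sketch assumes the target element is never nested inside itself.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical two-state machine: collects the text of every `target` element.
class ElemTextStateMachine {

    private enum State { OUTSIDE, INSIDE_ELEM }

    static List<String> collectText(String xml, String target) {
        List<String> results = new ArrayList<>();
        StringBuilder buffer = new StringBuilder();
        State state = State.OUTSIDE;
        try {
            XMLStreamReader reader =
                XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml));
            while (reader.hasNext()) {
                int event = reader.next();
                switch (state) {
                    case OUTSIDE:
                        // Only the start of a target element changes the state.
                        if (event == XMLStreamConstants.START_ELEMENT
                                && target.equals(reader.getLocalName())) {
                            buffer.setLength(0);
                            state = State.INSIDE_ELEM;
                        }
                        break;
                    case INSIDE_ELEM:
                        if (event == XMLStreamConstants.CHARACTERS
                                || event == XMLStreamConstants.CDATA) {
                            buffer.append(reader.getText());
                        } else if (event == XMLStreamConstants.END_ELEMENT
                                && target.equals(reader.getLocalName())) {
                            // The target element is closed: emit its text and reset.
                            results.add(buffer.toString());
                            state = State.OUTSIDE;
                        }
                        break;
                }
            }
        } catch (XMLStreamException e) {
            throw new IllegalStateException("Malformed XML", e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(
            collectText("<doc><elem>elem1</elem><other>x</other><elem>elem2</elem></doc>", "elem"));
    }
}
```

Because text is only buffered in the INSIDE_ELEM state, text belonging to other elements (such as `other` in the usage example) never reaches the output.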

XML writing

An XML processing pipeline ends with an XmlWriting.writer flow, which writes a stream of XML parser events to ByteStrings.

Scala
val writer: Sink[ParseEvent, Future[String]] = Flow[ParseEvent]
  .via(XmlWriting.writer)
  .map[String](_.utf8String)
  .toMat(Sink.fold[String, String]("")((t, u) => t + u))(Keep.right)
Java
final Sink<ParseEvent, CompletionStage<String>> write =
    Flow.of(ParseEvent.class)
        .via(XmlWriting.writer())
        .map(ByteString::utf8String)
        .toMat(Sink.fold("", (acc, el) -> acc + el), Keep.right());
// Variant: pass a custom XMLOutputFactory (xmlOutputFactory is assumed to be defined elsewhere)
final Sink<ParseEvent, CompletionStage<String>> write =
    Flow.of(ParseEvent.class)
        .via(XmlWriting.writer(xmlOutputFactory))
        .map(ByteString::utf8String)
        .toMat(Sink.fold("", (acc, el) -> acc + el), Keep.right());

To write an XML document, run a source of parser events with this writer.

Scala
val listEl = List(
  StartDocument,
  StartElement(
    "book",
    namespace = Some("urn:loc.gov:books"),
    prefix = Some("bk"),
    namespaceCtx = List(Namespace("urn:loc.gov:books", prefix = Some("bk")),
      Namespace("urn:ISBN:0-395-36341-6", prefix = Some("isbn")))),
  StartElement(
    "title",
    namespace = Some("urn:loc.gov:books"),
    prefix = Some("bk")),
  Characters("Cheaper by the Dozen"),
  EndElement("title"),
  StartElement(
    "number",
    namespace = Some("urn:ISBN:0-395-36341-6"),
    prefix = Some("isbn")),
  Characters("1568491379"),
  EndElement("number"),
  EndElement("book"),
  EndDocument)

val doc =
  """<?xml version='1.0' encoding='UTF-8'?><bk:book xmlns:bk="urn:loc.gov:books" xmlns:isbn="urn:ISBN:0-395-36341-6"><bk:title>Cheaper by the Dozen</bk:title><isbn:number>1568491379</isbn:number></bk:book>"""
val resultFuture: Future[String] = Source.fromIterator[ParseEvent](() => listEl.iterator).runWith(writer)
resultFuture.futureValue(Timeout(3.seconds)) should ===(doc)
Java
final String doc =
    "<?xml version='1.0' encoding='UTF-8'?>"
        + "<bk:book xmlns:bk=\"urn:loc.gov:books\" xmlns:isbn=\"urn:ISBN:0-395-36341-6\">"
        + "<bk:title>Cheaper by the Dozen</bk:title><isbn:number>1568491379</isbn:number></bk:book>";
final List<Namespace> nmList = new ArrayList<>();
nmList.add(Namespace.create("urn:loc.gov:books", Optional.of("bk")));
nmList.add(Namespace.create("urn:ISBN:0-395-36341-6", Optional.of("isbn")));
final List<ParseEvent> docList = new ArrayList<>();
docList.add(StartDocument.getInstance());
docList.add(
    StartElement.create(
        "book",
        Collections.emptyList(),
        Optional.of("bk"),
        Optional.of("urn:loc.gov:books"),
        nmList));
docList.add(
    StartElement.create(
        "title", Collections.emptyList(), Optional.of("bk"), Optional.of("urn:loc.gov:books")));
docList.add(Characters.create("Cheaper by the Dozen"));
docList.add(EndElement.create("title"));
docList.add(
    StartElement.create(
        "number",
        Collections.emptyList(),
        Optional.of("isbn"),
        Optional.of("urn:ISBN:0-395-36341-6")));
docList.add(Characters.create("1568491379"));
docList.add(EndElement.create("number"));
docList.add(EndElement.create("book"));
docList.add(EndDocument.getInstance());

final CompletionStage<String> resultStage = Source.from(docList).runWith(write, system);
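
As a point of comparison, the same book document can be produced with the JDK's own StAX writer. This is only a sketch of how the event list above maps onto write calls, not the connector's implementation. The class BookWriterSketch and its method writeBook are hypothetical names, and the XML declaration is omitted because its exact formatting differs between StAX implementations.

```java
import java.io.StringWriter;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

// Hypothetical helper: writes the namespaced book document with the JDK StAX writer.
class BookWriterSketch {

    static String writeBook() {
        StringWriter out = new StringWriter();
        try {
            XMLStreamWriter writer = XMLOutputFactory.newInstance().createXMLStreamWriter(out);
            // Root element: declares both namespaces, like namespaceCtx on the first StartElement.
            writer.writeStartElement("bk", "book", "urn:loc.gov:books");
            writer.writeNamespace("bk", "urn:loc.gov:books");
            writer.writeNamespace("isbn", "urn:ISBN:0-395-36341-6");
            // <bk:title> reuses the prefix declared on the root element.
            writer.writeStartElement("bk", "title", "urn:loc.gov:books");
            writer.writeCharacters("Cheaper by the Dozen");
            writer.writeEndElement();
            // <isbn:number> uses the second declared namespace.
            writer.writeStartElement("isbn", "number", "urn:ISBN:0-395-36341-6");
            writer.writeCharacters("1568491379");
            writer.writeEndElement();
            writer.writeEndElement(); // closes <bk:book>
            writer.close();
        } catch (XMLStreamException e) {
            throw new IllegalStateException("Could not write XML", e);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(writeBook());
    }
}
```

The sketch shows why the namespace declarations only need to appear on the first StartElement: child elements refer to the prefixes already in scope.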

XML Subslice

Use XmlParsing.subslice to filter out all events that do not belong to a given element path.

Scala
val parse = Flow[String]
  .map(ByteString(_))
  .via(XmlParsing.parser)
  .via(XmlParsing.subslice("doc" :: "elem" :: "item" :: Nil))
  .toMat(Sink.seq)(Keep.right)
Java
final Sink<String, CompletionStage<List<ParseEvent>>> parse =
    Flow.<String>create()
        .map(ByteString::fromString)
        .via(XmlParsing.parser())
        .via(XmlParsing.subslice(Arrays.asList("doc", "elem", "item")))
        .toMat(Sink.seq(), Keep.right());

To get a subslice of an XML document, run a source containing the document with this parser.

Scala
val doc =
  """
    |<doc>
    |  <elem>
    |    <item>i1</item>
    |    <item><sub>i2</sub></item>
    |    <item>i3</item>
    |  </elem>
    |</doc>
  """.stripMargin
val resultFuture = Source.single(doc).runWith(parse)
Java
final String doc =
    "<doc>"
        + "  <elem>"
        + "    <item>i1</item>"
        + "    <item><sub>i2</sub></item>"
        + "    <item>i3</item>"
        + "  </elem>"
        + "</doc>";
final CompletionStage<List<ParseEvent>> resultStage = Source.single(doc).runWith(parse, system);
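
The path filtering idea can be sketched with plain JDK StAX. This sketch assumes that only the events nested inside a matched element are kept, and that the matched element's own start and end events are dropped, which is an assumption about the intended behaviour rather than something stated above. The names SubsliceSketch, subslice and inside are hypothetical.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical path filter: keeps only events nested under the given element path.
class SubsliceSketch {

    // True when the currently open elements start with the requested path.
    static boolean inside(List<String> open, List<String> path) {
        return open.size() >= path.size() && open.subList(0, path.size()).equals(path);
    }

    static List<String> subslice(String xml, List<String> path) {
        List<String> kept = new ArrayList<>();
        List<String> open = new ArrayList<>(); // stack of currently open element names
        try {
            XMLInputFactory factory = XMLInputFactory.newInstance();
            factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.TRUE);
            XMLStreamReader reader = factory.createXMLStreamReader(new StringReader(xml));
            while (reader.hasNext()) {
                switch (reader.next()) {
                    case XMLStreamConstants.START_ELEMENT:
                        // Checked before pushing, so the matched element itself is not kept.
                        if (inside(open, path)) kept.add("StartElement(" + reader.getLocalName() + ")");
                        open.add(reader.getLocalName());
                        break;
                    case XMLStreamConstants.END_ELEMENT:
                        open.remove(open.size() - 1);
                        // Checked after popping, mirroring the start-element case.
                        if (inside(open, path)) kept.add("EndElement(" + reader.getLocalName() + ")");
                        break;
                    case XMLStreamConstants.CHARACTERS:
                        if (inside(open, path)) kept.add("Characters(" + reader.getText() + ")");
                        break;
                    default:
                        break;
                }
            }
        } catch (XMLStreamException e) {
            throw new IllegalStateException("Malformed XML", e);
        }
        return kept;
    }

    public static void main(String[] args) {
        String doc = "<doc><elem><item>i1</item><item><sub>i2</sub></item><item>i3</item></elem></doc>";
        subslice(doc, Arrays.asList("doc", "elem", "item")).forEach(System.out::println);
    }
}
```

For the sample document, this sketch keeps the text i1, the sub element together with its text i2, and the text i3; whitespace between the item elements lies outside the path and is dropped.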

XML Subtree

Use XmlParsing.subtree to receive each element matching a given path, together with its child nodes, as an org.w3c.dom.Element.

Scala
val parse = Flow[String]
  .map(ByteString(_))
  .via(XmlParsing.parser)
  .via(XmlParsing.subtree("doc" :: "elem" :: "item" :: Nil))
  .toMat(Sink.seq)(Keep.right)
Java
final Sink<String, CompletionStage<List<Element>>> parse =
    Flow.<String>create()
        .map(ByteString::fromString)
        .via(XmlParsing.parser())
        .via(XmlParsing.subtree(Arrays.asList("doc", "elem", "item")))
        .toMat(Sink.seq(), Keep.right());

To get a subtree of an XML document, run a source containing the document with this parser.

Scala
val doc =
  """
    |<doc>
    |  <elem>
    |    <item>i1</item>
    |    <item><sub>i2</sub></item>
    |    <item>i3</item>
    |  </elem>
    |</doc>
  """.stripMargin
val resultFuture = Source.single(doc).runWith(parse)
Java
final String doc =
    "<doc>"
        + "  <elem>"
        + "    <item>i1</item>"
        + "    <item><sub>i2</sub></item>"
        + "    <item>i3</item>"
        + "  </elem>"
        + "</doc>";
final CompletionStage<List<Element>> resultStage = Source.single(doc).runWith(parse, system);
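
To illustrate what working with the resulting org.w3c.dom.Element values can look like, here is a JDK-only sketch that selects the elements at a path using the standard DOM parser. Unlike the streaming flow, it loads the whole document into memory first, so it only demonstrates the shape of the result. The names SubtreeSketch and elementsAt are hypothetical.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper: returns the DOM elements found at the given element path.
class SubtreeSketch {

    static List<Element> elementsAt(String xml, List<String> path) {
        List<Element> current = new ArrayList<>();
        try {
            Document document = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new InputSource(new StringReader(xml)));
            Element root = document.getDocumentElement();
            // The first path segment must name the root element.
            if (path.isEmpty() || !root.getTagName().equals(path.get(0))) {
                return current;
            }
            current.add(root);
            // Descend one path segment at a time, keeping only matching child elements.
            for (String name : path.subList(1, path.size())) {
                List<Element> next = new ArrayList<>();
                for (Element parent : current) {
                    NodeList children = parent.getChildNodes();
                    for (int i = 0; i < children.getLength(); i++) {
                        Node child = children.item(i);
                        if (child instanceof Element && ((Element) child).getTagName().equals(name)) {
                            next.add((Element) child);
                        }
                    }
                }
                current = next;
            }
        } catch (Exception e) {
            throw new IllegalStateException("Could not parse XML", e);
        }
        return current;
    }

    public static void main(String[] args) {
        String doc = "<doc><elem><item>i1</item><item><sub>i2</sub></item><item>i3</item></elem></doc>";
        for (Element item : elementsAt(doc, Arrays.asList("doc", "elem", "item"))) {
            System.out.println(item.getTagName() + " -> " + item.getTextContent());
        }
    }
}
```

Each returned Element carries its full subtree, so nested content such as the sub element of the second item remains reachable through the usual DOM navigation methods.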