Extensible Markup Language - XML

The XML parsing module offers flows for parsing, processing, and writing XML documents.

Project Info: Apache Pekko Connectors XML
Artifact: org.apache.pekko / pekko-connectors-xml / 1.1.0
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21
Scala versions: 2.13.15, 2.12.20, 3.3.4
JPMS module name: pekko.stream.connectors.xml
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-xml" % "1.1.0",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)

Maven
<properties>
  <pekko.version>1.1.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-xml_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>

Gradle
def versions = [
  PekkoVersion: "1.1.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-xml_${versions.ScalaBinary}:1.1.0"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

XML parsing

An XML processing pipeline starts with an XmlParsing.parser flow, which parses a stream of ByteStrings into XML parser events.

Scala
val parse = Flow[String]
  .map(ByteString(_))
  .via(XmlParsing.parser)
  .toMat(Sink.seq)(Keep.right)

Java
final Sink<String, CompletionStage<List<ParseEvent>>> parse =
    Flow.<String>create()
        .map(ByteString::fromString)
        .via(XmlParsing.parser())
        .toMat(Sink.seq(), Keep.right());

To parse an XML document, run a source that emits the document with this parser.

Scala
val doc = "<doc><elem>elem1</elem><elem>elem2</elem></doc>"
val resultFuture = Source.single(doc).runWith(parse)

Java
final String doc = "<doc><elem>elem1</elem><elem>elem2</elem></doc>";
final CompletionStage<List<ParseEvent>> resultStage = Source.single(doc).runWith(parse, system);

To make sense of the parser events, statefulMap may be used to aggregate consecutive events and emit the relevant data. For more complex uses, a state machine will be required.

Scala
val doc = ByteString("<doc><elem>elem1</elem><elem>elem2</elem></doc>")
val result: Future[immutable.Seq[String]] = Source
  .single(doc)
  .via(XmlParsing.parser)
  .statefulMap(() => new StringBuilder())((textBuffer, parseEvent) => {
      parseEvent match {
        case _: StartElement =>
          textBuffer.clear()
          (textBuffer, None)
        case s: EndElement if s.localName == "elem" =>
          val text = textBuffer.toString
          (textBuffer, Some(text))
        case t: TextEvent =>
          textBuffer.append(t.text)
          (textBuffer, None)
        case _ =>
          (textBuffer, None)
      }
    }, textBuffer => Some(Some(textBuffer.toString)))
  .collect {
    case Some(txt) => txt
  }
  .runWith(Sink.seq)

result.futureValue should contain.inOrderOnly("elem1", "elem2")

Java
ByteString doc = ByteString.fromString("<doc><elem>elem1</elem><elem>elem2</elem></doc>");
CompletionStage<List<String>> stage =
    Source.single(doc)
        .via(XmlParsing.parser())
        .statefulMap(StringBuilder::new, (textBuffer, parseEvent) -> {
            // aggregation function
            switch (parseEvent.marker()) {
                case XMLStartElement:
                    textBuffer.delete(0, textBuffer.length());
                    return Pair.create(textBuffer, Optional.<String>empty());
                case XMLEndElement:
                    EndElement s = (EndElement) parseEvent;
                    switch (s.localName()) {
                        case "elem":
                            String text = textBuffer.toString();
                            return Pair.create(textBuffer, Optional.of(text));
                        default:
                            return Pair.create(textBuffer, Optional.<String>empty());
                    }
                case XMLCharacters:
                case XMLCData:
                    TextEvent t = (TextEvent) parseEvent;
                    textBuffer.append(t.text());
                    return Pair.create(textBuffer, Optional.<String>empty());
                default:
                    return Pair.create(textBuffer, Optional.<String>empty());
            }
        }, textBuffer -> Optional.of(Optional.of(textBuffer.toString())))
        .via(Flow.flattenOptional())
        .runWith(Sink.seq(), system);

List<String> list = stage.toCompletableFuture().get(5, TimeUnit.SECONDS);
assertThat(list, hasItems("elem1", "elem2"));
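
For the "more complex uses" mentioned above, the aggregation can be written as an explicit state machine. The following is a minimal sketch of that idea only. It does not use the connector: it reads the same kind of start, text, and end events from the JDK's own StAX reader (javax.xml.stream), and the class name ElemTextCollector and its State enum are hypothetical names chosen for illustration.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical helper, not part of Pekko Connectors.
class ElemTextCollector {
    // Either outside any <elem>, or inside one and buffering its text.
    enum State { OUTSIDE, IN_ELEM }

    static List<String> collect(String xml) {
        List<String> out = new ArrayList<>();
        StringBuilder buffer = new StringBuilder();
        State state = State.OUTSIDE;
        try {
            XMLStreamReader reader =
                XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml));
            while (reader.hasNext()) {
                switch (reader.next()) {
                    case XMLStreamConstants.START_ELEMENT:
                        if ("elem".equals(reader.getLocalName())) {
                            buffer.setLength(0);      // start a fresh buffer
                            state = State.IN_ELEM;
                        }
                        break;
                    case XMLStreamConstants.CHARACTERS:
                    case XMLStreamConstants.CDATA:
                        if (state == State.IN_ELEM) {
                            buffer.append(reader.getText());
                        }
                        break;
                    case XMLStreamConstants.END_ELEMENT:
                        if (state == State.IN_ELEM && "elem".equals(reader.getLocalName())) {
                            out.add(buffer.toString()); // emit the aggregated text
                            state = State.OUTSIDE;
                        }
                        break;
                    default:
                        break;                        // ignore all other events
                }
            }
            reader.close();
        } catch (XMLStreamException e) {
            throw new IllegalStateException("Malformed XML", e);
        }
        return out;
    }
}
```

Calling ElemTextCollector.collect with the document from the example above yields the two texts elem1 and elem2. In a stream, the same transitions would live inside the function passed to statefulMap, with the state and buffer carried as the operator's state.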

XML writing

An XML processing pipeline ends with an XmlWriting.writer flow, which writes a stream of XML parser events to ByteStrings.

Scala
val writer: Sink[ParseEvent, Future[String]] = Flow[ParseEvent]
  .via(XmlWriting.writer)
  .map[String](_.utf8String)
  .toMat(Sink.fold[String, String]("")((t, u) => t + u))(Keep.right)

Java
final Sink<ParseEvent, CompletionStage<String>> write =
    Flow.of(ParseEvent.class)
        .via(XmlWriting.writer())
        .map(ByteString::utf8String)
        .toMat(Sink.fold("", (acc, el) -> acc + el), Keep.right());

Java (with a custom XMLOutputFactory)
final Sink<ParseEvent, CompletionStage<String>> write =
    Flow.of(ParseEvent.class)
        .via(XmlWriting.writer(xmlOutputFactory))
        .map(ByteString::utf8String)
        .toMat(Sink.fold("", (acc, el) -> acc + el), Keep.right());

To write an XML document, run a source of parser events with this writer.

Scala
val listEl = List(
  StartDocument,
  StartElement(
    "book",
    namespace = Some("urn:loc.gov:books"),
    prefix = Some("bk"),
    namespaceCtx = List(Namespace("urn:loc.gov:books", prefix = Some("bk")),
      Namespace("urn:ISBN:0-395-36341-6", prefix = Some("isbn")))),
  StartElement(
    "title",
    namespace = Some("urn:loc.gov:books"),
    prefix = Some("bk")),
  Characters("Cheaper by the Dozen"),
  EndElement("title"),
  StartElement(
    "number",
    namespace = Some("urn:ISBN:0-395-36341-6"),
    prefix = Some("isbn")),
  Characters("1568491379"),
  EndElement("number"),
  EndElement("book"),
  EndDocument)

val doc =
  """<?xml version='1.0' encoding='UTF-8'?><bk:book xmlns:bk="urn:loc.gov:books" xmlns:isbn="urn:ISBN:0-395-36341-6"><bk:title>Cheaper by the Dozen</bk:title><isbn:number>1568491379</isbn:number></bk:book>"""
val resultFuture: Future[String] = Source.fromIterator[ParseEvent](() => listEl.iterator).runWith(writer)
resultFuture.futureValue(Timeout(3.seconds)) should ===(doc)

Java
final String doc =
    "<?xml version='1.0' encoding='UTF-8'?>"
        + "<bk:book xmlns:bk=\"urn:loc.gov:books\" xmlns:isbn=\"urn:ISBN:0-395-36341-6\">"
        + "<bk:title>Cheaper by the Dozen</bk:title><isbn:number>1568491379</isbn:number></bk:book>";
final List<Namespace> nmList = new ArrayList<>();
nmList.add(Namespace.create("urn:loc.gov:books", Optional.of("bk")));
nmList.add(Namespace.create("urn:ISBN:0-395-36341-6", Optional.of("isbn")));
final List<ParseEvent> docList = new ArrayList<>();
docList.add(StartDocument.getInstance());
docList.add(
    StartElement.create(
        "book",
        Collections.emptyList(),
        Optional.of("bk"),
        Optional.of("urn:loc.gov:books"),
        nmList));
docList.add(
    StartElement.create(
        "title", Collections.emptyList(), Optional.of("bk"), Optional.of("urn:loc.gov:books")));
docList.add(Characters.create("Cheaper by the Dozen"));
docList.add(EndElement.create("title"));
docList.add(
    StartElement.create(
        "number",
        Collections.emptyList(),
        Optional.of("isbn"),
        Optional.of("urn:ISBN:0-395-36341-6")));
docList.add(Characters.create("1568491379"));
docList.add(EndElement.create("number"));
docList.add(EndElement.create("book"));
docList.add(EndDocument.getInstance());

final CompletionStage<String> resultStage = Source.from(docList).runWith(write, system);
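
To make the mapping from parser events to written markup more concrete, here is a rough sketch that writes the same book document directly with the JDK's StAX writer. It rests on the assumption that the xmlOutputFactory passed to XmlWriting.writer is a javax.xml.stream.XMLOutputFactory. The class StaxWriteSketch is a hypothetical name, and the code does not use the connector itself.

```java
import java.io.StringWriter;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

// Hypothetical helper, not part of Pekko Connectors.
class StaxWriteSketch {
    static String writeBook() {
        StringWriter out = new StringWriter();
        try {
            // Assumption: a factory like this one is what a custom writer would receive.
            XMLOutputFactory factory = XMLOutputFactory.newInstance();
            XMLStreamWriter w = factory.createXMLStreamWriter(out);
            w.writeStartDocument();                                   // StartDocument
            w.writeStartElement("bk", "book", "urn:loc.gov:books");   // StartElement("book", ...)
            w.writeNamespace("bk", "urn:loc.gov:books");              // namespace context
            w.writeNamespace("isbn", "urn:ISBN:0-395-36341-6");
            w.writeStartElement("bk", "title", "urn:loc.gov:books");  // StartElement("title", ...)
            w.writeCharacters("Cheaper by the Dozen");                // Characters(...)
            w.writeEndElement();                                      // EndElement("title")
            w.writeStartElement("isbn", "number", "urn:ISBN:0-395-36341-6");
            w.writeCharacters("1568491379");
            w.writeEndElement();                                      // EndElement("number")
            w.writeEndElement();                                      // EndElement("book")
            w.writeEndDocument();                                     // EndDocument
            w.close();
        } catch (XMLStreamException e) {
            throw new IllegalStateException("Could not write XML", e);
        }
        return out.toString();
    }
}
```

The exact form of the XML declaration depends on the StAX implementation on the classpath, so the output of this sketch may differ from the expected string above in quoting and in the encoding attribute.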

XML Subslice

Use XmlParsing.subslice to filter out all elements not corresponding to a certain path.

Scala
val parse = Flow[String]
  .map(ByteString(_))
  .via(XmlParsing.parser)
  .via(XmlParsing.subslice("doc" :: "elem" :: "item" :: Nil))
  .toMat(Sink.seq)(Keep.right)

Java
final Sink<String, CompletionStage<List<ParseEvent>>> parse =
    Flow.<String>create()
        .map(ByteString::fromString)
        .via(XmlParsing.parser())
        .via(XmlParsing.subslice(Arrays.asList("doc", "elem", "item")))
        .toMat(Sink.seq(), Keep.right());

To get a subslice of an XML document, run a source that emits the document with this parser.

Scala
val doc =
  """
    |<doc>
    |  <elem>
    |    <item>i1</item>
    |    <item><sub>i2</sub></item>
    |    <item>i3</item>
    |  </elem>
    |</doc>
  """.stripMargin
val resultFuture = Source.single(doc).runWith(parse)

Java
final String doc =
    "<doc>"
        + "  <elem>"
        + "    <item>i1</item>"
        + "    <item><sub>i2</sub></item>"
        + "    <item>i3</item>"
        + "  </elem>"
        + "</doc>";
final CompletionStage<List<ParseEvent>> resultStage = Source.single(doc).runWith(parse, system);
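
The path filtering can be pictured as tracking the stack of open elements and keeping an event only while that stack starts with the requested path. The sketch below illustrates this with the JDK's StAX reader rather than the connector. It assumes that the matched element's own start and end events are dropped and only the events nested inside it are kept. SubsliceSketch and its string rendering of events are hypothetical and serve illustration only.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical helper, not part of Pekko Connectors.
class SubsliceSketch {
    // True when the stack of open elements starts with the whole target path.
    private static boolean insidePath(List<String> open, List<String> path) {
        return open.size() >= path.size() && open.subList(0, path.size()).equals(path);
    }

    static List<String> subslice(String xml, List<String> path) {
        List<String> kept = new ArrayList<>();
        List<String> open = new ArrayList<>(); // names of open elements, root first
        try {
            XMLStreamReader reader =
                XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml));
            while (reader.hasNext()) {
                switch (reader.next()) {
                    case XMLStreamConstants.START_ELEMENT:
                        // Check before pushing, so the matched element itself is skipped.
                        if (insidePath(open, path)) {
                            kept.add("StartElement(" + reader.getLocalName() + ")");
                        }
                        open.add(reader.getLocalName());
                        break;
                    case XMLStreamConstants.END_ELEMENT:
                        // Pop first, so the matched element's end is skipped as well.
                        open.remove(open.size() - 1);
                        if (insidePath(open, path)) {
                            kept.add("EndElement(" + reader.getLocalName() + ")");
                        }
                        break;
                    case XMLStreamConstants.CHARACTERS:
                    case XMLStreamConstants.CDATA:
                        if (insidePath(open, path)) {
                            kept.add("Characters(" + reader.getText() + ")");
                        }
                        break;
                    default:
                        break;
                }
            }
            reader.close();
        } catch (XMLStreamException e) {
            throw new IllegalStateException("Malformed XML", e);
        }
        return kept;
    }
}
```

For the Java document above and the path doc, elem, item, this sketch keeps Characters(i1), StartElement(sub), Characters(i2), EndElement(sub), and Characters(i3). The whitespace between elements sits outside every item and is dropped.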

XML Subtree

Use XmlParsing.subtree to handle each element that matches a certain path, together with its child nodes, as an org.w3c.dom.Element.

Scala
val parse = Flow[String]
  .map(ByteString(_))
  .via(XmlParsing.parser)
  .via(XmlParsing.subtree("doc" :: "elem" :: "item" :: Nil))
  .toMat(Sink.seq)(Keep.right)

Java
final Sink<String, CompletionStage<List<Element>>> parse =
    Flow.<String>create()
        .map(ByteString::fromString)
        .via(XmlParsing.parser())
        .via(XmlParsing.subtree(Arrays.asList("doc", "elem", "item")))
        .toMat(Sink.seq(), Keep.right());

To get a subtree of an XML document, run a source that emits the document with this parser.

Scala
val doc =
  """
    |<doc>
    |  <elem>
    |    <item>i1</item>
    |    <item><sub>i2</sub></item>
    |    <item>i3</item>
    |  </elem>
    |</doc>
  """.stripMargin
val resultFuture = Source.single(doc).runWith(parse)

Java
final String doc =
    "<doc>"
        + "  <elem>"
        + "    <item>i1</item>"
        + "    <item><sub>i2</sub></item>"
        + "    <item>i3</item>"
        + "  </elem>"
        + "</doc>";
final CompletionStage<List<Element>> resultStage = Source.single(doc).runWith(parse, system);
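
Because the emitted values are plain org.w3c.dom.Element instances, the standard JDK DOM and javax.xml.transform APIs can be applied to them downstream, for example in a map stage. The following sketch shows two such helpers. ElementHelpers is a hypothetical class, and parseElement exists only to build an Element for the demonstration without running a stream.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

// Hypothetical helpers, not part of Pekko Connectors.
class ElementHelpers {
    // Concatenated text of the element and all of its descendants.
    static String text(Element element) {
        return element.getTextContent();
    }

    // Serializes the element and its children back to markup, without an XML declaration.
    static String toXml(Element element) {
        try {
            Transformer transformer = TransformerFactory.newInstance().newTransformer();
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            StringWriter out = new StringWriter();
            transformer.transform(new DOMSource(element), new StreamResult(out));
            return out.toString();
        } catch (Exception e) {
            throw new IllegalStateException("Could not serialize element", e);
        }
    }

    // Demo-only: builds an Element from a string, standing in for one emitted by the stream.
    static Element parseElement(String xml) {
        try {
            return DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new InputSource(new StringReader(xml)))
                .getDocumentElement();
        } catch (Exception e) {
            throw new IllegalStateException("Could not parse element", e);
        }
    }
}
```

For an element equivalent to the second item in the document above, text returns i2, and toXml returns markup that contains the nested sub element.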