Extensible Markup Language - XML

The XML parsing module offers Flows for parsing, processing, and writing XML documents.

Project Info: Apache Pekko Connectors XML
Artifact: group org.apache.pekko, artifact pekko-connectors-xml, version 1.0.2
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17
Scala versions: 2.13.14, 2.12.20, 3.3.3
JPMS module name: pekko.stream.connectors.xml
License
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/apache/pekko-connectors

Artifacts

sbt
val PekkoVersion = "1.0.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-xml" % "1.0.2",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven
<properties>
  <pekko.version>1.0.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-xml_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.0.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-xml_${versions.ScalaBinary}:1.0.2"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

XML parsing

An XML processing pipeline starts with an XmlParsing.parser flow, which parses a stream of ByteStrings into XML parser events.

Scala
val parse = Flow[String]
  .map(ByteString(_))
  .via(XmlParsing.parser)
  .toMat(Sink.seq)(Keep.right)
Java
final Sink<String, CompletionStage<List<ParseEvent>>> parse =
    Flow.<String>create()
        .map(ByteString::fromString)
        .via(XmlParsing.parser())
        .toMat(Sink.seq(), Keep.right());

To parse an XML document, run a source that emits the document with this parser.

Scala
val doc = "<doc><elem>elem1</elem><elem>elem2</elem></doc>"
val resultFuture = Source.single(doc).runWith(parse)
Java
final String doc = "<doc><elem>elem1</elem><elem>elem2</elem></doc>";
final CompletionStage<List<ParseEvent>> resultStage = Source.single(doc).runWith(parse, system);

To make sense of the parser events, statefulMapConcat may be used to aggregate consecutive events and emit the relevant data. For more complex uses, a state machine will be required.

Scala
val doc = ByteString("<doc><elem>elem1</elem><elem>elem2</elem></doc>")
val result: Future[immutable.Seq[String]] = Source
  .single(doc)
  .via(XmlParsing.parser)
  .statefulMapConcat(() => {
    // state
    val textBuffer = new StringBuilder()
    // aggregation function
    parseEvent =>
      parseEvent match {
        case s: StartElement =>
          textBuffer.clear()
          immutable.Seq.empty
        case s: EndElement if s.localName == "elem" =>
          val text = textBuffer.toString
          immutable.Seq(text)
        case t: TextEvent =>
          textBuffer.append(t.text)
          immutable.Seq.empty
        case _ =>
          immutable.Seq.empty
      }
  })
  .runWith(Sink.seq)

result.futureValue should contain.inOrderOnly("elem1", "elem2")
Java
ByteString doc = ByteString.fromString("<doc><elem>elem1</elem><elem>elem2</elem></doc>");
CompletionStage<List<String>> stage =
    Source.single(doc)
        .via(XmlParsing.parser())
        .statefulMapConcat(
            () -> {
              // state
              final StringBuilder textBuffer = new StringBuilder();
              // aggregation function
              return parseEvent -> {
                switch (parseEvent.marker()) {
                  case XMLStartElement:
                    textBuffer.delete(0, textBuffer.length());
                    return Collections.emptyList();
                  case XMLEndElement:
                    EndElement s = (EndElement) parseEvent;
                    switch (s.localName()) {
                      case "elem":
                        String text = textBuffer.toString();
                        return Collections.singleton(text);
                      default:
                        return Collections.emptyList();
                    }
                  case XMLCharacters:
                  case XMLCData:
                    TextEvent t = (TextEvent) parseEvent;
                    textBuffer.append(t.text());
                    return Collections.emptyList();
                  default:
                    return Collections.emptyList();
                }
              };
            })
        .runWith(Sink.seq(), system);

List<String> list = stage.toCompletableFuture().get(5, TimeUnit.SECONDS);
assertThat(list, hasItems("elem1", "elem2"));
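
The remark above about needing a state machine for more complex uses can be illustrated with a small sketch. It is deliberately independent of the connector: PathTextCollector is a hypothetical helper, and the events come from the JDK's StAX reader (javax.xml.stream) rather than from XmlParsing.parser, on the assumption that the same start/text/end transitions would apply to the connector's parser events. The state is the list of currently open elements plus a text buffer, so text is collected only for elements at one exact path and same-named elements elsewhere are ignored.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical helper, not part of the connector: a small state machine that
// collects the text of elements found at one exact path.
final class PathTextCollector {
  private final List<String> targetPath;
  private final List<String> openElements = new ArrayList<>(); // state: current path
  private final StringBuilder textBuffer = new StringBuilder(); // state: pending text
  private final List<String> results = new ArrayList<>();

  PathTextCollector(List<String> targetPath) {
    this.targetPath = targetPath;
  }

  private boolean atTarget() {
    return openElements.equals(targetPath);
  }

  void onStartElement(String localName) {
    openElements.add(localName);
    if (atTarget()) textBuffer.setLength(0); // entering a target element: reset the buffer
  }

  void onText(String text) {
    if (atTarget()) textBuffer.append(text); // only direct text of a target element
  }

  void onEndElement() {
    if (atTarget()) results.add(textBuffer.toString()); // leaving a target element: emit
    openElements.remove(openElements.size() - 1);
  }

  // Drives the state machine with events from the JDK's StAX reader.
  static List<String> collect(String xml, List<String> targetPath) {
    PathTextCollector collector = new PathTextCollector(targetPath);
    try {
      XMLStreamReader reader =
          XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml));
      while (reader.hasNext()) {
        switch (reader.next()) {
          case XMLStreamConstants.START_ELEMENT:
            collector.onStartElement(reader.getLocalName());
            break;
          case XMLStreamConstants.CHARACTERS:
          case XMLStreamConstants.CDATA:
            collector.onText(reader.getText());
            break;
          case XMLStreamConstants.END_ELEMENT:
            collector.onEndElement();
            break;
          default:
            break;
        }
      }
      reader.close();
    } catch (XMLStreamException e) {
      throw new IllegalArgumentException("Malformed XML", e);
    }
    return collector.results;
  }

  public static void main(String[] args) {
    String doc =
        "<doc><elem>elem1</elem><other><elem>nested</elem></other><elem>elem2</elem></doc>";
    // prints [elem1, elem2]; the <elem> below <other> is not at the path doc/elem
    System.out.println(collect(doc, Arrays.asList("doc", "elem")));
  }
}
```

Inside statefulMapConcat the same three transitions would live in the aggregation function, with the open-element list and the text buffer as its state.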

XML writing

An XML processing pipeline ends with an XmlWriting.writer flow, which writes a stream of XML parser events to ByteStrings.

Scala
val writer: Sink[ParseEvent, Future[String]] = Flow[ParseEvent]
  .via(XmlWriting.writer)
  .map[String](_.utf8String)
  .toMat(Sink.fold[String, String]("")((t, u) => t + u))(Keep.right)
Java
final Sink<ParseEvent, CompletionStage<String>> write =
    Flow.of(ParseEvent.class)
        .via(XmlWriting.writer())
        .map(ByteString::utf8String)
        .toMat(Sink.fold("", (acc, el) -> acc + el), Keep.right());
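
// Alternative: the same sink, passing a custom XML output factory to the writer
// (xmlOutputFactory is not defined in this snippet)
final Sink<ParseEvent, CompletionStage<String>> write =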
    Flow.of(ParseEvent.class)
        .via(XmlWriting.writer(xmlOutputFactory))
        .map(ByteString::utf8String)
        .toMat(Sink.fold("", (acc, el) -> acc + el), Keep.right());

To write an XML document, run a source of XML parser events with this writer.

Scala
val listEl = List(
  StartDocument,
  StartElement(
    "book",
    namespace = Some("urn:loc.gov:books"),
    prefix = Some("bk"),
    namespaceCtx = List(Namespace("urn:loc.gov:books", prefix = Some("bk")),
      Namespace("urn:ISBN:0-395-36341-6", prefix = Some("isbn")))),
  StartElement(
    "title",
    namespace = Some("urn:loc.gov:books"),
    prefix = Some("bk")),
  Characters("Cheaper by the Dozen"),
  EndElement("title"),
  StartElement(
    "number",
    namespace = Some("urn:ISBN:0-395-36341-6"),
    prefix = Some("isbn")),
  Characters("1568491379"),
  EndElement("number"),
  EndElement("book"),
  EndDocument)

val doc =
  """<?xml version='1.0' encoding='UTF-8'?><bk:book xmlns:bk="urn:loc.gov:books" xmlns:isbn="urn:ISBN:0-395-36341-6"><bk:title>Cheaper by the Dozen</bk:title><isbn:number>1568491379</isbn:number></bk:book>"""
val resultFuture: Future[String] = Source.fromIterator[ParseEvent](() => listEl.iterator).runWith(writer)
resultFuture.futureValue(Timeout(3.seconds)) should ===(doc)
Java
final String doc =
    "<?xml version='1.0' encoding='UTF-8'?>"
        + "<bk:book xmlns:bk=\"urn:loc.gov:books\" xmlns:isbn=\"urn:ISBN:0-395-36341-6\">"
        + "<bk:title>Cheaper by the Dozen</bk:title><isbn:number>1568491379</isbn:number></bk:book>";
final List<Namespace> nmList = new ArrayList<>();
nmList.add(Namespace.create("urn:loc.gov:books", Optional.of("bk")));
nmList.add(Namespace.create("urn:ISBN:0-395-36341-6", Optional.of("isbn")));
final List<ParseEvent> docList = new ArrayList<>();
docList.add(StartDocument.getInstance());
docList.add(
    StartElement.create(
        "book",
        Collections.emptyList(),
        Optional.of("bk"),
        Optional.of("urn:loc.gov:books"),
        nmList));
docList.add(
    StartElement.create(
        "title", Collections.emptyList(), Optional.of("bk"), Optional.of("urn:loc.gov:books")));
docList.add(Characters.create("Cheaper by the Dozen"));
docList.add(EndElement.create("title"));
docList.add(
    StartElement.create(
        "number",
        Collections.emptyList(),
        Optional.of("isbn"),
        Optional.of("urn:ISBN:0-395-36341-6")));
docList.add(Characters.create("1568491379"));
docList.add(EndElement.create("number"));
docList.add(EndElement.create("book"));
docList.add(EndDocument.getInstance());

final CompletionStage<String> resultStage = Source.from(docList).runWith(write, system);

XML Subslice

Use XmlParsing.subslice to filter out all elements that do not correspond to a certain path.

Scala
val parse = Flow[String]
  .map(ByteString(_))
  .via(XmlParsing.parser)
  .via(XmlParsing.subslice("doc" :: "elem" :: "item" :: Nil))
  .toMat(Sink.seq)(Keep.right)
Java
final Sink<String, CompletionStage<List<ParseEvent>>> parse =
    Flow.<String>create()
        .map(ByteString::fromString)
        .via(XmlParsing.parser())
        .via(XmlParsing.subslice(Arrays.asList("doc", "elem", "item")))
        .toMat(Sink.seq(), Keep.right());

To get a subslice of an XML document, run a source that emits the document with this parser.

Scala
val doc =
  """
    |<doc>
    |  <elem>
    |    <item>i1</item>
    |    <item><sub>i2</sub></item>
    |    <item>i3</item>
    |  </elem>
    |</doc>
  """.stripMargin
val resultFuture = Source.single(doc).runWith(parse)
Java
final String doc =
    "<doc>"
        + "  <elem>"
        + "    <item>i1</item>"
        + "    <item><sub>i2</sub></item>"
        + "    <item>i3</item>"
        + "  </elem>"
        + "</doc>";
final CompletionStage<List<ParseEvent>> resultStage = Source.single(doc).runWith(parse, system);
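
To make the effect of the path filter concrete, the following sketch imitates it with the JDK's StAX reader. It is not the connector's implementation: SubsliceSketch and its string rendering of events are hypothetical, and it assumes that subslice passes on only the events nested inside the matched elements, without the matched start and end tags themselves.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical stand-in for the path filter, built on the JDK only.
final class SubsliceSketch {

  // True when the outermost open elements spell out the whole path.
  private static boolean inside(List<String> open, List<String> path) {
    return open.size() >= path.size() && open.subList(0, path.size()).equals(path);
  }

  // Returns a readable rendering of every event nested inside elements at the path.
  static List<String> subslice(String xml, List<String> path) {
    List<String> open = new ArrayList<>(); // names of the currently open elements
    List<String> out = new ArrayList<>();
    try {
      XMLStreamReader reader =
          XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml));
      while (reader.hasNext()) {
        switch (reader.next()) {
          case XMLStreamConstants.START_ELEMENT:
            // Checked before pushing, so the matched start tag itself is skipped.
            if (inside(open, path)) out.add("StartElement(" + reader.getLocalName() + ")");
            open.add(reader.getLocalName());
            break;
          case XMLStreamConstants.END_ELEMENT:
            // Checked after popping, so the matched end tag itself is skipped.
            open.remove(open.size() - 1);
            if (inside(open, path)) out.add("EndElement(" + reader.getLocalName() + ")");
            break;
          case XMLStreamConstants.CHARACTERS:
          case XMLStreamConstants.CDATA:
            if (inside(open, path)) out.add("Characters(" + reader.getText() + ")");
            break;
          default:
            break;
        }
      }
      reader.close();
    } catch (XMLStreamException e) {
      throw new IllegalArgumentException("Malformed XML", e);
    }
    return out;
  }

  public static void main(String[] args) {
    String doc =
        "<doc>  <elem>    <item>i1</item>    <item><sub>i2</sub></item>"
            + "    <item>i3</item>  </elem></doc>";
    for (String event : subslice(doc, Arrays.asList("doc", "elem", "item"))) {
      System.out.println(event);
    }
  }
}
```

Under that assumption, the whitespace between the item elements is dropped as well, because it lies outside the path doc, elem, item.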

XML Subtree

Use XmlParsing.subtree to handle the elements matching a certain path, together with their child nodes, as org.w3c.dom.Element.

Scala
val parse = Flow[String]
  .map(ByteString(_))
  .via(XmlParsing.parser)
  .via(XmlParsing.subtree("doc" :: "elem" :: "item" :: Nil))
  .toMat(Sink.seq)(Keep.right)
Java
final Sink<String, CompletionStage<List<Element>>> parse =
    Flow.<String>create()
        .map(ByteString::fromString)
        .via(XmlParsing.parser())
        .via(XmlParsing.subtree(Arrays.asList("doc", "elem", "item")))
        .toMat(Sink.seq(), Keep.right());

To get a subtree of an XML document, run a source that emits the document with this parser.

Scala
val doc =
  """
    |<doc>
    |  <elem>
    |    <item>i1</item>
    |    <item><sub>i2</sub></item>
    |    <item>i3</item>
    |  </elem>
    |</doc>
  """.stripMargin
val resultFuture = Source.single(doc).runWith(parse)
Java
final String doc =
    "<doc>"
        + "  <elem>"
        + "    <item>i1</item>"
        + "    <item><sub>i2</sub></item>"
        + "    <item>i3</item>"
        + "  </elem>"
        + "</doc>";
final CompletionStage<List<Element>> resultStage = Source.single(doc).runWith(parse, system);
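
Once the stage completes, each org.w3c.dom.Element can be inspected with the standard DOM API. The sketch below shows two common reads, the text content and the names of the direct child elements. SubtreeElementSketch is a hypothetical helper, and to stay self-contained it builds its elements with the JDK's DocumentBuilder as stand-ins for the elements the subtree flow would emit.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper for inspecting DOM elements, using the JDK only.
final class SubtreeElementSketch {

  // Stand-in for one element emitted by the subtree flow: parses a fragment with the JDK.
  static Element parse(String xml) {
    try {
      return DocumentBuilderFactory.newInstance()
          .newDocumentBuilder()
          .parse(new InputSource(new StringReader(xml)))
          .getDocumentElement();
    } catch (Exception e) {
      throw new IllegalArgumentException("Malformed XML", e);
    }
  }

  // Concatenated text of the element and all of its descendants.
  static String text(Element element) {
    return element.getTextContent();
  }

  // Tag names of the direct child elements, in document order.
  static List<String> childNames(Element element) {
    List<String> names = new ArrayList<>();
    NodeList children = element.getChildNodes();
    for (int i = 0; i < children.getLength(); i++) {
      Node child = children.item(i);
      if (child.getNodeType() == Node.ELEMENT_NODE) {
        names.add(child.getNodeName());
      }
    }
    return names;
  }

  public static void main(String[] args) {
    Element item = parse("<item><sub>i2</sub></item>");
    System.out.println(item.getTagName() + ": text=" + text(item)
        + ", children=" + childNames(item));
  }
}
```

With the list obtained from resultStage, the same text and childNames calls could be applied to each element in turn.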