Extensible Markup Language - XML
The XML parsing module offers Flows for parsing, processing, and writing XML documents.
Project Info: Apache Pekko Connectors XML | |
---|---|
Artifact | org.apache.pekko : pekko-connectors-xml : 1.1.0 |
JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21 |
Scala versions | 2.13.15, 2.12.20, 3.3.4 |
JPMS module name | pekko.stream.connectors.xml |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts
sbt:
val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-xml" % "1.1.0",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven:
<properties>
  <pekko.version>1.1.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-xml_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
Gradle:
def versions = [
  PekkoVersion: "1.1.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-xml_${versions.ScalaBinary}:1.1.0"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
XML parsing
An XML processing pipeline starts with an XmlParsing.parser flow, which parses a stream of ByteStrings into XML parser events.
Scala:
val parse = Flow[String]
  .map(ByteString(_))
  .via(XmlParsing.parser)
  .toMat(Sink.seq)(Keep.right)
Java:
final Sink<String, CompletionStage<List<ParseEvent>>> parse =
    Flow.<String>create()
        .map(ByteString::fromString)
        .via(XmlParsing.parser())
        .toMat(Sink.seq(), Keep.right());
To parse an XML document, run a source containing the document with this parser.
Scala:
val doc = "<doc><elem>elem1</elem><elem>elem2</elem></doc>"
val resultFuture = Source.single(doc).runWith(parse)
Java:
final String doc = "<doc><elem>elem1</elem><elem>elem2</elem></doc>";
final CompletionStage<List<ParseEvent>> resultStage = Source.single(doc).runWith(parse, system);
To make sense of the parser events, statefulMap may be used to aggregate consecutive events and emit the relevant data. For more complex uses, a state machine will be required.
Scala:
val doc = ByteString("<doc><elem>elem1</elem><elem>elem2</elem></doc>")
val result: Future[immutable.Seq[String]] = Source
  .single(doc)
  .via(XmlParsing.parser)
  .statefulMap(() => new StringBuilder())((textBuffer, parseEvent) => {
      parseEvent match {
        case _: StartElement =>
          // a new element starts: discard the text collected so far
          textBuffer.clear()
          (textBuffer, None)
        case s: EndElement if s.localName == "elem" =>
          // an "elem" element ends: emit its collected text
          val text = textBuffer.toString
          (textBuffer, Some(text))
        case t: TextEvent =>
          textBuffer.append(t.text)
          (textBuffer, None)
        case _ =>
          (textBuffer, None)
      }
    }, textBuffer => Some(Some(textBuffer.toString))) // on completion, emit the buffered text
  .collect {
    case Some(txt) => txt
  }
  .runWith(Sink.seq)
result.futureValue should contain.inOrderOnly("elem1", "elem2")
Java:
ByteString doc = ByteString.fromString("<doc><elem>elem1</elem><elem>elem2</elem></doc>");
CompletionStage<List<String>> stage =
    Source.single(doc)
        .via(XmlParsing.parser())
        .statefulMap(StringBuilder::new, (textBuffer, parseEvent) -> {
          // aggregation function
          switch (parseEvent.marker()) {
            case XMLStartElement:
              // a new element starts: discard the text collected so far
              textBuffer.delete(0, textBuffer.length());
              return Pair.create(textBuffer, Optional.<String>empty());
            case XMLEndElement:
              EndElement s = (EndElement) parseEvent;
              switch (s.localName()) {
                case "elem":
                  // an "elem" element ends: emit its collected text
                  String text = textBuffer.toString();
                  return Pair.create(textBuffer, Optional.of(text));
                default:
                  return Pair.create(textBuffer, Optional.<String>empty());
              }
            case XMLCharacters:
            case XMLCData:
              TextEvent t = (TextEvent) parseEvent;
              textBuffer.append(t.text());
              return Pair.create(textBuffer, Optional.<String>empty());
            default:
              return Pair.create(textBuffer, Optional.<String>empty());
          }
        }, textBuffer -> Optional.of(Optional.of(textBuffer.toString()))) // on completion, emit the buffered text
        .via(Flow.flattenOptional())
        .runWith(Sink.seq(), system);
List<String> list = stage.toCompletableFuture().get(5, TimeUnit.SECONDS);
assertThat(list, hasItems("elem1", "elem2"));
XML writing
An XML processing pipeline ends with an XmlWriting.writer flow, which writes a stream of XML parser events to ByteStrings.
Scala:
val writer: Sink[ParseEvent, Future[String]] = Flow[ParseEvent]
  .via(XmlWriting.writer)
  .map[String](_.utf8String)
  .toMat(Sink.fold[String, String]("")((t, u) => t + u))(Keep.right)
Java:
final Sink<ParseEvent, CompletionStage<String>> write =
    Flow.of(ParseEvent.class)
        .via(XmlWriting.writer())
        .map(ByteString::utf8String)
        .toMat(Sink.fold("", (acc, el) -> acc + el), Keep.right());
// Variant that passes a caller-supplied XMLOutputFactory instance (xmlOutputFactory) to the writer
final Sink<ParseEvent, CompletionStage<String>> write =
    Flow.of(ParseEvent.class)
        .via(XmlWriting.writer(xmlOutputFactory))
        .map(ByteString::utf8String)
        .toMat(Sink.fold("", (acc, el) -> acc + el), Keep.right());
To write an XML document, run a source of the document's parse events with this writer.
Scala:
val listEl = List(
  StartDocument,
  StartElement(
    "book",
    namespace = Some("urn:loc.gov:books"),
    prefix = Some("bk"),
    namespaceCtx = List(Namespace("urn:loc.gov:books", prefix = Some("bk")),
      Namespace("urn:ISBN:0-395-36341-6", prefix = Some("isbn")))),
  StartElement(
    "title",
    namespace = Some("urn:loc.gov:books"),
    prefix = Some("bk")),
  Characters("Cheaper by the Dozen"),
  EndElement("title"),
  StartElement(
    "number",
    namespace = Some("urn:ISBN:0-395-36341-6"),
    prefix = Some("isbn")),
  Characters("1568491379"),
  EndElement("number"),
  EndElement("book"),
  EndDocument)
val doc =
  """<?xml version='1.0' encoding='UTF-8'?><bk:book xmlns:bk="urn:loc.gov:books" xmlns:isbn="urn:ISBN:0-395-36341-6"><bk:title>Cheaper by the Dozen</bk:title><isbn:number>1568491379</isbn:number></bk:book>"""
val resultFuture: Future[String] = Source.fromIterator[ParseEvent](() => listEl.iterator).runWith(writer)
resultFuture.futureValue(Timeout(3.seconds)) should ===(doc)
Java:
final String doc =
    "<?xml version='1.0' encoding='UTF-8'?>"
        + "<bk:book xmlns:bk=\"urn:loc.gov:books\" xmlns:isbn=\"urn:ISBN:0-395-36341-6\">"
        + "<bk:title>Cheaper by the Dozen</bk:title><isbn:number>1568491379</isbn:number></bk:book>";
final List<Namespace> nmList = new ArrayList<>();
nmList.add(Namespace.create("urn:loc.gov:books", Optional.of("bk")));
nmList.add(Namespace.create("urn:ISBN:0-395-36341-6", Optional.of("isbn")));
final List<ParseEvent> docList = new ArrayList<>();
docList.add(StartDocument.getInstance());
docList.add(
    StartElement.create(
        "book",
        Collections.emptyList(),
        Optional.of("bk"),
        Optional.of("urn:loc.gov:books"),
        nmList));
docList.add(
    StartElement.create(
        "title", Collections.emptyList(), Optional.of("bk"), Optional.of("urn:loc.gov:books")));
docList.add(Characters.create("Cheaper by the Dozen"));
docList.add(EndElement.create("title"));
docList.add(
    StartElement.create(
        "number",
        Collections.emptyList(),
        Optional.of("isbn"),
        Optional.of("urn:ISBN:0-395-36341-6")));
docList.add(Characters.create("1568491379"));
docList.add(EndElement.create("number"));
docList.add(EndElement.create("book"));
docList.add(EndDocument.getInstance());
final CompletionStage<String> resultStage = Source.from(docList).runWith(write, system);
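To see how the prefix and namespace fields of the events above end up in the serialized text, it can help to write the same document by hand with the JDK's own javax.xml.stream.XMLStreamWriter. The following is a minimal sketch that uses only the JDK, not the connector; the class name NamespaceWritingSketch and the method writeBook are hypothetical, and the assumption that each StartElement roughly corresponds to one writeStartElement call (with the namespace context becoming writeNamespace calls) is an illustration, not a description of the connector's internals.

```java
import java.io.StringWriter;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

// Hypothetical helper, not part of Pekko Connectors: writes the book document with plain StAX.
class NamespaceWritingSketch {

  static String writeBook() {
    StringWriter out = new StringWriter();
    try {
      XMLStreamWriter w = XMLOutputFactory.newInstance().createXMLStreamWriter(out);
      // StartElement("book", prefix = bk, namespace = urn:loc.gov:books)
      w.writeStartElement("bk", "book", "urn:loc.gov:books");
      // the namespace context of the root element declares both prefixes
      w.writeNamespace("bk", "urn:loc.gov:books");
      w.writeNamespace("isbn", "urn:ISBN:0-395-36341-6");
      // StartElement("title") / Characters / EndElement
      w.writeStartElement("bk", "title", "urn:loc.gov:books");
      w.writeCharacters("Cheaper by the Dozen");
      w.writeEndElement();
      // StartElement("number") / Characters / EndElement
      w.writeStartElement("isbn", "number", "urn:ISBN:0-395-36341-6");
      w.writeCharacters("1568491379");
      w.writeEndElement();
      w.writeEndElement(); // EndElement("book")
      w.flush();
      w.close();
    } catch (XMLStreamException e) {
      throw new IllegalStateException("could not write XML", e);
    }
    return out.toString();
  }

  public static void main(String[] args) {
    System.out.println(writeBook());
  }
}
```

The XML declaration is left out here on purpose, because its exact form depends on the StAX implementation in use.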
XML Subslice
Use XmlParsing.subslice to filter out all events that do not belong to a given element path.
Scala:
val parse = Flow[String]
  .map(ByteString(_))
  .via(XmlParsing.parser)
  .via(XmlParsing.subslice("doc" :: "elem" :: "item" :: Nil))
  .toMat(Sink.seq)(Keep.right)
Java:
final Sink<String, CompletionStage<List<ParseEvent>>> parse =
    Flow.<String>create()
        .map(ByteString::fromString)
        .via(XmlParsing.parser())
        .via(XmlParsing.subslice(Arrays.asList("doc", "elem", "item")))
        .toMat(Sink.seq(), Keep.right());
To get a subslice of an XML document, run a source containing the document with this parser.
Scala:
val doc =
  """
    |<doc>
    |  <elem>
    |    <item>i1</item>
    |    <item><sub>i2</sub></item>
    |    <item>i3</item>
    |  </elem>
    |</doc>
  """.stripMargin
val resultFuture = Source.single(doc).runWith(parse)
Java:
final String doc =
    "<doc>"
        + " <elem>"
        + " <item>i1</item>"
        + " <item><sub>i2</sub></item>"
        + " <item>i3</item>"
        + " </elem>"
        + "</doc>";
final CompletionStage<List<ParseEvent>> resultStage = Source.single(doc).runWith(parse, system);
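The idea of slicing by path can be tried out without the connector. The following is a minimal sketch with the JDK's StAX reader, not the connector's implementation: it keeps a list of the currently open element names and records only the events that occur inside an element matching the path. The class SubsliceSketch, its method eventsBelow, and the "start:", "end:", "text:" labels are hypothetical. The sketch also assumes that the tags of the matched element itself are not part of the slice, which should be checked against the connector's actual output.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical helper, not part of Pekko Connectors: imitates path filtering with plain StAX.
class SubsliceSketch {

  // Returns a readable trace of the events nested below the given element path.
  static List<String> eventsBelow(String xml, List<String> path) {
    List<String> open = new ArrayList<>(); // names of the currently open elements, outermost first
    List<String> out = new ArrayList<>();
    try {
      XMLInputFactory factory = XMLInputFactory.newInstance();
      factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.TRUE); // one event per text run
      XMLStreamReader reader = factory.createXMLStreamReader(new StringReader(xml));
      while (reader.hasNext()) {
        switch (reader.next()) {
          case XMLStreamConstants.START_ELEMENT:
            // check before pushing, so the matched element's own start tag is skipped
            if (isBelow(open, path)) out.add("start:" + reader.getLocalName());
            open.add(reader.getLocalName());
            break;
          case XMLStreamConstants.END_ELEMENT:
            // check after popping, so the matched element's own end tag is skipped
            open.remove(open.size() - 1);
            if (isBelow(open, path)) out.add("end:" + reader.getLocalName());
            break;
          case XMLStreamConstants.CHARACTERS:
            if (isBelow(open, path)) out.add("text:" + reader.getText());
            break;
          default:
            break; // other event types are ignored in this sketch
        }
      }
      reader.close();
    } catch (XMLStreamException e) {
      throw new IllegalArgumentException("not well-formed XML", e);
    }
    return out;
  }

  // True when the open elements start with the full path, i.e. we are inside a matched element.
  private static boolean isBelow(List<String> open, List<String> path) {
    return open.size() >= path.size() && open.subList(0, path.size()).equals(path);
  }

  public static void main(String[] args) {
    String doc = "<doc><elem><item>i1</item><item><sub>i2</sub></item><item>i3</item></elem></doc>";
    System.out.println(eventsBelow(doc, Arrays.asList("doc", "elem", "item")));
  }
}
```

For the sample document, the sketch records the text i1, the start, text, and end of the nested sub element, and the text i3. Whitespace between the item elements sits at the elem level, so it is not recorded.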
XML Subtree
Use XmlParsing.subtree to handle elements matching a given path, together with their child nodes, as org.w3c.dom.Element instances.
Scala:
val parse = Flow[String]
  .map(ByteString(_))
  .via(XmlParsing.parser)
  .via(XmlParsing.subtree("doc" :: "elem" :: "item" :: Nil))
  .toMat(Sink.seq)(Keep.right)
Java:
final Sink<String, CompletionStage<List<Element>>> parse =
    Flow.<String>create()
        .map(ByteString::fromString)
        .via(XmlParsing.parser())
        .via(XmlParsing.subtree(Arrays.asList("doc", "elem", "item")))
        .toMat(Sink.seq(), Keep.right());
To get a subtree of an XML document, run a source containing the document with this parser.
Scala:
val doc =
  """
    |<doc>
    |  <elem>
    |    <item>i1</item>
    |    <item><sub>i2</sub></item>
    |    <item>i3</item>
    |  </elem>
    |</doc>
  """.stripMargin
val resultFuture = Source.single(doc).runWith(parse)
Java:
final String doc =
    "<doc>"
        + " <elem>"
        + " <item>i1</item>"
        + " <item><sub>i2</sub></item>"
        + " <item>i3</item>"
        + " </elem>"
        + "</doc>";
final CompletionStage<List<Element>> resultStage = Source.single(doc).runWith(parse, system);
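Because the emitted values are standard org.w3c.dom.Element instances, they can be inspected with the regular JDK DOM and transformer APIs. The following is a minimal sketch that uses only the JDK; the class ElementSketch and its methods parse and asString are hypothetical helpers, and parse merely stands in for an element that would otherwise arrive from the stream.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

// Hypothetical helper, not part of Pekko Connectors: shows how a DOM Element can be consumed.
class ElementSketch {

  // Stand-in for an element arriving from the stream: parses a fragment with the JDK DOM parser.
  static Element parse(String xml) {
    try {
      return DocumentBuilderFactory.newInstance()
          .newDocumentBuilder()
          .parse(new InputSource(new StringReader(xml)))
          .getDocumentElement();
    } catch (Exception e) {
      throw new IllegalArgumentException("could not parse XML", e);
    }
  }

  // Serializes an element and its children back to a string, without an XML declaration.
  static String asString(Element element) {
    try {
      Transformer transformer = TransformerFactory.newInstance().newTransformer();
      transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
      StringWriter writer = new StringWriter();
      transformer.transform(new DOMSource(element), new StreamResult(writer));
      return writer.toString();
    } catch (Exception e) {
      throw new IllegalStateException("could not serialize element", e);
    }
  }

  public static void main(String[] args) {
    Element item = parse("<item><sub>i2</sub></item>");
    System.out.println(item.getTagName());     // name of the element
    System.out.println(item.getTextContent()); // concatenated text of all descendants
    System.out.println(asString(item));        // element serialized with its children
  }
}
```

In a stream, a helper such as asString could be applied in a map stage after XmlParsing.subtree to turn each element into text.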