Extensible Markup Language - XML
The XML module offers Flows for parsing, processing, and writing XML documents.
Project Info: Apache Pekko Connectors XML | |
---|---|
Artifact | org.apache.pekko / pekko-connectors-xml / 1.1.0-M1+122-2a90cd8c-SNAPSHOT |
JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21 |
Scala versions | 2.13.15, 2.12.20, 3.3.4 |
JPMS module name | pekko.stream.connectors.xml |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts
- sbt
```scala
val PekkoVersion = "1.1.2"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-xml" % "1.1.0-M1+122-2a90cd8c-SNAPSHOT",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
```
- Maven
```xml
<properties>
  <pekko.version>1.1.2</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-xml_${scala.binary.version}</artifactId>
    <version>1.1.0-M1+122-2a90cd8c-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
```
- Gradle
```groovy
def versions = [
  PekkoVersion: "1.1.2",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-xml_${versions.ScalaBinary}:1.1.0-M1+122-2a90cd8c-SNAPSHOT"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}
```
The table below shows the direct dependencies of this module, and the second tab shows all libraries it depends on transitively.
XML parsing
An XML processing pipeline starts with an XmlParsing.parser flow, which parses a stream of ByteStrings into XML parser events.
- Scala
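```scala
import org.apache.pekko.stream.connectors.xml.scaladsl.XmlParsing
import org.apache.pekko.stream.scaladsl.{ Flow, Keep, Sink }
import org.apache.pekko.util.ByteString

// Turns a stream of strings into bytes, parses them, and collects all parse events
val parse = Flow[String]
  .map(ByteString(_))
  .via(XmlParsing.parser)
  .toMat(Sink.seq)(Keep.right)
```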
- Java
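```java
import java.util.List;
import java.util.concurrent.CompletionStage;
import org.apache.pekko.stream.connectors.xml.ParseEvent;
import org.apache.pekko.stream.connectors.xml.javadsl.XmlParsing;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.util.ByteString;

// Turns a stream of strings into bytes, parses them, and collects all parse events
final Sink<String, CompletionStage<List<ParseEvent>>> parse =
    Flow.<String>create()
        .map(ByteString::fromString)
        .via(XmlParsing.parser())
        .toMat(Sink.seq(), Keep.right());
```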
To parse an XML document, run a source containing the document through this parser.
- Scala
```scala
// Requires an implicit ActorSystem in scope to materialize the stream
val doc = "<doc><elem>elem1</elem><elem>elem2</elem></doc>"
val resultFuture = Source.single(doc).runWith(parse)
```
- Java
```java
// `system` is the ActorSystem used to materialize the stream
final String doc = "<doc><elem>elem1</elem><elem>elem2</elem></doc>";
final CompletionStage<List<ParseEvent>> resultStage =
    Source.single(doc).runWith(parse, system);
```
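To see which kinds of events such a run produces, the sketch below walks the same document with the JDK's built-in StAX API (javax.xml.stream) rather than with Pekko. The connector's event types (StartDocument, StartElement, Characters, EndElement, EndDocument, ...) roughly correspond to these StAX event kinds. The class XmlEventListing and its describe method are hypothetical illustrations, not part of the connector.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical helper: lists StAX events as readable labels, e.g. "StartElement(doc)".
// Uses the JDK's StAX reader, not the Pekko parser.
final class XmlEventListing {

  static List<String> describe(String xml) {
    List<String> out = new ArrayList<>();
    try {
      XMLInputFactory factory = XMLInputFactory.newInstance();
      // Merge adjacent character data so each text node yields one event
      factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.TRUE);
      XMLStreamReader reader = factory.createXMLStreamReader(new StringReader(xml));
      // A fresh reader is already positioned on START_DOCUMENT
      out.add("StartDocument");
      while (reader.hasNext()) {
        switch (reader.next()) {
          case XMLStreamConstants.START_ELEMENT:
            out.add("StartElement(" + reader.getLocalName() + ")");
            break;
          case XMLStreamConstants.END_ELEMENT:
            out.add("EndElement(" + reader.getLocalName() + ")");
            break;
          case XMLStreamConstants.CHARACTERS:
            out.add("Characters(" + reader.getText() + ")");
            break;
          case XMLStreamConstants.END_DOCUMENT:
            out.add("EndDocument");
            break;
          default:
            break; // comments, processing instructions, etc. are ignored here
        }
      }
      reader.close();
    } catch (XMLStreamException e) {
      throw new IllegalStateException("malformed XML", e);
    }
    return out;
  }
}
```

For the document above, describe returns StartDocument, StartElement(doc), StartElement(elem), Characters(elem1), EndElement(elem), StartElement(elem), Characters(elem2), EndElement(elem), EndElement(doc), EndDocument.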
To make sense of the parser events, statefulMap can be used to aggregate consecutive events and emit the relevant data. More complex processing usually requires a dedicated state machine.
- Scala
```scala
val doc = ByteString("<doc><elem>elem1</elem><elem>elem2</elem></doc>")
val result: Future[immutable.Seq[String]] = Source
  .single(doc)
  .via(XmlParsing.parser)
  .statefulMap(() => new StringBuilder())(
    (textBuffer, parseEvent) => {
      parseEvent match {
        case _: StartElement =>
          // A new element starts: discard previously collected text
          textBuffer.clear()
          (textBuffer, None)
        case s: EndElement if s.localName == "elem" =>
          // An <elem> ends: emit the text collected since it started
          val text = textBuffer.toString
          (textBuffer, Some(text))
        case t: TextEvent =>
          textBuffer.append(t.text)
          (textBuffer, None)
        case _ =>
          (textBuffer, None)
      }
    },
    // Every <elem> has already been emitted, so emit nothing extra on completion
    _ => None)
  .collect { case Some(txt) => txt }
  .runWith(Sink.seq)

result.futureValue should contain.inOrderOnly("elem1", "elem2")
```
- Java
```java
ByteString doc = ByteString.fromString("<doc><elem>elem1</elem><elem>elem2</elem></doc>");
CompletionStage<List<String>> stage =
    Source.single(doc)
        .via(XmlParsing.parser())
        .statefulMap(
            StringBuilder::new,
            (textBuffer, parseEvent) -> {
              // aggregation function
              switch (parseEvent.marker()) {
                case XMLStartElement:
                  // A new element starts: discard previously collected text
                  textBuffer.delete(0, textBuffer.length());
                  return Pair.create(textBuffer, Optional.<String>empty());
                case XMLEndElement:
                  EndElement s = (EndElement) parseEvent;
                  switch (s.localName()) {
                    case "elem":
                      // An <elem> ends: emit the text collected since it started
                      String text = textBuffer.toString();
                      return Pair.create(textBuffer, Optional.of(text));
                    default:
                      return Pair.create(textBuffer, Optional.<String>empty());
                  }
                case XMLCharacters:
                case XMLCData:
                  TextEvent t = (TextEvent) parseEvent;
                  textBuffer.append(t.text());
                  return Pair.create(textBuffer, Optional.<String>empty());
                default:
                  return Pair.create(textBuffer, Optional.<String>empty());
              }
            },
            // Every <elem> has already been emitted, so emit nothing extra on completion
            textBuffer -> Optional.<Optional<String>>empty())
        .via(Flow.flattenOptional())
        .runWith(Sink.seq(), system);

List<String> list = stage.toCompletableFuture().get(5, TimeUnit.SECONDS);
assertThat(list, hasItems("elem1", "elem2"));
```
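When the data to extract depends on where an event occurs (for example, only text directly inside doc/elem and not inside an elem nested somewhere else), a single text buffer is no longer enough. The sketch below is one such hand-written state machine: it tracks the open-element path and collects text only for elements at an exact path. It runs over the JDK's StAX reader instead of Pekko parse events, and the class PathTextCollector and method textsAt are hypothetical. The same stepping logic could be moved into a statefulMap function, with the stack and buffer as its state.

```java
import java.io.StringReader;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical state machine: collects the text of elements located exactly at `path`.
final class PathTextCollector {

  static List<String> textsAt(String xml, List<String> path) {
    Deque<String> stack = new ArrayDeque<>();   // names of the currently open elements
    StringBuilder buffer = new StringBuilder(); // text of the target element being read
    List<String> results = new ArrayList<>();
    try {
      XMLStreamReader reader =
          XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml));
      while (reader.hasNext()) {
        int event = reader.next();
        if (event == XMLStreamConstants.START_ELEMENT) {
          stack.addLast(reader.getLocalName());
          if (matches(stack, path)) buffer.setLength(0); // entering a target element
        } else if (event == XMLStreamConstants.END_ELEMENT) {
          if (matches(stack, path)) results.add(buffer.toString()); // leaving a target
          stack.removeLast();
        } else if ((event == XMLStreamConstants.CHARACTERS
                || event == XMLStreamConstants.CDATA)
            && matches(stack, path)) {
          buffer.append(reader.getText()); // only text directly inside the target
        }
      }
      reader.close();
    } catch (XMLStreamException e) {
      throw new IllegalStateException("malformed XML", e);
    }
    return results;
  }

  // True when the open-element stack (root first) equals the target path.
  private static boolean matches(Deque<String> stack, List<String> path) {
    if (stack.size() != path.size()) return false;
    Iterator<String> it = stack.iterator();
    for (String name : path) {
      if (!name.equals(it.next())) return false;
    }
    return true;
  }
}
```

For example, textsAt("<doc><elem>elem1</elem><other><elem>x</elem></other><elem>elem2</elem></doc>", List of "doc", "elem") skips the nested x and returns elem1 and elem2.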
XML writing
An XML processing pipeline ends with an XmlWriting.writer flow, which writes a stream of XML parser events to ByteStrings.
- Scala
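```scala
import org.apache.pekko.stream.connectors.xml.ParseEvent
import org.apache.pekko.stream.connectors.xml.scaladsl.XmlWriting
import org.apache.pekko.stream.scaladsl.{ Flow, Keep, Sink }
import scala.concurrent.Future

// Serializes parse events to bytes, decodes them, and concatenates the result
val writer: Sink[ParseEvent, Future[String]] = Flow[ParseEvent]
  .via(XmlWriting.writer)
  .map[String](_.utf8String)
  .toMat(Sink.fold[String, String]("")((t, u) => t + u))(Keep.right)
```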
- Java
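```java
import java.util.concurrent.CompletionStage;
import javax.xml.stream.XMLOutputFactory;
import org.apache.pekko.stream.connectors.xml.ParseEvent;
import org.apache.pekko.stream.connectors.xml.javadsl.XmlWriting;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.util.ByteString;

// Serializes parse events to bytes, decodes them, and concatenates the result
final Sink<ParseEvent, CompletionStage<String>> write =
    Flow.of(ParseEvent.class)
        .via(XmlWriting.writer())
        .map(ByteString::utf8String)
        .toMat(Sink.fold("", (acc, el) -> acc + el), Keep.right());

// Variant that passes an explicitly created XMLOutputFactory to the writer
final XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newInstance();
final Sink<ParseEvent, CompletionStage<String>> writeWithFactory =
    Flow.of(ParseEvent.class)
        .via(XmlWriting.writer(xmlOutputFactory))
        .map(ByteString::utf8String)
        .toMat(Sink.fold("", (acc, el) -> acc + el), Keep.right());
```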
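Conceptually, the writer serializes each parse event and emits the resulting bytes. As a rough stdlib analogue (not the connector's implementation), the sketch below feeds the equivalent of a few events to the JDK's XMLStreamWriter, collects the output in a byte array, and decodes it once at the end. Each writer call is annotated with the parse event it stands in for. The class SimpleXmlWriter is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

// Hypothetical analogue of an event writer, built on the JDK's StAX XMLStreamWriter.
final class SimpleXmlWriter {

  static String writeDoc() {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try {
      XMLStreamWriter w =
          XMLOutputFactory.newInstance().createXMLStreamWriter(bytes, "UTF-8");
      w.writeStartElement("doc");       // ~ StartElement("doc")
      w.writeStartElement("elem");      // ~ StartElement("elem")
      w.writeCharacters("elem1");       // ~ Characters("elem1")
      w.writeEndElement();              // ~ EndElement("elem")
      w.writeStartElement("elem");
      w.writeCharacters("a < b & c");   // markup characters are escaped by the writer
      w.writeEndElement();
      w.writeEndElement();              // ~ EndElement("doc")
      w.flush();
      w.close();
    } catch (XMLStreamException e) {
      throw new IllegalStateException(e);
    }
    // Decode the accumulated UTF-8 bytes once, after writing has finished
    return new String(bytes.toByteArray(), StandardCharsets.UTF_8);
  }
}
```

The returned string starts with <doc><elem>elem1</elem>, and the second elem contains the escaped text a &lt; b &amp; c.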
To write an XML document, run a source of parse events through this writer.
- Scala
```scala
val listEl = List(
  StartDocument,
  StartElement(
    "book",
    namespace = Some("urn:loc.gov:books"),
    prefix = Some("bk"),
    namespaceCtx = List(
      Namespace("urn:loc.gov:books", prefix = Some("bk")),
      Namespace("urn:ISBN:0-395-36341-6", prefix = Some("isbn")))),
  StartElement("title", namespace = Some("urn:loc.gov:books"), prefix = Some("bk")),
  Characters("Cheaper by the Dozen"),
  EndElement("title"),
  StartElement("number", namespace = Some("urn:ISBN:0-395-36341-6"), prefix = Some("isbn")),
  Characters("1568491379"),
  EndElement("number"),
  EndElement("book"),
  EndDocument)

// Expected output of the writer for the events above
val doc =
  """<?xml version='1.0' encoding='UTF-8'?><bk:book xmlns:bk="urn:loc.gov:books" xmlns:isbn="urn:ISBN:0-395-36341-6"><bk:title>Cheaper by the Dozen</bk:title><isbn:number>1568491379</isbn:number></bk:book>"""

val resultFuture: Future[String] =
  Source.fromIterator[ParseEvent](() => listEl.iterator).runWith(writer)
resultFuture.futureValue(Timeout(3.seconds)) should ===(doc)
```
- Java
```java
// Expected output of the writer for the events below
final String doc =
    "<?xml version='1.0' encoding='UTF-8'?>"
        + "<bk:book xmlns:bk=\"urn:loc.gov:books\" xmlns:isbn=\"urn:ISBN:0-395-36341-6\">"
        + "<bk:title>Cheaper by the Dozen</bk:title><isbn:number>1568491379</isbn:number></bk:book>";

// Namespace declarations attached to the root element
final List<Namespace> nmList = new ArrayList<>();
nmList.add(Namespace.create("urn:loc.gov:books", Optional.of("bk")));
nmList.add(Namespace.create("urn:ISBN:0-395-36341-6", Optional.of("isbn")));

final List<ParseEvent> docList = new ArrayList<>();
docList.add(StartDocument.getInstance());
docList.add(
    StartElement.create(
        "book", Collections.emptyList(), Optional.of("bk"), Optional.of("urn:loc.gov:books"), nmList));
docList.add(
    StartElement.create(
        "title", Collections.emptyList(), Optional.of("bk"), Optional.of("urn:loc.gov:books")));
docList.add(Characters.create("Cheaper by the Dozen"));
docList.add(EndElement.create("title"));
docList.add(
    StartElement.create(
        "number", Collections.emptyList(), Optional.of("isbn"), Optional.of("urn:ISBN:0-395-36341-6")));
docList.add(Characters.create("1568491379"));
docList.add(EndElement.create("number"));
docList.add(EndElement.create("book"));
docList.add(EndDocument.getInstance());

final CompletionStage<String> resultStage = Source.from(docList).runWith(write, system);
```
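In the expected output, the namespace context on the book start element becomes the xmlns:bk and xmlns:isbn declarations, and each element is written with its prefix. The sketch below produces a similar document with the JDK's XMLStreamWriter, a stdlib stand-in rather than the connector's API. In the writer's default (non-repairing) mode, the prefix and namespace URI are passed with each start element, and the declarations are written explicitly right after the start tag. The JDK's XML declaration and quoting style may differ from the connector's, so the checks only look at the element markup. The class NamespacedBook is hypothetical.

```java
import java.io.StringWriter;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

// Hypothetical example: namespace-prefixed elements with the JDK's StAX writer.
final class NamespacedBook {

  static final String BOOKS = "urn:loc.gov:books";
  static final String ISBN = "urn:ISBN:0-395-36341-6";

  static String write() {
    StringWriter out = new StringWriter();
    try {
      XMLStreamWriter w = XMLOutputFactory.newInstance().createXMLStreamWriter(out);
      w.writeStartDocument("1.0");
      w.writeStartElement("bk", "book", BOOKS); // prefix, local name, namespace URI
      w.writeNamespace("bk", BOOKS);           // declares xmlns:bk on <bk:book>
      w.writeNamespace("isbn", ISBN);          // declares xmlns:isbn on <bk:book>
      w.writeStartElement("bk", "title", BOOKS);
      w.writeCharacters("Cheaper by the Dozen");
      w.writeEndElement();
      w.writeStartElement("isbn", "number", ISBN);
      w.writeCharacters("1568491379");
      w.writeEndElement();
      w.writeEndElement(); // closes <bk:book>
      w.writeEndDocument();
      w.close();
    } catch (XMLStreamException e) {
      throw new IllegalStateException(e);
    }
    return out.toString();
  }
}
```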
XML Subslice
Use XmlParsing.subslice to filter out all elements that do not correspond to a given path.
- Scala
```scala
// Keeps only the events found under the path doc/elem/item
val parse = Flow[String]
  .map(ByteString(_))
  .via(XmlParsing.parser)
  .via(XmlParsing.subslice("doc" :: "elem" :: "item" :: Nil))
  .toMat(Sink.seq)(Keep.right)
```
- Java
```java
// Keeps only the events found under the path doc/elem/item
final Sink<String, CompletionStage<List<ParseEvent>>> parse =
    Flow.<String>create()
        .map(ByteString::fromString)
        .via(XmlParsing.parser())
        .via(XmlParsing.subslice(Arrays.asList("doc", "elem", "item")))
        .toMat(Sink.seq(), Keep.right());
```
To get a subslice of an XML document, run a source containing the document through this flow.
- Scala
```scala
val doc =
  """
    |<doc>
    |  <elem>
    |    <item>i1</item>
    |    <item><sub>i2</sub></item>
    |    <item>i3</item>
    |  </elem>
    |</doc>
  """.stripMargin
val resultFuture = Source.single(doc).runWith(parse)
```
- Java
```java
final String doc =
    "<doc>"
        + " <elem>"
        + " <item>i1</item>"
        + " <item><sub>i2</sub></item>"
        + " <item>i3</item>"
        + " </elem>"
        + "</doc>";
final CompletionStage<List<ParseEvent>> resultStage =
    Source.single(doc).runWith(parse, system);
```
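To make the filtering concrete, the sketch below approximates subslice with the JDK's StAX reader instead of the connector. It assumes that only the content nested inside each matched doc/elem/item element is kept, while the item start and end tags themselves and the whitespace between items are dropped. The class Subslice and its string event labels are hypothetical.

```java
import java.io.StringReader;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical approximation of subslice: emits events nested *inside* elements at `path`.
final class Subslice {

  static List<String> subslice(String xml, List<String> path) {
    Deque<String> stack = new ArrayDeque<>(); // names of the currently open elements
    List<String> out = new ArrayList<>();
    try {
      XMLInputFactory factory = XMLInputFactory.newInstance();
      factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.TRUE);
      XMLStreamReader r = factory.createXMLStreamReader(new StringReader(xml));
      while (r.hasNext()) {
        int event = r.next();
        if (event == XMLStreamConstants.START_ELEMENT) {
          // Checked before pushing, so the matched element's own start tag is skipped
          if (inside(stack, path)) out.add("StartElement(" + r.getLocalName() + ")");
          stack.addLast(r.getLocalName());
        } else if (event == XMLStreamConstants.END_ELEMENT) {
          // Checked after popping, so the matched element's own end tag is skipped
          stack.removeLast();
          if (inside(stack, path)) out.add("EndElement(" + r.getLocalName() + ")");
        } else if (event == XMLStreamConstants.CHARACTERS && inside(stack, path)) {
          out.add("Characters(" + r.getText() + ")");
        }
      }
      r.close();
    } catch (XMLStreamException e) {
      throw new IllegalStateException("malformed XML", e);
    }
    return out;
  }

  // True when the open-element stack (root first) starts with `path`.
  private static boolean inside(Deque<String> stack, List<String> path) {
    if (stack.size() < path.size()) return false;
    Iterator<String> it = stack.iterator();
    for (String name : path) {
      if (!name.equals(it.next())) return false;
    }
    return true;
  }
}
```

For the Java document above, this sketch yields Characters(i1), StartElement(sub), Characters(i2), EndElement(sub), Characters(i3).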
XML Subtree
Use XmlParsing.subtree to handle elements that match a given path, together with their child nodes, as org.w3c.dom.Element instances.
- Scala
```scala
// Emits one org.w3c.dom.Element per element at the path doc/elem/item
val parse = Flow[String]
  .map(ByteString(_))
  .via(XmlParsing.parser)
  .via(XmlParsing.subtree("doc" :: "elem" :: "item" :: Nil))
  .toMat(Sink.seq)(Keep.right)
```
- Java
```java
import org.w3c.dom.Element;

// Emits one org.w3c.dom.Element per element at the path doc/elem/item
final Sink<String, CompletionStage<List<Element>>> parse =
    Flow.<String>create()
        .map(ByteString::fromString)
        .via(XmlParsing.parser())
        .via(XmlParsing.subtree(Arrays.asList("doc", "elem", "item")))
        .toMat(Sink.seq(), Keep.right());
```
To get the subtrees of an XML document, run a source containing the document through this flow.
- Scala
```scala
val doc =
  """
    |<doc>
    |  <elem>
    |    <item>i1</item>
    |    <item><sub>i2</sub></item>
    |    <item>i3</item>
    |  </elem>
    |</doc>
  """.stripMargin
val resultFuture = Source.single(doc).runWith(parse)
```
- Java
```java
final String doc =
    "<doc>"
        + " <elem>"
        + " <item>i1</item>"
        + " <item><sub>i2</sub></item>"
        + " <item>i3</item>"
        + " </elem>"
        + "</doc>";
final CompletionStage<List<Element>> resultStage =
    Source.single(doc).runWith(parse, system);
```
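Unlike a subslice, each result here is a complete element: the matched item tag together with its children, such as the nested sub. The sketch below approximates this with the JDK alone. A StAX reader tracks the element path, and while it is inside a match it builds DOM nodes in an empty Document created by DocumentBuilderFactory. It handles only elements and text (no attributes or namespaces), and the class Subtree is hypothetical, not the connector's implementation.

```java
import java.io.StringReader;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

// Hypothetical approximation of subtree: one DOM Element per element found at `path`.
final class Subtree {

  static List<Element> subtree(String xml, List<String> path) {
    List<Element> out = new ArrayList<>();
    Deque<String> stack = new ArrayDeque<>();     // names of the currently open elements
    Deque<Element> building = new ArrayDeque<>(); // open DOM nodes of the current match
    try {
      Document dom = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
      XMLInputFactory factory = XMLInputFactory.newInstance();
      factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.TRUE);
      XMLStreamReader r = factory.createXMLStreamReader(new StringReader(xml));
      while (r.hasNext()) {
        int event = r.next();
        if (event == XMLStreamConstants.START_ELEMENT) {
          stack.addLast(r.getLocalName());
          // Start a new subtree at a match, or extend the one being built
          if (!building.isEmpty() || equalsPath(stack, path)) {
            Element el = dom.createElement(r.getLocalName());
            if (!building.isEmpty()) building.peekLast().appendChild(el);
            building.addLast(el);
          }
        } else if (event == XMLStreamConstants.END_ELEMENT) {
          if (!building.isEmpty()) {
            Element done = building.removeLast();
            if (building.isEmpty()) out.add(done); // the matched root is complete
          }
          stack.removeLast();
        } else if (event == XMLStreamConstants.CHARACTERS && !building.isEmpty()) {
          building.peekLast().appendChild(dom.createTextNode(r.getText()));
        }
      }
      r.close();
    } catch (XMLStreamException | ParserConfigurationException e) {
      throw new IllegalStateException(e);
    }
    return out;
  }

  // True when the open-element stack (root first) equals the target path.
  private static boolean equalsPath(Deque<String> stack, List<String> path) {
    if (stack.size() != path.size()) return false;
    Iterator<String> it = stack.iterator();
    for (String name : path) {
      if (!name.equals(it.next())) return false;
    }
    return true;
  }
}
```

For the Java document above, the sketch returns three item elements with text content i1, i2, and i3. The second one has sub as its first child.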