Extensible Markup Language - XML
The XML parsing module offers Flows for parsing, processing, and writing XML documents.
Project Info: Apache Pekko Connectors XML
Artifact | org.apache.pekko
JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21
Scala versions | 2.13.15, 2.12.20, 3.3.4
JPMS module name | pekko.stream.connectors.xml
Release notes | GitHub releases
Issues | GitHub issues
Sources | https://github.com/apache/pekko-connectors |
- sbt
    val PekkoVersion = "1.1.3"
    libraryDependencies ++= Seq(
      "org.apache.pekko" %% "pekko-connectors-xml" % "1.1.0",
      "org.apache.pekko" %% "pekko-stream" % PekkoVersion
    )
- Maven
    <properties>
      <pekko.version>1.1.3</pekko.version>
      <scala.binary.version>2.13</scala.binary.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-connectors-xml_${scala.binary.version}</artifactId>
        <version>1.1.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-stream_${scala.binary.version}</artifactId>
        <version>${pekko.version}</version>
      </dependency>
    </dependencies>
- Gradle
    def versions = [
      PekkoVersion: "1.1.3",
      ScalaBinary: "2.13"
    ]
    dependencies {
      implementation "org.apache.pekko:pekko-connectors-xml_${versions.ScalaBinary}:1.1.0"
      implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
    }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
XML parsing
An XML processing pipeline starts with an XmlParsing.parser flow, which parses a stream of ByteStrings into XML parser events.
- Scala
    val parse = Flow[String]
      .map(ByteString(_))
      .via(XmlParsing.parser)
      .toMat(Sink.seq)(Keep.right)
- Java
    final Sink<String, CompletionStage<List<ParseEvent>>> parse =
        Flow.<String>create()
            .map(ByteString::fromString)
            .via(XmlParsing.parser())
            .toMat(Sink.seq(), Keep.right());
To parse an XML document, run a source containing the document with this parser.
- Scala
    val doc = "<doc><elem>elem1</elem><elem>elem2</elem></doc>"
    val resultFuture = Source.single(doc).runWith(parse)
- Java
    final String doc = "<doc><elem>elem1</elem><elem>elem2</elem></doc>";
    final CompletionStage<List<ParseEvent>> resultStage = Source.single(doc).runWith(parse, system);
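The events produced by the parser follow the familiar StAX model. As a rough, non-authoritative illustration of the kind of sequence to expect for the document above, the sketch below walks the same input with the JDK's built-in javax.xml.stream reader rather than with XmlParsing.parser. The class name StaxEventSketch and the string labels are made up for this example, and the idea that the connector's parse events line up one-to-one with these StAX events is an assumption.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical helper: uses the JDK StAX reader, not the Pekko connector.
final class StaxEventSketch {

  // Returns a readable label for each start, text, and end event in the document.
  static List<String> events(String xml) {
    XMLInputFactory factory = XMLInputFactory.newInstance();
    // Merge adjacent text chunks so that each text node yields a single event.
    factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.TRUE);
    List<String> out = new ArrayList<>();
    try {
      XMLStreamReader reader = factory.createXMLStreamReader(new StringReader(xml));
      while (reader.hasNext()) {
        switch (reader.next()) {
          case XMLStreamConstants.START_ELEMENT:
            out.add("StartElement(" + reader.getLocalName() + ")");
            break;
          case XMLStreamConstants.CHARACTERS:
            out.add("Characters(" + reader.getText() + ")");
            break;
          case XMLStreamConstants.END_ELEMENT:
            out.add("EndElement(" + reader.getLocalName() + ")");
            break;
          default:
            break; // document end, comments, and similar events are skipped here
        }
      }
      reader.close();
    } catch (XMLStreamException e) {
      throw new IllegalStateException("Malformed XML", e);
    }
    return out;
  }

  public static void main(String[] args) {
    events("<doc><elem>elem1</elem><elem>elem2</elem></doc>").forEach(System.out::println);
  }
}
```

Running main prints one label per line, starting with StartElement(doc) and ending with EndElement(doc). The labels are plain strings chosen for readability; the connector itself emits typed ParseEvent objects.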
To make sense of the parser events, statefulMap may be used to aggregate consecutive events and emit the relevant data. More complex cases call for an explicit state machine.
- Scala
    val doc = ByteString("<doc><elem>elem1</elem><elem>elem2</elem></doc>")
    val result: Future[immutable.Seq[String]] = Source
      .single(doc)
      .via(XmlParsing.parser)
      .statefulMap(() => new StringBuilder())((textBuffer, parseEvent) => {
          parseEvent match {
            case _: StartElement =>
              // a new element starts: reset the buffer
              textBuffer.clear()
              (textBuffer, None)
            case s: EndElement if s.localName == "elem" =>
              // an elem closes: emit the text collected so far
              val text = textBuffer.toString
              (textBuffer, Some(text))
            case t: TextEvent =>
              // accumulate character data
              textBuffer.append(t.text)
              (textBuffer, None)
            case _ =>
              (textBuffer, None)
          }
        },
        // on completion, emit whatever is left in the buffer
        textBuffer => Some(Some(textBuffer.toString)))
      .collect {
        case Some(txt) => txt
      }
      .runWith(Sink.seq)

    result.futureValue should contain.inOrderOnly("elem1", "elem2")
- Java
    ByteString doc = ByteString.fromString("<doc><elem>elem1</elem><elem>elem2</elem></doc>");
    CompletionStage<List<String>> stage =
        Source.single(doc)
            .via(XmlParsing.parser())
            .statefulMap(
                StringBuilder::new,
                (textBuffer, parseEvent) -> {
                  // aggregation function
                  switch (parseEvent.marker()) {
                    case XMLStartElement:
                      textBuffer.delete(0, textBuffer.length());
                      return Pair.create(textBuffer, Optional.<String>empty());
                    case XMLEndElement:
                      EndElement s = (EndElement) parseEvent;
                      switch (s.localName()) {
                        case "elem":
                          String text = textBuffer.toString();
                          return Pair.create(textBuffer, Optional.of(text));
                        default:
                          return Pair.create(textBuffer, Optional.<String>empty());
                      }
                    case XMLCharacters:
                    case XMLCData:
                      TextEvent t = (TextEvent) parseEvent;
                      textBuffer.append(t.text());
                      return Pair.create(textBuffer, Optional.<String>empty());
                    default:
                      return Pair.create(textBuffer, Optional.<String>empty());
                  }
                },
                textBuffer -> Optional.of(Optional.of(textBuffer.toString())))
            .via(Flow.flattenOptional())
            .runWith(Sink.seq(), system);

    List<String> list = stage.toCompletableFuture().get(5, TimeUnit.SECONDS);
    assertThat(list, hasItems("elem1", "elem2"));
XML writing
An XML processing pipeline ends with an XmlWriting.writer flow, which writes a stream of XML parser events to ByteStrings.
- Scala
    val writer: Sink[ParseEvent, Future[String]] = Flow[ParseEvent]
      .via(XmlWriting.writer)
      .map[String](_.utf8String)
      .toMat(Sink.fold[String, String]("")((t, u) => t + u))(Keep.right)
- Java
    final Sink<ParseEvent, CompletionStage<String>> write =
        Flow.of(ParseEvent.class)
            .via(XmlWriting.writer())
            .map(ByteString::utf8String)
            .toMat(Sink.fold("", (acc, el) -> acc + el), Keep.right());

    // Alternative definition that passes a custom javax.xml.stream.XMLOutputFactory
    // (xmlOutputFactory is assumed to be in scope); use it in place of the one above.
    final Sink<ParseEvent, CompletionStage<String>> write =
        Flow.of(ParseEvent.class)
            .via(XmlWriting.writer(xmlOutputFactory))
            .map(ByteString::utf8String)
            .toMat(Sink.fold("", (acc, el) -> acc + el), Keep.right());
To write an XML document, run a source of XML parser events with this writer.
- Scala
    val listEl = List(
      StartDocument,
      StartElement(
        "book",
        namespace = Some("urn:loc.gov:books"),
        prefix = Some("bk"),
        namespaceCtx = List(
          Namespace("urn:loc.gov:books", prefix = Some("bk")),
          Namespace("urn:ISBN:0-395-36341-6", prefix = Some("isbn")))),
      StartElement("title", namespace = Some("urn:loc.gov:books"), prefix = Some("bk")),
      Characters("Cheaper by the Dozen"),
      EndElement("title"),
      StartElement("number", namespace = Some("urn:ISBN:0-395-36341-6"), prefix = Some("isbn")),
      Characters("1568491379"),
      EndElement("number"),
      EndElement("book"),
      EndDocument)

    val doc =
      """<?xml version='1.0' encoding='UTF-8'?><bk:book xmlns:bk="urn:loc.gov:books" xmlns:isbn="urn:ISBN:0-395-36341-6"><bk:title>Cheaper by the Dozen</bk:title><isbn:number>1568491379</isbn:number></bk:book>"""
    val resultFuture: Future[String] = Source.fromIterator[ParseEvent](() => listEl.iterator).runWith(writer)
    resultFuture.futureValue(Timeout(3.seconds)) should ===(doc)
- Java
    final String doc =
        "<?xml version='1.0' encoding='UTF-8'?>"
            + "<bk:book xmlns:bk=\"urn:loc.gov:books\" xmlns:isbn=\"urn:ISBN:0-395-36341-6\">"
            + "<bk:title>Cheaper by the Dozen</bk:title><isbn:number>1568491379</isbn:number></bk:book>";

    final List<Namespace> nmList = new ArrayList<>();
    nmList.add(Namespace.create("urn:loc.gov:books", Optional.of("bk")));
    nmList.add(Namespace.create("urn:ISBN:0-395-36341-6", Optional.of("isbn")));

    final List<ParseEvent> docList = new ArrayList<>();
    docList.add(StartDocument.getInstance());
    docList.add(
        StartElement.create(
            "book",
            Collections.emptyList(),
            Optional.of("bk"),
            Optional.of("urn:loc.gov:books"),
            nmList));
    docList.add(
        StartElement.create(
            "title", Collections.emptyList(), Optional.of("bk"), Optional.of("urn:loc.gov:books")));
    docList.add(Characters.create("Cheaper by the Dozen"));
    docList.add(EndElement.create("title"));
    docList.add(
        StartElement.create(
            "number",
            Collections.emptyList(),
            Optional.of("isbn"),
            Optional.of("urn:ISBN:0-395-36341-6")));
    docList.add(Characters.create("1568491379"));
    docList.add(EndElement.create("number"));
    docList.add(EndElement.create("book"));
    docList.add(EndDocument.getInstance());

    final CompletionStage<String> resultStage = Source.from(docList).runWith(write, system);
XML Subslice
Use XmlParsing.subslice to filter out all elements that do not correspond to a given path.
- Scala
    val parse = Flow[String]
      .map(ByteString(_))
      .via(XmlParsing.parser)
      .via(XmlParsing.subslice("doc" :: "elem" :: "item" :: Nil))
      .toMat(Sink.seq)(Keep.right)
- Java
    final Sink<String, CompletionStage<List<ParseEvent>>> parse =
        Flow.<String>create()
            .map(ByteString::fromString)
            .via(XmlParsing.parser())
            .via(XmlParsing.subslice(Arrays.asList("doc", "elem", "item")))
            .toMat(Sink.seq(), Keep.right());
To get a subslice of an XML document, run a source containing the document with this parser.
- Scala
    val doc =
      """
        |<doc>
        |  <elem>
        |    <item>i1</item>
        |    <item><sub>i2</sub></item>
        |    <item>i3</item>
        |  </elem>
        |</doc>
      """.stripMargin
    val resultFuture = Source.single(doc).runWith(parse)
- Java
    final String doc =
        "<doc>"
            + " <elem>"
            + " <item>i1</item>"
            + " <item><sub>i2</sub></item>"
            + " <item>i3</item>"
            + " </elem>"
            + "</doc>";
    final CompletionStage<List<ParseEvent>> resultStage = Source.single(doc).runWith(parse, system);
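For the document above and the path doc, elem, item, the intent is to keep only the events nested inside the matching item elements. The sketch below imitates that filtering with a stack of open element names over the JDK's StAX reader. It is not the connector's implementation: SubsliceSketch, isInside, and the string labels are hypothetical, and dropping the start and end events of the matched item elements themselves is an assumption about how subslice behaves.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical helper: mimics path-based filtering with the JDK StAX reader.
final class SubsliceSketch {

  // Keeps only the events that occur strictly inside elements matching the path.
  static List<String> subslice(String xml, List<String> path) {
    XMLInputFactory factory = XMLInputFactory.newInstance();
    factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.TRUE);
    List<String> open = new ArrayList<>(); // names of currently open elements, root first
    List<String> out = new ArrayList<>();
    try {
      XMLStreamReader reader = factory.createXMLStreamReader(new StringReader(xml));
      while (reader.hasNext()) {
        switch (reader.next()) {
          case XMLStreamConstants.START_ELEMENT:
            // Check before pushing, so the matched element's own start is not emitted.
            if (isInside(open, path)) {
              out.add("StartElement(" + reader.getLocalName() + ")");
            }
            open.add(reader.getLocalName());
            break;
          case XMLStreamConstants.END_ELEMENT:
            // Pop first, so the matched element's own end is not emitted.
            open.remove(open.size() - 1);
            if (isInside(open, path)) {
              out.add("EndElement(" + reader.getLocalName() + ")");
            }
            break;
          case XMLStreamConstants.CHARACTERS:
            if (isInside(open, path)) {
              out.add("Characters(" + reader.getText() + ")");
            }
            break;
          default:
            break;
        }
      }
      reader.close();
    } catch (XMLStreamException e) {
      throw new IllegalStateException("Malformed XML", e);
    }
    return out;
  }

  // True when the open-element stack starts with the requested path.
  static boolean isInside(List<String> open, List<String> path) {
    return open.size() >= path.size() && open.subList(0, path.size()).equals(path);
  }

  public static void main(String[] args) {
    String doc = "<doc> <elem> <item>i1</item> <item><sub>i2</sub></item> <item>i3</item> </elem></doc>";
    subslice(doc, Arrays.asList("doc", "elem", "item")).forEach(System.out::println);
  }
}
```

With this input, the sketch keeps the text of the first and third item and the complete sub element of the second, while the whitespace between elements outside the path is dropped.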
XML Subtree
Use XmlParsing.subtree to handle elements that match a given path, together with their child nodes, as org.w3c.dom.Element instances.
- Scala
    val parse = Flow[String]
      .map(ByteString(_))
      .via(XmlParsing.parser)
      .via(XmlParsing.subtree("doc" :: "elem" :: "item" :: Nil))
      .toMat(Sink.seq)(Keep.right)
- Java
    final Sink<String, CompletionStage<List<Element>>> parse =
        Flow.<String>create()
            .map(ByteString::fromString)
            .via(XmlParsing.parser())
            .via(XmlParsing.subtree(Arrays.asList("doc", "elem", "item")))
            .toMat(Sink.seq(), Keep.right());
To get a subtree of an XML document, run a source containing the document with this parser.
- Scala
    val doc =
      """
        |<doc>
        |  <elem>
        |    <item>i1</item>
        |    <item><sub>i2</sub></item>
        |    <item>i3</item>
        |  </elem>
        |</doc>
      """.stripMargin
    val resultFuture = Source.single(doc).runWith(parse)
- Java
    final String doc =
        "<doc>"
            + " <elem>"
            + " <item>i1</item>"
            + " <item><sub>i2</sub></item>"
            + " <item>i3</item>"
            + " </elem>"
            + "</doc>";
    final CompletionStage<List<Element>> resultStage = Source.single(doc).runWith(parse, system);
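Each element emitted by subtree is a regular org.w3c.dom.Element, so the standard DOM API applies downstream. The sketch below shows one possible way to turn such an element into a string. The helper names describe and parse are hypothetical, and the element is built here with the JDK's DocumentBuilder only so that the example runs without a stream.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

// Hypothetical helper for working with elements of the kind subtree emits.
final class SubtreeElementSketch {

  // Summarises an element as "<tag name>: <concatenated text of all descendants>".
  static String describe(Element element) {
    return element.getTagName() + ": " + element.getTextContent();
  }

  // Stand-in for an element that would normally arrive from XmlParsing.subtree.
  static Element parse(String xml) {
    try {
      return DocumentBuilderFactory.newInstance()
          .newDocumentBuilder()
          .parse(new InputSource(new StringReader(xml)))
          .getDocumentElement();
    } catch (Exception e) {
      throw new IllegalStateException("Could not parse XML", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(describe(parse("<item><sub>i2</sub></item>")));
  }
}
```

Running main prints "item: i2", because getTextContent concatenates the text of nested elements. In a stream, a function like describe could be applied with a map stage after the subtree flow.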