Avro Parquet
The Avro Parquet connector provides an Apache Pekko Stream Source, Sink and Flow for pushing data to and pulling data from Parquet files.
For more information about Apache Parquet please visit the official documentation.
Project Info: Apache Pekko Connectors Avro Parquet
Artifact | org.apache.pekko:pekko-connectors-avroparquet:1.1.0
JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21
Scala versions | 2.13.15, 2.12.20, 3.3.4
JPMS module name | pekko.stream.connectors.avroparquet
Release notes | GitHub releases
Issues | GitHub issues
Sources | https://github.com/apache/pekko-connectors
Artifacts¶
sbt
val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-avroparquet" % "1.1.0",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven
<properties>
  <pekko.version>1.1.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-avroparquet_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.1.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-avroparquet_${versions.ScalaBinary}:1.1.0"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}
Source Initiation¶
Sometimes it can be useful to use a Parquet file as a stream Source. For this we need to create an AvroParquetReader instance, which will produce records as subtypes of GenericRecord, Avro's abstract representation of a record.
Scala
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{ AvroParquetReader, AvroReadSupport }
import org.apache.parquet.hadoop.util.HadoopInputFile

// `file` is the path of the Parquet file to read
val conf: Configuration = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
val reader =
  AvroParquetReader.builder[GenericRecord](HadoopInputFile.fromPath(new Path(file), conf)).withConf(conf).build()
Java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.pekko.stream.javadsl.Source;

Configuration conf = new Configuration();
ParquetReader<GenericRecord> reader =
    AvroParquetReader.<GenericRecord>builder(
            HadoopInputFile.fromPath(new Path("./test.parquet"), conf))
        .disableCompatibility()
        .build();
After that, you can create the Parquet Source from the initialised AvroParquetReader. It requires an instance of org.apache.parquet.hadoop.ParquetReader typed with a subtype of GenericRecord.
Scala
val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)

Java
Source<GenericRecord, NotUsed> source = AvroParquetSource.create(reader);
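The source is then run like any other Pekko Streams source. Below is a minimal Scala sketch, assuming the reader built above and an ActorSystem to materialise the stream; the system name is only illustrative:

import scala.concurrent.Future
import org.apache.avro.generic.GenericRecord
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.avroparquet.scaladsl.AvroParquetSource
import org.apache.pekko.stream.scaladsl.Sink

// assumed: an implicit ActorSystem to materialise the stream (name is illustrative)
implicit val system: ActorSystem = ActorSystem("avroparquet-docs")

// read every record from the Parquet file and collect them in memory
val allRecords: Future[Seq[GenericRecord]] =
  AvroParquetSource(reader).runWith(Sink.seq)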
Sink Initiation¶
On the other hand, you can use AvroParquetWriter as the Apache Pekko Streams Sink implementation for writing to Parquet. In that case, its initialisation requires an instance of org.apache.parquet.hadoop.ParquetWriter, and it likewise expects any subtype of GenericRecord to be passed.
Scala
import com.sksamuel.avro4s.Record
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{ AvroParquetWriter, AvroReadSupport }

// `schema` is the Avro schema of the records to be written
val file = "./sample/path/test.parquet"
val conf = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
val writer =
  AvroParquetWriter.builder[Record](new Path(file)).withConf(conf).withSchema(schema).build()
Java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;

// `file` is the output path and `schema` the Avro schema of the records to write
Configuration conf = new Configuration();
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true);
ParquetWriter<GenericRecord> writer =
    AvroParquetWriter.<GenericRecord>builder(new Path(file))
        .withConf(conf)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .withSchema(schema)
        .build();
After that, the AvroParquet Sink can be used.
Scala
val records: List[Record] = documents.map(format.to(_))
val source: Source[Record, NotUsed] = Source(records)
val result: Future[Done] = source
  .runWith(AvroParquetSink(writer))

Java
Sink<GenericRecord, CompletionStage<Done>> sink = AvroParquetSink.create(writer);
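As an illustration of feeding the sink, here is a minimal Scala sketch that builds plain GenericRecords with Avro's GenericRecordBuilder and writes them with AvroParquetSink. The schema, field name, output path and system name below are assumptions made only for this example, not part of this connector:

import scala.concurrent.Future
import org.apache.avro.{ Schema, SchemaBuilder }
import org.apache.avro.generic.{ GenericRecord, GenericRecordBuilder }
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.avroparquet.scaladsl.AvroParquetSink
import org.apache.pekko.stream.scaladsl.Source

// assumed: an implicit ActorSystem to materialise the stream
implicit val system: ActorSystem = ActorSystem("avroparquet-docs")

// hypothetical single-field schema, used only for this sketch
val sampleSchema: Schema =
  SchemaBuilder.record("Document").fields().requiredString("body").endRecord()

// a writer typed for plain GenericRecords (the output path is an assumption)
val genericWriter: ParquetWriter[GenericRecord] =
  AvroParquetWriter
    .builder[GenericRecord](new Path("./sample/path/generic.parquet"))
    .withConf(new Configuration())
    .withSchema(sampleSchema)
    .build()

// build a few records that conform to the schema and stream them into Parquet;
// the Future completes once the stream has finished and the writer is closed
val sampleRecords: List[GenericRecord] =
  List("first", "second").map(body => new GenericRecordBuilder(sampleSchema).set("body", body).build())
val written: Future[Done] = Source(sampleRecords).runWith(AvroParquetSink(genericWriter))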
Flow Initiation¶
A ParquetWriter can also be used as a Flow stage. Like the other representations, it expects subtypes of Avro's GenericRecord to be passed; each record is written to the Parquet file and then emitted downstream unchanged. Such a Flow stage can be created with AvroParquetFlow, providing an AvroParquetWriter instance as a parameter.
Scala
val records: List[GenericRecord]
val source: Source[GenericRecord, NotUsed] = Source(records)
val avroParquet: Flow[GenericRecord, GenericRecord, NotUsed] = AvroParquetFlow(writer)
val result =
  source
    .via(avroParquet)
    .runWith(Sink.seq)
This is all the preparation that we are going to need.

Java
ParquetWriter<GenericRecord> writer =
    AvroParquetWriter.<GenericRecord>builder(new Path("./test.parquet"))
        .withConf(conf)
        .withSchema(schema)
        .build();

Flow<GenericRecord, GenericRecord, NotUsed> flow = AvroParquetFlow.create(writer);
source.via(flow).runWith(Sink.ignore(), system);
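Because the flow re-emits each record after it has been written, downstream stages can keep processing exactly what was persisted. Here is a small Scala sketch reusing the records and avroParquet values from the Scala snippet above; it assumes an implicit ActorSystem is in scope and uses a simple count as hypothetical downstream processing:

import scala.concurrent.Future
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

// every record is written to the Parquet file and then passed downstream unchanged
val writtenCount: Future[Int] =
  Source(records)
    .via(avroParquet)          // writes each GenericRecord via the ParquetWriter
    .map(_ => 1)               // hypothetical downstream processing step
    .runWith(Sink.fold(0)(_ + _))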
Running the example code¶
The code in this guide is part of the runnable tests of this project. You are welcome to edit the code and run it in sbt.
sbt
> avroparquet/test