Google Cloud BigQuery Storage
The BigQuery Storage API offers fast access to BigQuery-managed storage using an rpc-based protocol. It is seen as an improvement over the REST API, and bulk data extract
jobs for accessing BigQuery-managed table data, but doesn’t offer any functionality around managing BigQuery resources. Further information at the official Google Cloud documentation website.
This connector communicates to the BigQuery Storage API via the gRPC protocol. The integration between Apache Pekko Stream and gRPC is handled by the Apache Pekko gRPC library. Currently, this connector only supports returning each row as an Avro GenericRecord.
Project Info: Apache Pekko Connectors Google Cloud BigQuery Storage | |
---|---|
Artifact | org.apache.pekko
pekko-connectors-google-cloud-bigquery-storage
1.0.2
|
JDK versions | OpenJDK 8 OpenJDK 11 OpenJDK 17 |
Scala versions | 2.13.14, 2.12.20, 3.3.3 |
JPMS module name | pekko.stream.connectors.google.cloud.bigquery.storage |
License | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/apache/pekko-connectors |
Artifacts
Apache Pekko gRPC uses Apache Pekko Discovery internally. Make sure to add Apache Pekko Discovery with the same Apache Pekko version that the application uses.
- sbt
val PekkoVersion = "1.0.3" libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-connectors-google-cloud-bigquery-storage" % "1.0.2", "org.apache.pekko" %% "pekko-stream" % PekkoVersion, "org.apache.pekko" %% "pekko-discovery" % PekkoVersion )
- Maven
<properties> <pekko.version>1.0.3</pekko.version> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-connectors-google-cloud-bigquery-storage_${scala.binary.version}</artifactId> <version>1.0.2</version> </dependency> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-stream_${scala.binary.version}</artifactId> <version>${pekko.version}</version> </dependency> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-discovery_${scala.binary.version}</artifactId> <version>${pekko.version}</version> </dependency> </dependencies>
- Gradle
def versions = [ PekkoVersion: "1.0.3", ScalaBinary: "2.13" ] dependencies { implementation "org.apache.pekko:pekko-connectors-google-cloud-bigquery-storage_${versions.ScalaBinary}:1.0.2" implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}" implementation "org.apache.pekko:pekko-discovery_${versions.ScalaBinary}:${versions.PekkoVersion}" }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Build setup
The Apache Pekko Connectors Google Cloud BigQuery Storage library contains the classes generated from Google’s protobuf specification.
HTTP/2 requires ALPN negotiation, which comes with the JDK starting with version 8u251.
For older versions of the JDK you will need to load the jetty-alpn-agent
yourself, but we recommend upgrading.
Configuration
The BigQuery Storage connector shares its basic configuration with all the Google connectors in Apache Pekko Connectors.
Example Test Configuration
pekko.connectors.google.cloud.bigquery.grpc {
host = "localhost"
port = 21000
rootCa = "none"
callCredentials = "none"
}
For more configuration details consider the underlying configuration for Apache Pekko gRPC.
A manually initialized org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.scaladsl.GrpcBigQueryStorageReader
org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.javadsl.GrpcBigQueryStorageReader
can be used by providing it as an attribute to the stream:
- Scala
-
source
val reader: GrpcBigQueryStorageReader = GrpcBigQueryStorageReader(BigQueryStorageSettings("localhost", 8000)) val sourceForReader: Source[(ReadSession.Schema, Seq[Source[ReadRowsResponse.Rows, NotUsed]]), Future[NotUsed]] = BigQueryStorage .create("projectId", "datasetId", "tableId", DataFormat.AVRO) .withAttributes( BigQueryStorageAttributes.reader(reader))
- Java
-
source
GrpcBigQueryStorageReader reader = GrpcBigQueryStorageReader.apply(BigQueryStorageSettings.apply("localhost", 8000), sys); Source< Tuple2< com.google.cloud.bigquery.storage.v1.stream.ReadSession.Schema, List<Source<ReadRowsResponse.Rows, NotUsed>>>, CompletionStage<NotUsed>> sourceForReader = BigQueryStorage.create( "projectId", "datasetId", "tableId", DataFormat.AVRO, readOptions, 1) .withAttributes(BigQueryStorageAttributes.reader(reader));
Reading
We can read in a number of ways. To read data from a table a read session needs to be created. On the session creation we can specify the number of streams to be used in order to transfer the data, this makes it feasible to achieve parallelism while ingesting the data, thus achieving better performance. To create a session the data format needs to be specified. The options provided are Avro and Arrow.
If no TableReadOptions
are specified all the table’s columns shall be retrieved as a Source
containing a Source
for each stream, which will each deliver a section of the rows:
- Scala
-
source
import org.apache.pekko import pekko.NotUsed import com.google.cloud.bigquery.storage.v1.storage.ReadRowsResponse import com.google.cloud.bigquery.storage.v1.DataFormat import com.google.cloud.bigquery.storage.v1.stream.ReadSession import pekko.stream.connectors.googlecloud.bigquery.storage.scaladsl.BigQueryStorage import pekko.stream.scaladsl.Source import com.google.cloud.bigquery.storage.v1.stream.ReadSession.TableReadOptions import scala.concurrent.Future val sourceOfSources: Source[(ReadSession.Schema, Seq[Source[ReadRowsResponse.Rows, NotUsed]]), Future[NotUsed]] = BigQueryStorage.create("projectId", "datasetId", "tableId", DataFormat.AVRO)
- Java
-
source
import org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.BigQueryRecord; import org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.BigQueryStorageSettings; import org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.javadsl.BigQueryArrowStorage; import org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.javadsl.BigQueryAvroStorage; import org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.javadsl.BigQueryStorage; import org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.scaladsl.BigQueryStorageAttributes; import org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.scaladsl.GrpcBigQueryStorageReader; import org.apache.pekko.stream.javadsl.Source; import org.apache.pekko.util.ByteString; import scala.Tuple2; import com.google.cloud.bigquery.storage.v1.DataFormat; import com.google.cloud.bigquery.storage.v1.ReadSession; import com.google.cloud.bigquery.storage.v1.storage.ReadRowsResponse; import org.apache.pekko.http.javadsl.unmarshalling.Unmarshaller; Source< Tuple2< com.google.cloud.bigquery.storage.v1.stream.ReadSession.Schema, List<Source<ReadRowsResponse.Rows, NotUsed>>>, CompletionStage<NotUsed>> sourceOfSources = BigQueryStorage.create("projectId", "datasetId", "tableId", DataFormat.AVRO);
Secondly, by specifying TableReadOptions
, we can narrow down the amount of data returned, filtering down the columns returned, and/or a row_restriction
. This is defined as:
SQL text filtering statement, similar to a WHERE clause in a query. Currently, only a single predicate that is a comparison between a column and a constant value is supported. Aggregates are not supported.
- Scala
-
source
val readOptions = TableReadOptions(selectedFields = Seq("stringField", "intField"), rowRestriction = "intField >= 5") val sourceOfSourcesFiltered : Source[(ReadSession.Schema, Seq[Source[ReadRowsResponse.Rows, NotUsed]]), Future[NotUsed]] = BigQueryStorage.create("projectId", "datasetId", "tableId", DataFormat.AVRO, Some(readOptions))
- Java
-
source
ReadSession.TableReadOptions readOptions = ReadSession.TableReadOptions.newBuilder() .setSelectedFields(0, "stringField") .setSelectedFields(1, "intField") .setRowRestriction("intField >= 5") .build(); Source< Tuple2< com.google.cloud.bigquery.storage.v1.stream.ReadSession.Schema, List<Source<ReadRowsResponse.Rows, NotUsed>>>, CompletionStage<NotUsed>> sourceOfSourcesFiltered = BigQueryStorage.create( "projectId", "datasetId", "tableId", DataFormat.AVRO, readOptions, 1);
You can then choose to read and process these streams as is or merged. You can process the streams merged in rows. You need to provide a ByteString
Unmarshaller
based on the format requested.
- Scala
-
source
implicit val unmarshaller: FromByteStringUnmarshaller[List[BigQueryRecord]] = mock[FromByteStringUnmarshaller[List[BigQueryRecord]]] val sequentialSource: Source[List[BigQueryRecord], Future[NotUsed]] = BigQueryStorage.createMergedStreams("projectId", "datasetId", "tableId", DataFormat.AVRO)
- Java
-
source
Unmarshaller<ByteString, List<BigQueryRecord>> unmarshaller = null; Source<List<BigQueryRecord>, CompletionStage<NotUsed>> sequentialSource = BigQueryStorage.<List<BigQueryRecord>>createMergedStreams( "projectId", "datasetId", "tableId", DataFormat.AVRO, unmarshaller);
Or process the stream of rows individually:
- Scala
-
source
import org.apache.pekko import pekko.NotUsed import com.google.cloud.bigquery.storage.v1.storage.ReadRowsResponse import com.google.cloud.bigquery.storage.v1.DataFormat import com.google.cloud.bigquery.storage.v1.stream.ReadSession import pekko.stream.connectors.googlecloud.bigquery.storage.scaladsl.BigQueryStorage import pekko.stream.scaladsl.Source import com.google.cloud.bigquery.storage.v1.stream.ReadSession.TableReadOptions import scala.concurrent.Future val sourceOfSources: Source[(ReadSession.Schema, Seq[Source[ReadRowsResponse.Rows, NotUsed]]), Future[NotUsed]] = BigQueryStorage.create("projectId", "datasetId", "tableId", DataFormat.AVRO)
- Java
-
source
import org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.BigQueryRecord; import org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.BigQueryStorageSettings; import org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.javadsl.BigQueryArrowStorage; import org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.javadsl.BigQueryAvroStorage; import org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.javadsl.BigQueryStorage; import org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.scaladsl.BigQueryStorageAttributes; import org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.scaladsl.GrpcBigQueryStorageReader; import org.apache.pekko.stream.javadsl.Source; import org.apache.pekko.util.ByteString; import scala.Tuple2; import com.google.cloud.bigquery.storage.v1.DataFormat; import com.google.cloud.bigquery.storage.v1.ReadSession; import com.google.cloud.bigquery.storage.v1.storage.ReadRowsResponse; import org.apache.pekko.http.javadsl.unmarshalling.Unmarshaller; Source< Tuple2< com.google.cloud.bigquery.storage.v1.stream.ReadSession.Schema, List<Source<ReadRowsResponse.Rows, NotUsed>>>, CompletionStage<NotUsed>> sourceOfSources = BigQueryStorage.create("projectId", "datasetId", "tableId", DataFormat.AVRO);
Since Avro and Arrow are the formats available, streams for those specific formats can be created.
You can read Arrow Record streams merged
- Scala
-
source
val arrowSequentialSource: Source[Seq[BigQueryRecord], Future[NotUsed]] = BigQueryArrowStorage.readRecordsMerged("projectId", "datasetId", "tableId")
- Java
-
source
Source<List<BigQueryRecord>, CompletionStage<NotUsed>> arrowSequentialSource = BigQueryArrowStorage.readRecordsMerged("projectId", "datasetId", "tableId");
You can read Arrow Record streams individually
- Scala
-
source
val arrowParallelSource: Source[Seq[Source[BigQueryRecord, NotUsed]], Future[NotUsed]] = BigQueryArrowStorage.readRecords("projectId", "datasetId", "tableId")
- Java
-
source
Source<List<Source<BigQueryRecord, NotUsed>>, CompletionStage<NotUsed>> arrowParallelSource = BigQueryArrowStorage.readRecords("projectId", "datasetId", "tableId");
You can read Avro Record streams merged
- Scala
-
source
val avroSequentialSource: Source[Seq[BigQueryRecord], Future[NotUsed]] = BigQueryAvroStorage.readRecordsMerged("projectId", "datasetId", "tableId")
- Java
-
source
Source<List<BigQueryRecord>, CompletionStage<NotUsed>> avroSequentialSource = BigQueryAvroStorage.readRecordsMerged("projectId", "datasetId", "tableId");
You can read Avro Record streams individually
- Scala
-
source
val avroParallelSource: Source[Seq[Source[BigQueryRecord, NotUsed]], Future[NotUsed]] = BigQueryAvroStorage.readRecords("projectId", "datasetId", "tableId")
- Java
-
source
Source<List<Source<BigQueryRecord, NotUsed>>, CompletionStage<NotUsed>> avroParallelSource = BigQueryAvroStorage.readRecords("projectId", "datasetId", "tableId");
Running the test code
The tests use a BigQueryMockServer
that implements the server defined in the protobuf for the Storage API. It essentially provides a mock table on which to query. Tests can be started from sbt by running:
- sbt
-
> google-cloud-bigquery-storage/test