Google Cloud BigQuery
The BigQuery connector provides Apache Pekko Stream sources and sinks to connect to Google Cloud BigQuery. BigQuery is a serverless data warehouse for storing and analyzing massive datasets. This connector is primarily intended for streaming data into and out of BigQuery tables and running SQL queries, although it also provides basic support for managing datasets and tables and flexible access to the BigQuery REST API.
Project Info: Apache Pekko Connectors Google Cloud BigQuery | |
---|---|
Artifact | org.apache.pekko : pekko-connectors-google-cloud-bigquery : 1.1.0-M1+129-4a175fef-SNAPSHOT |
JDK versions | OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21 |
Scala versions | 2.13.15, 2.12.20, 3.3.4 |
JPMS module name | pekko.stream.connectors.google.cloud.bigquery |
License | |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/apache/pekko-connectors |
Apache Pekko Connectors Google Cloud BigQuery is marked as “API may change”. Please try it out and suggest improvements. PR #2548
Artifacts
- sbt
val PekkoVersion = "1.1.2"
val PekkoHttpVersion = "1.1.0"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-google-cloud-bigquery" % "1.1.0-M1+129-4a175fef-SNAPSHOT",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
  "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
  "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion
)
- Maven
<properties>
  <pekko.version>1.1.2</pekko.version>
  <pekko.http.version>1.1.0</pekko.http.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-google-cloud-bigquery_${scala.binary.version}</artifactId>
    <version>1.1.0-M1+129-4a175fef-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-http_${scala.binary.version}</artifactId>
    <version>${pekko.http.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-http-spray-json_${scala.binary.version}</artifactId>
    <version>${pekko.http.version}</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  PekkoVersion: "1.1.2",
  PekkoHttpVersion: "1.1.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-google-cloud-bigquery_${versions.ScalaBinary}:1.1.0-M1+129-4a175fef-SNAPSHOT"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
  implementation "org.apache.pekko:pekko-http_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
  implementation "org.apache.pekko:pekko-http-spray-json_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
}
To use the Jackson JSON library for marshalling you must also add the Apache Pekko HTTP module for Jackson support.
- sbt
val PekkoHttpVersion = "1.1.0"
libraryDependencies += "org.apache.pekko" %% "pekko-http-jackson" % PekkoHttpVersion
- Maven
<properties>
  <pekko.http.version>1.1.0</pekko.http.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-http-jackson_${scala.binary.version}</artifactId>
    <version>${pekko.http.version}</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  PekkoHttpVersion: "1.1.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-http-jackson_${versions.ScalaBinary}:${versions.PekkoHttpVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries that it depends on transitively.
Configuration
The BigQuery connector shares its basic configuration with all the Google connectors in Apache Pekko Connectors. Additional BigQuery-specific configuration settings can be found in its reference.conf.
Imports
All of the examples below assume the following imports are in scope.
- Scala
-
source
import org.apache.pekko
import pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import pekko.stream.connectors.google.{ GoogleAttributes, GoogleSettings }
import pekko.stream.connectors.googlecloud.bigquery.InsertAllRetryPolicy
import pekko.stream.connectors.googlecloud.bigquery.model.{
  Dataset,
  Job,
  JobReference,
  JobState,
  QueryResponse,
  Table,
  TableDataListResponse,
  TableListResponse
}
import pekko.stream.connectors.googlecloud.bigquery.scaladsl.schema.BigQuerySchemas._
import pekko.stream.connectors.googlecloud.bigquery.scaladsl.schema.TableSchemaWriter
import pekko.stream.connectors.googlecloud.bigquery.scaladsl.spray.BigQueryRootJsonFormat
import pekko.stream.connectors.googlecloud.bigquery.scaladsl.spray.BigQueryJsonProtocol._
import pekko.stream.connectors.googlecloud.bigquery.scaladsl.BigQuery
import pekko.stream.scaladsl.{ Flow, Sink, Source }
import pekko.{ Done, NotUsed }

import scala.annotation.nowarn
import scala.collection.immutable.Seq
import scala.concurrent.Future
- Java
-
source
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.http.javadsl.marshallers.jackson.Jackson;
import org.apache.pekko.http.javadsl.marshalling.Marshaller;
import org.apache.pekko.http.javadsl.model.HttpEntity;
import org.apache.pekko.http.javadsl.model.RequestEntity;
import org.apache.pekko.http.javadsl.unmarshalling.Unmarshaller;
import org.apache.pekko.stream.connectors.google.GoogleAttributes;
import org.apache.pekko.stream.connectors.google.GoogleSettings;
import org.apache.pekko.stream.connectors.googlecloud.bigquery.InsertAllRetryPolicy;
import org.apache.pekko.stream.connectors.googlecloud.bigquery.javadsl.BigQuery;
import org.apache.pekko.stream.connectors.googlecloud.bigquery.javadsl.jackson.BigQueryMarshallers;
import org.apache.pekko.stream.connectors.googlecloud.bigquery.model.Dataset;
import org.apache.pekko.stream.connectors.googlecloud.bigquery.model.Job;
import org.apache.pekko.stream.connectors.googlecloud.bigquery.model.JobReference;
import org.apache.pekko.stream.connectors.googlecloud.bigquery.model.JobState;
import org.apache.pekko.stream.connectors.googlecloud.bigquery.model.QueryResponse;
import org.apache.pekko.stream.connectors.googlecloud.bigquery.model.Table;
import org.apache.pekko.stream.connectors.googlecloud.bigquery.model.TableDataInsertAllRequest;
import org.apache.pekko.stream.connectors.googlecloud.bigquery.model.TableDataListResponse;
import org.apache.pekko.stream.connectors.googlecloud.bigquery.model.TableFieldSchema;
import org.apache.pekko.stream.connectors.googlecloud.bigquery.model.TableFieldSchemaMode;
import org.apache.pekko.stream.connectors.googlecloud.bigquery.model.TableFieldSchemaType;
import org.apache.pekko.stream.connectors.googlecloud.bigquery.model.TableListResponse;
import org.apache.pekko.stream.connectors.googlecloud.bigquery.model.TableSchema;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
Setup data classes
As a working example throughout this documentation, we will use the Person case class to model the data in our BigQuery tables.
- Scala
-
source
case class Person(name: String, age: Int, addresses: Seq[Address], isHakker: Boolean)
case class Address(street: String, city: String, postalCode: Option[Int])

implicit val addressFormat: BigQueryRootJsonFormat[Address] = bigQueryJsonFormat3(Address.apply)
implicit val personFormat: BigQueryRootJsonFormat[Person] = bigQueryJsonFormat4(Person.apply)
- Java
-
source
ObjectMapper objectMapper = new ObjectMapper();

public class Person {
  private String name;
  private Integer age;
  private List<Address> addresses;
  private Boolean isHakker;

  // BigQuery encodes a row as {"f": [{"v": ...}, ...]}, so fields are read by position.
  @JsonCreator
  public Person(@JsonProperty("f") JsonNode fields) throws IOException {
    name = fields.get(0).get("v").textValue();
    age = Integer.parseInt(fields.get(1).get("v").textValue());
    addresses = new ArrayList<>();
    ObjectReader addressReader = objectMapper.readerFor(Address.class);
    for (JsonNode node : fields.get(2).get("v")) {
      Address address = addressReader.readValue(node.get("v"));
      addresses.add(address);
    }
    isHakker = Boolean.parseBoolean(fields.get(3).get("v").textValue());
  }

  public String getName() { return name; }
  public Integer getAge() { return age; }
  public List<Address> getAddresses() { return addresses; }
  public Boolean getIsHakker() { return isHakker; }
}

public class Address {
  private String street;
  private String city;
  private Integer postalCode;

  @JsonCreator
  public Address(@JsonProperty("f") JsonNode fields) {
    street = fields.get(0).get("v").textValue();
    city = fields.get(1).get("v").textValue();
    // postalCode is nullable: textValue() returns null for a JSON null,
    // so use ofNullable (Optional.of would throw a NullPointerException).
    postalCode =
        Optional.ofNullable(fields.get(2).get("v").textValue())
            .map(Integer::parseInt)
            .orElse(null);
  }

  public String getStreet() { return street; }
  public String getCity() { return city; }
  public Integer getPostalCode() { return postalCode; }
}

public class NameAddressesPair {
  private String name;
  private List<Address> addresses;

  @JsonCreator
  public NameAddressesPair(@JsonProperty("f") JsonNode fields) throws IOException {
    name = fields.get(0).get("v").textValue();
    addresses = new ArrayList<>();
    ObjectReader addressReader = objectMapper.readerFor(Address.class);
    for (JsonNode node : fields.get(1).get("v")) {
      Address address = addressReader.readValue(node.get("v"));
      addresses.add(address);
    }
  }
}
In Scala, to enable automatic support for (un)marshalling Person and Address as BigQuery table rows and query results, we create implicit BigQueryRootJsonFormat[T] instances. The bigQueryJsonFormatN methods are imported from BigQueryJsonProtocol, analogous to Spray’s DefaultJsonProtocol.
In Java, to enable support for (un)marshalling Person and Address as BigQuery table rows and query results, we use Jackson’s @JsonCreator and @JsonProperty annotations. Note that a custom @JsonCreator constructor is necessary due to BigQuery’s unusual encoding of rows as “a series of JSON f,v objects for indicating fields and values” (reference documentation). In addition, we also define NameAddressesPair to model the result of the query in the next section.
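To make the f,v encoding concrete, the following is a minimal sketch that walks a row by position using only the Java standard library. It assumes the JSON has already been parsed into nested Map and List values; FvRowSketch, value, cities and sampleRow are hypothetical names used for illustration and are not part of the connector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the connector: reads BigQuery's positional
// "f"/"v" row encoding from already-parsed JSON (modelled as Maps and Lists).
final class FvRowSketch {

  // Returns the raw "v" value of the column at the given position in a row.
  static Object value(Map<String, Object> row, int index) {
    @SuppressWarnings("unchecked")
    List<Map<String, Object>> fields = (List<Map<String, Object>>) row.get("f");
    return fields.get(index).get("v");
  }

  // Reads one string column out of a REPEATED RECORD column, which is encoded
  // as a list of {"v": {"f": [...]}} wrappers.
  @SuppressWarnings("unchecked")
  static List<String> cities(Map<String, Object> row, int addressesIndex, int cityIndex) {
    List<String> result = new ArrayList<>();
    for (Map<String, Object> wrapper : (List<Map<String, Object>>) value(row, addressesIndex)) {
      Map<String, Object> address = (Map<String, Object>) wrapper.get("v");
      result.add((String) value(address, cityIndex));
    }
    return result;
  }

  // Sample row shaped like the result of: SELECT name, age, addresses FROM ...
  static Map<String, Object> sampleRow() {
    Map<String, Object> address = Map.of("f", List.of(
        Map.of("v", "1 Main St"), Map.of("v", "Springfield"), Map.of("v", "12345")));
    return Map.of("f", List.of(
        Map.of("v", "Alice"),
        Map.of("v", "101"), // scalar values arrive as strings
        Map.of("v", List.of(Map.of("v", address)))));
  }
}
```

Because columns are identified only by position, the index passed to value must follow the column order of the table or query, which is why the constructors above read fields.get(0), fields.get(1), and so on.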
Run a query
You can run a SQL query and stream the unmarshalled results with the BigQuery.query[Out] method (BigQuery.<Out>query in Java). In Scala, the output type Out can be a tuple or any user-defined class for which an implicit BigQueryRootJsonFormat[Out] is available. Note that the order and presence of fields in Out must strictly match your SQL query. In Java, create the unmarshaller with the BigQueryMarshallers.<Out>queryResponseUnmarshaller method.
- Scala
-
source
val sqlQuery = s"SELECT name, addresses FROM $datasetId.$tableId WHERE age >= 100"
val centenarians: Source[(String, Seq[Address]), Future[QueryResponse[(String, Seq[Address])]]] =
  BigQuery.query[(String, Seq[Address])](sqlQuery, useLegacySql = false)
- Java
-
source
String sqlQuery =
    String.format("SELECT name, addresses FROM %s.%s WHERE age >= 100", datasetId, tableId);
Unmarshaller<HttpEntity, QueryResponse<NameAddressesPair>> queryResponseUnmarshaller =
    BigQueryMarshallers.queryResponseUnmarshaller(NameAddressesPair.class);
Source<NameAddressesPair, CompletionStage<QueryResponse<NameAddressesPair>>> centenarians =
    BigQuery.query(sqlQuery, false, false, queryResponseUnmarshaller);
Notice that the source materializes a Future[QueryResponse[(String, Seq[Address])]] (Scala) or a CompletionStage<QueryResponse<NameAddressesPair>> (Java), which can be used to retrieve metadata related to the query. For example, you can use a dry run to estimate the number of bytes that will be read by a query.
- Scala
-
source
val centenariansDryRun =
  BigQuery.query[(String, Seq[Address])](sqlQuery, dryRun = true, useLegacySql = false)
val bytesProcessed: Future[Long] =
  centenariansDryRun.to(Sink.ignore).run().map(_.totalBytesProcessed.get)
- Java
-
source
// Second argument is dryRun, third is useLegacySql (mirroring the Scala example above).
Source<NameAddressesPair, CompletionStage<QueryResponse<NameAddressesPair>>> centenariansDryRun =
    BigQuery.query(sqlQuery, true, false, queryResponseUnmarshaller);
CompletionStage<Long> bytesProcessed =
    centenariansDryRun
        .to(Sink.ignore())
        .run(system)
        .thenApply(r -> r.getTotalBytesProcessed().getAsLong());
Finally, you can also stream all of the rows in a table without the expense of running a query with the BigQuery.tableData[Out] method (BigQuery.<Out>listTableData in Java).
- Scala
-
source
val everyone: Source[Person, Future[TableDataListResponse[Person]]] =
  BigQuery.tableData[Person](datasetId, tableId)
- Java
-
source
Unmarshaller<HttpEntity, TableDataListResponse<Person>> tableDataListUnmarshaller =
    BigQueryMarshallers.tableDataListResponseUnmarshaller(Person.class);
Source<Person, CompletionStage<TableDataListResponse<Person>>> everyone =
    BigQuery.listTableData(
        datasetId,
        tableId,
        OptionalLong.empty(),
        OptionalInt.empty(),
        Collections.emptyList(),
        tableDataListUnmarshaller);
Load data into BigQuery
The BigQuery connector enables loading data into tables via real-time streaming inserts or batch loading. For an overview of these strategies see the BigQuery documentation.
The BigQuery.insertAll[In] method (BigQuery.<In>insertAll in Java) creates a sink that accepts batches of Seq[In] (List<In> in Java), for example created via the batch operator, and streams them directly into a table. To enable or disable BigQuery’s best-effort deduplication feature, use the appropriate InsertAllRetryPolicy.
- Scala
-
source
val peopleInsertSink: Sink[Seq[Person], NotUsed] =
  BigQuery.insertAll[Person](datasetId, tableId, InsertAllRetryPolicy.WithDeduplication)
- Java
-
source
Marshaller<TableDataInsertAllRequest<Person>, RequestEntity> tableDataInsertAllMarshaller =
    BigQueryMarshallers.tableDataInsertAllRequestMarshaller();
Sink<List<Person>, NotUsed> peopleInsertSink =
    BigQuery.insertAll(
        datasetId,
        tableId,
        InsertAllRetryPolicy.withDeduplication(),
        Optional.empty(),
        tableDataInsertAllMarshaller);
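The insert sink consumes whole batches rather than single rows. As a rough illustration of that input shape, the sketch below splits a list into fixed-size batches using only the Java standard library; BatchSketch and inBatches are hypothetical names, and inside a real stream you would more likely rely on a stream operator such as grouped or batch than on a helper like this.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the connector.
final class BatchSketch {

  // Splits rows into consecutive batches of at most batchSize elements,
  // preserving the original order. The last batch may be smaller.
  static <T> List<List<T>> inBatches(List<T> rows, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < rows.size(); start += batchSize) {
      int end = Math.min(start + batchSize, rows.size());
      // Copy the view so each batch is independent of the source list.
      batches.add(new ArrayList<>(rows.subList(start, end)));
    }
    return batches;
  }
}
```

Each inner list corresponds to one element that a Sink<List<Person>, NotUsed> such as peopleInsertSink would receive.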
As a cost-saving alternative to streaming inserts, you can also add data to a table via asynchronous load jobs. The BigQuery.insertAllAsync[In] method (BigQuery.<In>insertAllAsync in Java) creates a flow that starts a series of batch load jobs. By default, a new load job is created every minute to attempt to emulate near-real-time streaming inserts, although there is no guarantee when the job will actually run. The frequency with which new load jobs are created is controlled by the pekko.connectors.google.bigquery.load-job-per-table-quota configuration setting.
Pending the resolution of Google BigQuery issue 176002651, the BigQuery.insertAllAsync API may not work as expected.
As a workaround, you can use the config setting pekko.http.parsing.conflicting-content-type-header-processing-mode = first with Apache Pekko HTTP v1.0.0 or later.
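As a sketch, the two settings mentioned above could be placed in an application.conf like this. The setting names come from the text; the duration syntax and the example value for load-job-per-table-quota are assumptions, so check the connector's reference.conf for the actual default and format.

```hocon
# Workaround for the Content-Type issue described above.
pekko.http.parsing.conflicting-content-type-header-processing-mode = first

pekko.connectors.google.bigquery {
  # Assumed duration syntax and illustrative value only:
  # start a new load job per table at most this often.
  load-job-per-table-quota = 2 minutes
}
```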
- Scala
-
source
val peopleLoadFlow: Flow[Person, Job, NotUsed] = BigQuery.insertAllAsync[Person](datasetId, tableId)
- Java
-
source
Flow<Person, Job, NotUsed> peopleLoadFlow = BigQuery.insertAllAsync(datasetId, tableId, Jackson.marshaller());
To check the status of the load jobs, use the BigQuery.job method (BigQuery.getJob in Java).
- Scala
-
source
def checkIfJobsDone(jobReferences: Seq[JobReference]): Future[Boolean] = {
  for {
    jobs <- Future.sequence(jobReferences.map(ref => BigQuery.job(ref.jobId.get)))
  } yield jobs.forall(job => job.status.exists(_.state == JobState.Done))
}

val isDone: Future[Boolean] = for {
  jobs <- Source(people).via(peopleLoadFlow).runWith(Sink.seq)
  jobReferences = jobs.flatMap(job => job.jobReference)
  isDone <- checkIfJobsDone(jobReferences)
} yield isDone
- Java
-
source
Function<List<JobReference>, CompletionStage<Boolean>> checkIfJobsDone =
    jobReferences -> {
      GoogleSettings settings = GoogleSettings.create(system);
      CompletionStage<Boolean> allAreDone = CompletableFuture.completedFuture(true);
      for (JobReference jobReference : jobReferences) {
        CompletionStage<Job> job =
            BigQuery.getJob(jobReference.getJobId().get(), Optional.empty(), settings, system);
        CompletionStage<Boolean> jobIsDone =
            job.thenApply(
                j -> j.getStatus().map(s -> s.getState().equals(JobState.done())).orElse(false));
        allAreDone = allAreDone.thenCombine(jobIsDone, (a, b) -> a & b);
      }
      return allAreDone;
    };

CompletionStage<List<Job>> jobs =
    Source.from(people).via(peopleLoadFlow).runWith(Sink.<Job>seq(), system);
CompletionStage<List<JobReference>> jobReferences =
    jobs.thenApply(
        js -> js.stream().map(j -> j.getJobReference().get()).collect(Collectors.toList()));
CompletionStage<Boolean> isDone = jobReferences.thenCompose(checkIfJobsDone);
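The Java example folds many asynchronous "is this job done?" checks into a single result. Isolated from BigQuery, that pattern might look like the following standard-library sketch; AllDoneSketch and allTrue are hypothetical names, and each CompletionStage<Boolean> stands in for one job status check.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical helper, not part of the connector.
final class AllDoneSketch {

  // Completes with true only if every check completes with true.
  // An empty list yields true; a failed check fails the combined stage.
  static CompletionStage<Boolean> allTrue(List<CompletionStage<Boolean>> checks) {
    CompletionStage<Boolean> result = CompletableFuture.completedFuture(true);
    for (CompletionStage<Boolean> check : checks) {
      result = result.thenCombine(check, (a, b) -> a && b);
    }
    return result;
  }
}
```

Note that this reports the status at a single point in time; it does not wait for jobs that are still pending or running.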
Managing datasets and tables
The BigQuery connector provides methods for basic management of datasets and tables.
- Scala
-
source
val allDatasets: Source[Dataset, NotUsed] = BigQuery.datasets
val existingDataset: Future[Dataset] = BigQuery.dataset(datasetId)
val newDataset: Future[Dataset] = BigQuery.createDataset("newDatasetId")
val datasetDeleted: Future[Done] = BigQuery.deleteDataset(datasetId)

val allTablesInDataset: Source[Table, Future[TableListResponse]] = BigQuery.tables(datasetId)
val existingTable: Future[Table] = BigQuery.table(datasetId, tableId)
val tableDeleted: Future[Done] = BigQuery.deleteTable(datasetId, tableId)
- Java
-
source
GoogleSettings settings = GoogleSettings.create(system);

Source<Dataset, NotUsed> allDatasets =
    BigQuery.listDatasets(OptionalInt.empty(), Optional.empty(), Collections.emptyMap());
CompletionStage<Dataset> existingDataset = BigQuery.getDataset(datasetId, settings, system);
CompletionStage<Dataset> newDataset = BigQuery.createDataset("newDatasetId", settings, system);
CompletionStage<Done> datasetDeleted =
    BigQuery.deleteDataset(datasetId, false, settings, system);

Source<Table, CompletionStage<TableListResponse>> allTablesInDataset =
    BigQuery.listTables(datasetId, OptionalInt.empty());
CompletionStage<Table> existingTable = BigQuery.getTable(datasetId, tableId, settings, system);
CompletionStage<Done> tableDeleted = BigQuery.deleteTable(datasetId, tableId, settings, system);
Creating a table requires a little more work to specify the schema. In Scala, to enable automatic schema generation, you can bring implicit TableSchemaWriter[T] instances for your classes into scope via the bigQuerySchemaN methods in BigQuerySchemas. In Java, the TableSchema is built explicitly from TableFieldSchema entries.
- Scala
-
source
implicit val addressSchema: TableSchemaWriter[Address] = bigQuerySchema3(Address.apply)
implicit val personSchema: TableSchemaWriter[Person] = bigQuerySchema4(Person.apply)

val newTable: Future[Table] = BigQuery.createTable[Person](datasetId, "newTableId")
- Java
-
source
TableSchema personSchema =
    TableSchema.create(
        TableFieldSchema.create("name", TableFieldSchemaType.string(), Optional.empty()),
        TableFieldSchema.create("age", TableFieldSchemaType.integer(), Optional.empty()),
        TableFieldSchema.create(
            "addresses",
            TableFieldSchemaType.record(),
            Optional.of(TableFieldSchemaMode.repeated()),
            TableFieldSchema.create("street", TableFieldSchemaType.string(), Optional.empty()),
            TableFieldSchema.create("city", TableFieldSchemaType.string(), Optional.empty()),
            TableFieldSchema.create(
                "postalCode",
                TableFieldSchemaType.integer(),
                Optional.of(TableFieldSchemaMode.nullable()))),
        TableFieldSchema.create("isHakker", TableFieldSchemaType.bool(), Optional.empty()));

CompletionStage<Table> newTable =
    BigQuery.createTable(datasetId, "newTableId", personSchema, settings, system);
Apply custom settings to a part of the stream
In certain situations it may be desirable to modify the GoogleSettings applied to a part of the stream, for example to change the project ID or use different RetrySettings.
- Scala
-
source
val defaultSettings: GoogleSettings = GoogleSettings()
val customSettings = defaultSettings.copy(projectId = "myOtherProject")

BigQuery.query[(String, Seq[Address])](sqlQuery).withAttributes(GoogleAttributes.settings(customSettings))
- Java
-
source
GoogleSettings defaultSettings = GoogleSettings.create(system);
GoogleSettings customSettings = defaultSettings.withProjectId("myOtherProjectId");

BigQuery.query(sqlQuery, false, false, queryResponseUnmarshaller)
    .withAttributes(GoogleAttributes.settings(customSettings));
Make raw API requests
If you would like to interact with the BigQuery REST API beyond what the BigQuery connector supports, you can make authenticated raw requests via the BigQuery.singleRequest and BigQuery.paginatedRequest[Out] (BigQuery.<Out>paginatedRequest in Java) methods.