Google Common

The google-common module provides central configuration for Google connectors in Apache Pekko Connectors as well as basic support for interfacing with Google APIs.

Artifacts

sbt
Maven
Gradle
val PekkoVersion = "1.1.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-google-common" % "1.1.0",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
<properties>
  <pekko.version>1.1.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-google-common_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
def versions = [
  PekkoVersion: "1.1.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-google-common_${versions.ScalaBinary}:1.1.0"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Configuration

Shared settings for all Google connectors are read by default from the pekko.connectors.google configuration section in your application.conf. The available options and their default values are documented in the reference.conf. If you use a non-standard configuration path or need multiple different configurations please refer to the sections below.

source# SPDX-License-Identifier: Apache-2.0

pekko.connectors.google {

  credentials {
    # Used to build a list of default scopes from each of the modules, don't override
    default-scopes = ${?pekko.connectors.google.credentials.default-scopes} []
    # Override to specify custom scopes
    scopes = ${pekko.connectors.google.credentials.default-scopes}

    # Options: application-default, service-account, compute-engine, none
    # application-default first tries service-account then compute-engine
    provider = application-default

    service-account {

      # Config values have first priority, otherwise look for credentials file
      project-id = ""
      client-email = ""
      private-key = ""

      # Resolves a path to the well-known credentials file
      # See https://github.com/googleapis/google-auth-library-java/blob/master/oauth2_http/java/com/google/auth/oauth2/DefaultCredentialsProvider.java#L237
      path = ${user.home}/.config
      path = ${?APPDATA} # Windows-only
      path = ${pekko.connectors.google.credentials.service-account.path}/gcloud
      path = ${?CLOUDSDK_CONFIG}
      path = ${pekko.connectors.google.credentials.service-account.path}/application_default_credentials.json
      path = ${?GOOGLE_APPLICATION_CREDENTIALS}
    }

    # Timeout for blocking call during settings initialization to compute engine metadata server
    compute-engine.timeout = 1s

    user-access {
      project-id = ""
      client-id = ""
      client-secret = ""
      refresh-token = ""

      # Resolves a path to the well-known credentials file
      # See https://github.com/googleapis/google-auth-library-java/blob/master/oauth2_http/java/com/google/auth/oauth2/DefaultCredentialsProvider.java#L237
      path = ${user.home}/.config
      path = ${?APPDATA} # Windows-only
      path = ${pekko.connectors.google.credentials.user-access.path}/gcloud
      path = ${?CLOUDSDK_CONFIG}
      path = ${pekko.connectors.google.credentials.user-access.path}/application_default_credentials.json
      path = ${?GOOGLE_APPLICATION_CREDENTIALS}
    }

    none {
      project-id = "<no-project-id>"
      token = "<no-token>"
    }
  }

  # Standard query parameters for all Google APIs sent with every request
  user-ip = ""
  quota-user = ""
  pretty-print = false

  # The minimum size of a chunk, must be a multiple of 256 KiB. See https://github.com/googleapis/java-core/issues/86
  upload-chunk-size = 15 MiB

  # The retry settings for requests to Google APIs
  # Defaults from https://github.com/googleapis/python-api-core/blob/master/google/api_core/retry.py#L72
  retry-settings {
    max-retries = 6
    min-backoff = 1 second
    max-backoff = 1 minute
    random-factor = 0.2
  }

  # An address of a proxy that will be used for all connections using HTTP CONNECT tunnel.
  # forward-proxy {
  #   scheme = "https"
  #   host = "proxy"
  #   port = 8080
  #   credentials {
  #     username = "username"
  #     password = "password"
  #   }
  #   trust-pem = "/path/to/file.pem"
  # }

}

Credentials

Credentials will be loaded automatically:

  1. From the file path specified by the GOOGLE_APPLICATION_CREDENTIALS environment variable or another “well-known” location; or
  2. When running in a Compute Engine instance.

Credentials can also be specified manually in your configuration file.

Accessing settings

GoogleSettings provides methods to retrieve settings from your configuration and GoogleAttributes to access the settings attached to a stream.

Scala
Java
sourceval defaultSettings = GoogleSettings()
val customSettings = GoogleSettings("my-app.custom-google-config")
Source.fromMaterializer { (mat, attr) =>
  val settings: GoogleSettings = GoogleAttributes.resolveSettings(mat, attr)
  Source.empty
}
sourceGoogleSettings defaultSettings = GoogleSettings.create(system);
GoogleSettings customSettings = GoogleSettings.create("my-app.custom-google-config", system);
Source.fromMaterializer(
    (mat, attr) -> {
      GoogleSettings settings = GoogleAttributes.resolveSettings(mat, attr);
      return Source.empty();
    });

Apply custom settings to a part of the stream

In certain situations it may be desirable to modify the GoogleSettings applied to a part of the stream, for example to use different credentials or change the RetrySettings. This is accomplished by adding GoogleAttributes to your stream.

Scala
Java
sourcestream.addAttributes(GoogleAttributes.settingsPath("my-app.custom-google-config"))

val defaultSettings = GoogleSettings()
val customSettings = defaultSettings.withProjectId("my-other-project")
stream.addAttributes(GoogleAttributes.settings(customSettings))
sourcestream.addAttributes(GoogleAttributes.settingsPath("my-app.custom-google-config"));

GoogleSettings defaultSettings = GoogleSettings.create(system);
GoogleSettings customSettings = defaultSettings.withProjectId("my-other-project");
stream.addAttributes(GoogleAttributes.settings(customSettings));

Interop with Google Java client libraries

Instances of the Credentials class can be converted via the toGoogle() method to Credentials compatible with Google Java client libraries.

Accessing other Google APIs

The Google class provides methods for interfacing with Google APIs. You can use it to access APIs that are not currently supported by Apache Pekko Connectors and build new connectors.