Google Common

The google-common module provides central configuration for Google connectors in Apache Pekko Connectors as well as basic support for interfacing with Google APIs.

Artifacts

sbt
val PekkoVersion = "1.0.3"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-connectors-google-common" % "1.0.2",
  "org.apache.pekko" %% "pekko-stream" % PekkoVersion
)
Maven
<properties>
  <pekko.version>1.0.3</pekko.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-connectors-google-common_${scala.binary.version}</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-stream_${scala.binary.version}</artifactId>
    <version>${pekko.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  PekkoVersion: "1.0.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "org.apache.pekko:pekko-connectors-google-common_${versions.ScalaBinary}:1.0.2"
  implementation "org.apache.pekko:pekko-stream_${versions.ScalaBinary}:${versions.PekkoVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Configuration

Shared settings for all Google connectors are read by default from the pekko.connectors.google configuration section in your application.conf. The available options and their default values are documented in reference.conf, reproduced below. If you use a non-standard configuration path or need several different configurations, refer to the sections below.

# SPDX-License-Identifier: Apache-2.0

pekko.connectors.google {

  credentials {
    # Used to build a list of default scopes from each of the modules, don't override
    default-scopes = ${?pekko.connectors.google.credentials.default-scopes} []
    # Override to specify custom scopes
    scopes = ${pekko.connectors.google.credentials.default-scopes}

    # Options: application-default, service-account, compute-engine, none
    # application-default first tries service-account then compute-engine
    provider = application-default

    service-account {

      # Config values have first priority, otherwise look for credentials file
      project-id = ""
      client-email = ""
      private-key = ""

      # Resolves a path to the well-known credentials file
      # See https://github.com/googleapis/google-auth-library-java/blob/master/oauth2_http/java/com/google/auth/oauth2/DefaultCredentialsProvider.java#L237
      path = ${user.home}/.config
      path = ${?APPDATA} # Windows-only
      path = ${pekko.connectors.google.credentials.service-account.path}/gcloud
      path = ${?CLOUDSDK_CONFIG}
      path = ${pekko.connectors.google.credentials.service-account.path}/application_default_credentials.json
      path = ${?GOOGLE_APPLICATION_CREDENTIALS}

      # Always required regardless of where credentials are read from
      scopes = ${pekko.connectors.google.credentials.scopes}
    }

    # Timeout for blocking call during settings initialization to compute engine metadata server
    compute-engine.timeout = 1s

    user-access {
      project-id = ""
      client-id = ""
      client-secret = ""
      refresh-token = ""

      # Resolves a path to the well-known credentials file
      # See https://github.com/googleapis/google-auth-library-java/blob/master/oauth2_http/java/com/google/auth/oauth2/DefaultCredentialsProvider.java#L237
      path = ${user.home}/.config
      path = ${?APPDATA} # Windows-only
      path = ${pekko.connectors.google.credentials.user-access.path}/gcloud
      path = ${?CLOUDSDK_CONFIG}
      path = ${pekko.connectors.google.credentials.user-access.path}/application_default_credentials.json
      path = ${?GOOGLE_APPLICATION_CREDENTIALS}
    }

    none {
      project-id = "<no-project-id>"
      token = "<no-token>"
    }
  }

  # Standard query parameters for all Google APIs sent with every request
  user-ip = ""
  quota-user = ""
  pretty-print = false

  # The minimum size of a chunk, must be a multiple of 256 KiB. See https://github.com/googleapis/java-core/issues/86
  upload-chunk-size = 15 MiB

  # The retry settings for requests to Google APIs
  # Defaults from https://github.com/googleapis/python-api-core/blob/master/google/api_core/retry.py#L72
  retry-settings {
    max-retries = 6
    min-backoff = 1 second
    max-backoff = 1 minute
    random-factor = 0.2
  }

  # An address of a proxy that will be used for all connections using HTTP CONNECT tunnel.
  # forward-proxy {
  #   scheme = "https"
  #   host = "proxy"
  #   port = 8080
  #   credentials {
  #     username = "username"
  #     password = "password"
  #   }
  #   trust-pem = "/path/to/file.pem"
  # }

}
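To get a feel for what the retry-settings defaults mean in practice, the following sketch computes a delay schedule. It assumes the common exponential-backoff-with-jitter scheme, min(max-backoff, min-backoff * 2^attempt) scaled by (1 + rnd * random-factor); the class and method names are hypothetical and the connector's actual schedule may differ.

```java
import java.time.Duration;

// Illustrative only: not part of the google-common API.
public class BackoffSketch {

    // Delay before the given retry (0-based attempt) without jitter:
    // min(maxBackoff, minBackoff * 2^attempt)
    public static Duration baseDelay(int attempt, Duration minBackoff, Duration maxBackoff) {
        double millis = minBackoff.toMillis() * Math.pow(2, attempt);
        return millis >= maxBackoff.toMillis() ? maxBackoff : Duration.ofMillis((long) millis);
    }

    // Applies jitter: base * (1 + rnd * randomFactor), where rnd is expected in [0, 1)
    public static Duration withJitter(Duration base, double randomFactor, double rnd) {
        return Duration.ofMillis((long) (base.toMillis() * (1.0 + rnd * randomFactor)));
    }

    public static void main(String[] args) {
        Duration min = Duration.ofSeconds(1);   // min-backoff = 1 second
        Duration max = Duration.ofMinutes(1);   // max-backoff = 1 minute
        double randomFactor = 0.2;              // random-factor = 0.2
        int maxRetries = 6;                     // max-retries = 6

        for (int attempt = 0; attempt < maxRetries; attempt++) {
            Duration base = baseDelay(attempt, min, max);
            Duration upper = withJitter(base, randomFactor, 1.0);
            System.out.println("retry " + (attempt + 1) + ": between "
                + base.toMillis() + " ms and " + upper.toMillis() + " ms");
        }
    }
}
```

Under this assumption the base delay doubles from 1 second to 32 seconds over the six default retries, so the 1 minute cap only comes into play if max-retries is raised.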

Credentials

With the default application-default provider, credentials are loaded automatically:

  1. From the file path specified by the GOOGLE_APPLICATION_CREDENTIALS environment variable or another “well-known” location; or
  2. From the Compute Engine metadata server, when running on a Compute Engine instance.

Credentials can also be specified manually in your configuration file.
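As a minimal sketch of manual configuration, the fragment below selects the service-account provider and fills in the keys listed under credentials.service-account in reference.conf. The project id, client email, and the GOOGLE_PRIVATE_KEY environment variable name are hypothetical placeholders.

```hocon
# application.conf
pekko.connectors.google.credentials {
  provider = service-account

  service-account {
    project-id = "my-project"                                    # hypothetical
    client-email = "my-sa@my-project.iam.gserviceaccount.com"    # hypothetical
    # Optional substitution: keeps the key out of the file and leaves
    # the default in place when the variable is not set (hypothetical variable name)
    private-key = ${?GOOGLE_PRIVATE_KEY}
  }
}
```

Reading the private key from the environment rather than hard-coding it avoids committing secrets alongside the rest of the configuration.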

Accessing settings

GoogleSettings provides methods to retrieve settings from your configuration, and GoogleAttributes provides access to the settings attached to a stream. Additionally, if an implicit ActorSystem is in scope, an implicit instance of the default GoogleSettings is available as well.

Scala
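import org.apache.pekko.stream.connectors.google.{ GoogleAttributes, GoogleSettings }
import org.apache.pekko.stream.scaladsl.Source

// Both calls require an implicit ActorSystem in scope
val defaultSettings = GoogleSettings()
val customSettings = GoogleSettings("my-app.custom-google-config")
// Resolve the settings attached to the stream at materialization time
Source.fromMaterializer { (mat, attr) =>
  val settings: GoogleSettings = GoogleAttributes.resolveSettings(mat, attr)
  Source.empty
}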
Java
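import org.apache.pekko.stream.connectors.google.GoogleAttributes;
import org.apache.pekko.stream.connectors.google.GoogleSettings;
import org.apache.pekko.stream.javadsl.Source;

// `system` is your ActorSystem
GoogleSettings defaultSettings = GoogleSettings.create(system);
GoogleSettings customSettings = GoogleSettings.create("my-app.custom-google-config", system);
// Resolve the settings attached to the stream at materialization time
Source.fromMaterializer(
    (mat, attr) -> {
      GoogleSettings settings = GoogleAttributes.resolveSettings(mat, attr);
      return Source.empty();
    });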
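The custom path used in the examples above, my-app.custom-google-config, has to exist in your configuration. A minimal sketch, assuming that the section at a custom path must contain the same keys as pekko.connectors.google: inherit the defaults through a HOCON substitution and override only what differs. The overridden values here are illustrative.

```hocon
# application.conf
my-app.custom-google-config = ${pekko.connectors.google} {
  # Only the values that differ from the defaults
  credentials.provider = service-account
  retry-settings.max-retries = 3
}
```

Inheriting from the default section keeps the custom configuration short and lets it pick up any new defaults automatically.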

Apply custom settings to a part of the stream

In certain situations it may be desirable to modify the GoogleSettings applied to a part of the stream, for example to use different credentials or change the RetrySettings. This is accomplished by adding GoogleAttributes to your stream.

Scala
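// Read the settings for this part of the stream from another configuration path
stream.addAttributes(GoogleAttributes.settingsPath("my-app.custom-google-config"))

// Or derive the settings programmatically from the defaults
val defaultSettings = GoogleSettings()
val customSettings = defaultSettings.withProjectId("my-other-project")
stream.addAttributes(GoogleAttributes.settings(customSettings))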
Java
// Read the settings for this part of the stream from another configuration path
stream.addAttributes(GoogleAttributes.settingsPath("my-app.custom-google-config"));

// Or derive the settings programmatically from the defaults
GoogleSettings defaultSettings = GoogleSettings.create(system);
GoogleSettings customSettings = defaultSettings.withProjectId("my-other-project");
stream.addAttributes(GoogleAttributes.settings(customSettings));

Interop with Google Java client libraries

Instances of the Credentials class can be converted via the toGoogle() method to Credentials compatible with Google Java client libraries.

Accessing other Google APIs

The Google object (Scala) or class (Java) provides methods for interfacing with Google APIs. You can use it to access APIs that are not currently supported by Apache Pekko Connectors and to build new connectors.