object GooglePubSub

Google Pub/Sub Pekko Stream operator factory.

Source
GooglePubSub.scala

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##: Int
    Definition Classes
    AnyRef → Any
  3. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  4. def acknowledge(parallelism: Int, distribution: AckDeadlineDistribution): Sink[com.google.pubsub.v1.pubsub.AcknowledgeRequest, Future[Done]]

    Create a sink that acknowledges messages and records processing latencies in the AckDeadlineDistribution for adaptive deadline computation.

    parallelism

    controls how many acknowledgements can be in-flight at any given time

    distribution

    shared distribution for adaptive deadline tracking

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  5. def acknowledge(parallelism: Int, flowControl: FlowControl): Sink[com.google.pubsub.v1.pubsub.AcknowledgeRequest, Future[Done]]

    Create a sink that acknowledges messages and releases FlowControl permits.

    parallelism

    controls how many acknowledgements can be in-flight at any given time

    flowControl

    shared flow control instance

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  6. def acknowledge(parallelism: Int): Sink[com.google.pubsub.v1.pubsub.AcknowledgeRequest, Future[Done]]

    Create a sink that accepts consumed message acknowledgements.

    The materialized value completes on stream completion.

    parallelism

    controls how many acknowledgements can be in-flight at any given time

  7. def acknowledgeFlow(distribution: AckDeadlineDistribution): Flow[com.google.pubsub.v1.pubsub.AcknowledgeRequest, com.google.pubsub.v1.pubsub.AcknowledgeRequest, NotUsed]

    Create a flow that acknowledges messages and records processing latencies in the AckDeadlineDistribution for adaptive deadline computation.

    distribution

    shared distribution for adaptive deadline tracking

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  8. def acknowledgeFlow(flowControl: FlowControl): Flow[com.google.pubsub.v1.pubsub.AcknowledgeRequest, com.google.pubsub.v1.pubsub.AcknowledgeRequest, NotUsed]

    Create a flow that acknowledges messages and releases FlowControl permits.

    flowControl

    shared flow control instance

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  9. def acknowledgeFlow(): Flow[com.google.pubsub.v1.pubsub.AcknowledgeRequest, com.google.pubsub.v1.pubsub.AcknowledgeRequest, NotUsed]

    Create a flow that accepts consumed message acknowledgements.

  10. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  11. def autoExtendAckDeadlines(subscription: String, extensionInterval: FiniteDuration, distribution: AckDeadlineDistribution): Flow[com.google.pubsub.v1.pubsub.ReceivedMessage, com.google.pubsub.v1.pubsub.ReceivedMessage, NotUsed]

    Create a flow that automatically extends ack deadlines using an adaptive deadline based on observed processing times, matching Google's official client library behavior.

    Instead of using a fixed deadline, the flow computes the extension deadline from a configurable percentile (the 99th by default) of observed ack latencies. This means:

    • If messages typically process in 2s, deadlines stay short (~10s), so a failed message is redelivered quickly.
    • If messages take 45s, deadlines adapt upward to avoid unnecessary redelivery.

    The same AckDeadlineDistribution must be passed to the acknowledge/nack operators so that processing latencies are recorded.
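
    The percentile rule described above can be illustrated with a small standalone sketch. It is not the connector's implementation: the nearest-rank percentile, the object name and the function name are assumptions made for illustration, and the 10 to 600 second clamp reflects the ack deadline range that Pub/Sub accepts.

```scala
object AdaptiveDeadlineSketch {
  // Pub/Sub accepts ack deadlines between 10 and 600 seconds.
  val MinDeadlineSeconds = 10
  val MaxDeadlineSeconds = 600

  /** Nearest-rank percentile of the observed latencies, clamped to the
    * allowed range. Returns `initialSeconds` while no samples exist.
    * (Hypothetical helper, not part of the GooglePubSub API.)
    */
  def adaptiveDeadlineSeconds(
      latenciesSeconds: Seq[Int],
      percentile: Double = 0.99,
      initialSeconds: Int = 10): Int =
    if (latenciesSeconds.isEmpty) initialSeconds
    else {
      val sorted = latenciesSeconds.sorted
      // Nearest rank: position of the smallest sample that covers `percentile` of all samples.
      val rank = math.ceil(percentile * sorted.size).toInt.max(1).min(sorted.size)
      sorted(rank - 1).max(MinDeadlineSeconds).min(MaxDeadlineSeconds)
    }

  def main(args: Array[String]): Unit = {
    println(adaptiveDeadlineSeconds(Seq.fill(100)(2)))  // 10: fast handlers keep the minimum
    println(adaptiveDeadlineSeconds(Seq.fill(100)(45))) // 45: slow handlers raise the deadline
  }
}
```

    With uniformly fast handlers the result stays at the 10 second floor, which matches the "~10s" behaviour described in the first bullet.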

    Usage:

    val dist = AckDeadlineDistribution(initialDeadlineSeconds = 10)
    
    GooglePubSub.subscribe(request, 1.second, restartSettings)
      .via(GooglePubSub.autoExtendAckDeadlines(subscriptionFqrs, 3.seconds, dist))
      .mapAsync(4)(processMessage)
      .map(msg => AcknowledgeRequest(subscriptionFqrs, Seq(msg.ackId)))
      .to(GooglePubSub.acknowledge(parallelism = 1, dist))

    subscription

    the fully qualified subscription resource string

    extensionInterval

    how often to extend deadlines (should be less than the ack deadline)

    distribution

    shared distribution that tracks processing times and computes adaptive deadlines

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  12. def autoExtendAckDeadlines(subscription: String, extensionInterval: FiniteDuration, ackDeadlineSeconds: Int, maxAckExtensionPeriod: FiniteDuration): Flow[com.google.pubsub.v1.pubsub.ReceivedMessage, com.google.pubsub.v1.pubsub.ReceivedMessage, NotUsed]

    Create a flow that automatically extends ack deadlines for messages passing through it.

    A background timer periodically sends ModifyAckDeadline RPCs for all tracked messages, preventing Pub/Sub from redelivering them while they are being processed downstream. Messages are tracked from the moment they enter the flow until the stream completes, fails, or maxAckExtensionPeriod elapses. Messages are NOT removed from tracking upon acknowledgment; extending the deadline of an already-acknowledged message is a safe server-side no-op. For tracking that stops on acknowledgment, use the adaptive variant with AckDeadlineDistribution.

    If the background lease-extension ticker fails (e.g. due to a network error in the ModifyAckDeadline RPC), the main stream will be aborted immediately with an org.apache.pekko.stream.connectors.googlecloud.pubsub.grpc.AckDeadlineExtensionException, even if the stream is idle.

    subscription

    the fully qualified subscription resource string

    extensionInterval

    how often to extend deadlines (should be less than the ack deadline)

    ackDeadlineSeconds

    the new deadline to set on each extension

    maxAckExtensionPeriod

    maximum duration to keep extending a message's deadline (default 60 minutes)

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  13. def autoExtendAckDeadlines(subscription: String, extensionInterval: FiniteDuration, ackDeadlineSeconds: Int): Flow[com.google.pubsub.v1.pubsub.ReceivedMessage, com.google.pubsub.v1.pubsub.ReceivedMessage, NotUsed]

    Create a flow that automatically extends ack deadlines for messages passing through it.

    A background timer periodically sends ModifyAckDeadline RPCs for all tracked messages, preventing Pub/Sub from redelivering them while they are being processed downstream. Messages are tracked from the moment they enter the flow until the stream completes, fails, or maxAckExtensionPeriod elapses (default 60 minutes, matching Google's client library). Messages are NOT removed from tracking upon acknowledgment; extending the deadline of an already-acknowledged message is a safe server-side no-op. For tracking that stops on acknowledgment, use the adaptive variant with AckDeadlineDistribution.

    If the background lease-extension ticker fails (e.g. due to a network error in the ModifyAckDeadline RPC), the main stream will be aborted immediately with an org.apache.pekko.stream.connectors.googlecloud.pubsub.grpc.AckDeadlineExtensionException, even if the stream is idle.

    Usage:

    GooglePubSub.subscribe(request, 1.second)
      .via(GooglePubSub.autoExtendAckDeadlines(subscriptionFqrs, 8.seconds, 30))
      .mapAsync(4)(processMessage)
      .map(msg => AcknowledgeRequest(subscriptionFqrs, Seq(msg.ackId)))
      .to(GooglePubSub.acknowledge(parallelism = 1))

    subscription

    the fully qualified subscription resource string

    extensionInterval

    how often to extend deadlines (should be less than the ack deadline)

    ackDeadlineSeconds

    the new deadline to set on each extension

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  14. def clone(): AnyRef
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.CloneNotSupportedException]) @IntrinsicCandidate() @native()
  15. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  16. def equals(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef → Any
  17. def flowControlGate(flowControl: FlowControl): Flow[com.google.pubsub.v1.pubsub.ReceivedMessage, com.google.pubsub.v1.pubsub.ReceivedMessage, NotUsed]

    Create a flow that gates messages based on FlowControl, applying backpressure when the number of outstanding (unacknowledged) messages reaches the limit.

    Pair with a FlowControl-aware acknowledge or nack operator to release permits.
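
    The permit accounting behind this pairing can be pictured with a standalone sketch. It only models the idea (the gate takes a permit per message, the acknowledge or nack side returns it); the `Permits` class is a hypothetical stand-in and says nothing about how FlowControl is actually implemented. The real flow applies stream backpressure rather than returning `false`.

```scala
import java.util.concurrent.Semaphore

object PermitGateSketch {
  /** Hypothetical stand-in for shared flow-control state. */
  final class Permits(maxOutstanding: Int) {
    private val semaphore = new Semaphore(maxOutstanding)

    /** Gate side: take a permit if one is free; `false` models backpressure. */
    def tryAdmit(): Boolean = semaphore.tryAcquire()

    /** Ack/nack side: hand the permit back so another message may pass. */
    def release(): Unit = semaphore.release()

    /** Number of messages that may still be admitted. */
    def available: Int = semaphore.availablePermits()
  }

  def main(args: Array[String]): Unit = {
    val permits = new Permits(maxOutstanding = 2)
    println(permits.tryAdmit()) // true
    println(permits.tryAdmit()) // true
    println(permits.tryAdmit()) // false: limit of 2 outstanding messages reached
    permits.release()           // one message was acknowledged
    println(permits.tryAdmit()) // true
  }
}
```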

    flowControl

    shared flow control instance

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  18. final def getClass(): Class[_ <: AnyRef]
    Definition Classes
    AnyRef → Any
    Annotations
    @IntrinsicCandidate() @native()
  19. def hashCode(): Int
    Definition Classes
    AnyRef → Any
    Annotations
    @IntrinsicCandidate() @native()
  20. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  21. def modifyAckDeadline(parallelism: Int): Sink[ModifyAckDeadlineRequest, Future[Done]]

    Create a sink that accepts ack deadline modifications.

    This can be used to extend the acknowledgement deadline for messages that require longer processing times, preventing Pub/Sub from redelivering them.

    The materialized value completes on stream completion.

    parallelism

    controls how many modifications can be in-flight at any given time

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  22. def modifyAckDeadlineDynamic(subscription: String, parallelism: Int = 1)(deadlineFn: (com.google.pubsub.v1.pubsub.ReceivedMessage) => Int): Flow[com.google.pubsub.v1.pubsub.ReceivedMessage, com.google.pubsub.v1.pubsub.ReceivedMessage, NotUsed]

    Create a flow that modifies the ack deadline for each message using a dynamic function.

    This allows per-message deadline control — for example, setting longer deadlines for messages that are expected to take longer to process.

    Returning 0 from the function is equivalent to a nack (immediate redelivery).
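
    A minimal sketch of a per-message deadline function follows. The `priority` attribute, its values, the subscription name and the `scaladsl` import path for GooglePubSub are assumptions made for illustration; only the `modifyAckDeadlineDynamic` signature is taken from this page.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Flow
// Assumed package of the operator factory documented on this page.
import org.apache.pekko.stream.connectors.googlecloud.pubsub.grpc.scaladsl.GooglePubSub
import com.google.pubsub.v1.pubsub.ReceivedMessage

object DynamicDeadlineSketch {
  // Hypothetical subscription name.
  val subscriptionFqrs = "projects/my-project/subscriptions/my-subscription"

  /** Hypothetical rule: choose the deadline from a `priority` attribute. */
  def deadlineFor(msg: ReceivedMessage): Int = {
    val attributes = msg.message.map(_.attributes).getOrElse(Map.empty[String, String])
    attributes.get("priority") match {
      case Some("skip") => 0   // equivalent to a nack: redeliver immediately
      case Some("bulk") => 120 // long-running work gets a longer deadline
      case _            => 30  // everything else
    }
  }

  /** Flow that sets each message's deadline and passes the message on. */
  def extendPerMessage: Flow[ReceivedMessage, ReceivedMessage, NotUsed] =
    GooglePubSub.modifyAckDeadlineDynamic(subscriptionFqrs, parallelism = 2)(deadlineFor)
}
```

    Keeping the rule in a named function makes it easy to unit test without a Pub/Sub connection.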

    subscription

    the fully qualified subscription resource string

    parallelism

    controls how many modifications can be in-flight at any given time

    deadlineFn

    function that computes the ack deadline in seconds for each message

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  23. def modifyAckDeadlineFlow(): Flow[ModifyAckDeadlineRequest, ModifyAckDeadlineRequest, NotUsed]

    Create a flow that accepts ack deadline modifications.

    This can be used to extend the acknowledgement deadline for messages that require longer processing times, preventing Pub/Sub from redelivering them.

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  24. def nack(parallelism: Int, distribution: AckDeadlineDistribution): Sink[com.google.pubsub.v1.pubsub.AcknowledgeRequest, Future[Done]]

    Create a sink that nacks messages and records processing latencies in the AckDeadlineDistribution for adaptive deadline computation.

    parallelism

    controls how many nacks can be in-flight at any given time

    distribution

    shared distribution for adaptive deadline tracking

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  25. def nack(parallelism: Int, flowControl: FlowControl): Sink[com.google.pubsub.v1.pubsub.AcknowledgeRequest, Future[Done]]

    Create a sink that nacks messages and releases FlowControl permits.

    parallelism

    controls how many nacks can be in-flight at any given time

    flowControl

    shared flow control instance

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  26. def nack(parallelism: Int): Sink[com.google.pubsub.v1.pubsub.AcknowledgeRequest, Future[Done]]

    Create a sink that nacks messages by setting the ack deadline to 0, causing Pub/Sub to immediately redeliver them.

    The materialized value completes on stream completion.

    parallelism

    controls how many nacks can be in-flight at any given time

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  27. def nackFlow(distribution: AckDeadlineDistribution): Flow[com.google.pubsub.v1.pubsub.AcknowledgeRequest, com.google.pubsub.v1.pubsub.AcknowledgeRequest, NotUsed]

    Create a flow that nacks messages and records processing latencies in the AckDeadlineDistribution for adaptive deadline computation.

    distribution

    shared distribution for adaptive deadline tracking

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  28. def nackFlow(flowControl: FlowControl): Flow[com.google.pubsub.v1.pubsub.AcknowledgeRequest, com.google.pubsub.v1.pubsub.AcknowledgeRequest, NotUsed]

    Create a flow that nacks messages and releases FlowControl permits.

    flowControl

    shared flow control instance

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  29. def nackFlow(): Flow[com.google.pubsub.v1.pubsub.AcknowledgeRequest, com.google.pubsub.v1.pubsub.AcknowledgeRequest, NotUsed]

    Create a flow that nacks messages by setting the ack deadline to 0, causing Pub/Sub to immediately redeliver them.

    Accepts AcknowledgeRequest for consistency with the acknowledge API — the subscription and ack IDs are reused to build the underlying ModifyAckDeadlineRequest with ackDeadlineSeconds = 0.
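
    The conversion described above amounts to roughly the following mapping. This is a sketch of the idea, not the connector's code; `toNackRequest` is a hypothetical helper name, and the field names are those of the ScalaPB-generated Pub/Sub messages.

```scala
import com.google.pubsub.v1.pubsub.{AcknowledgeRequest, ModifyAckDeadlineRequest}

object NackMappingSketch {
  /** Reuse the subscription and ack IDs, and set the deadline to 0 so that
    * Pub/Sub redelivers the messages immediately. (Hypothetical helper.)
    */
  def toNackRequest(ack: AcknowledgeRequest): ModifyAckDeadlineRequest =
    ModifyAckDeadlineRequest(
      subscription = ack.subscription,
      ackIds = ack.ackIds,
      ackDeadlineSeconds = 0)
}
```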

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  30. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  31. final def notify(): Unit
    Definition Classes
    AnyRef
    Annotations
    @IntrinsicCandidate() @native()
  32. final def notifyAll(): Unit
    Definition Classes
    AnyRef
    Annotations
    @IntrinsicCandidate() @native()
  33. def publish(parallelism: Int): Flow[com.google.pubsub.v1.pubsub.PublishRequest, com.google.pubsub.v1.pubsub.PublishResponse, NotUsed]

    Create a flow to publish messages to Google Cloud Pub/Sub. The flow emits responses that contain published message ids.
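
    A minimal publishing sketch, assuming a hypothetical topic name and the `scaladsl` import path for GooglePubSub. It wraps each payload in a PublishRequest, sends it through the flow, and collects the message ids from the responses.

```scala
import scala.concurrent.Future
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
// Assumed package of the operator factory documented on this page.
import org.apache.pekko.stream.connectors.googlecloud.pubsub.grpc.scaladsl.GooglePubSub
import com.google.protobuf.ByteString
import com.google.pubsub.v1.pubsub.{ PublishRequest, PubsubMessage }

object PublishSketch {
  // Hypothetical topic name.
  val topicFqrs = "projects/my-project/topics/my-topic"

  /** Wrap one UTF-8 payload in a single-message publish request. */
  def requestFor(payload: String): PublishRequest =
    PublishRequest(
      topic = topicFqrs,
      messages = Seq(PubsubMessage(data = ByteString.copyFromUtf8(payload))))

  /** Publish all payloads with up to 4 requests in flight and
    * return the ids that Pub/Sub assigned to the messages.
    */
  def publishAll(payloads: Seq[String])(implicit system: ActorSystem): Future[Seq[String]] =
    Source(payloads.toList)
      .map(requestFor)
      .via(GooglePubSub.publish(parallelism = 4))
      .mapConcat(_.messageIds.toList)
      .runWith(Sink.seq)
}
```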

    parallelism

    controls how many messages can be in-flight at any given time

  34. def subscribe(request: StreamingPullRequest, pollInterval: FiniteDuration, restartSettings: RestartSettings): Source[com.google.pubsub.v1.pubsub.ReceivedMessage, NotUsed]

    Create a source that emits messages for a given subscription using a StreamingPullRequest.

    Automatically reconnects when the server closes the StreamingPull connection (due to idle timeout, rebalancing, etc.), matching the behavior of Google's official client library. All errors trigger reconnection with exponential backoff, including non-retryable errors such as PERMISSION_DENIED or NOT_FOUND. Use RestartSettings.withMaxRestarts to bound the total number of restart attempts; when exhausted the stream fails with the last error.
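
    Because every error triggers a reconnect, bounding the restarts is usually worth doing. The sketch below shows one way to build the arguments; the subscription name, the backoff values, the restart limit and the `scaladsl` import path are assumptions made for illustration.

```scala
import scala.concurrent.duration._
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.RestartSettings
import org.apache.pekko.stream.scaladsl.Source
// Assumed package of the operator factory documented on this page.
import org.apache.pekko.stream.connectors.googlecloud.pubsub.grpc.scaladsl.GooglePubSub
import com.google.pubsub.v1.pubsub.{ ReceivedMessage, StreamingPullRequest }

object SubscribeWithRestartSketch {
  // Hypothetical subscription name.
  val subscriptionFqrs = "projects/my-project/subscriptions/my-subscription"

  // Both the subscription and the stream ack deadline are mandatory.
  val request: StreamingPullRequest = StreamingPullRequest()
    .withSubscription(subscriptionFqrs)
    .withStreamAckDeadlineSeconds(10)

  // Back off from 1s up to 30s with 20% jitter, and give up after
  // 10 restarts within 5 minutes so that a misconfiguration such as
  // NOT_FOUND eventually fails the stream instead of retrying forever.
  val restartSettings: RestartSettings =
    RestartSettings(minBackoff = 1.second, maxBackoff = 30.seconds, randomFactor = 0.2)
      .withMaxRestarts(10, 5.minutes)

  /** Source that reconnects according to `restartSettings`. */
  def messages: Source[ReceivedMessage, NotUsed] =
    GooglePubSub.subscribe(request, 1.second, restartSettings)
}
```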

    request

    the subscription FQRS and ack deadline fields are mandatory for the request

    pollInterval

    the interval at which StreamingPullRequest messages are sent

    restartSettings

    settings for the exponential backoff reconnection behavior

    Annotations
    @ApiMayChange()
    Since

    2.0.0

  35. def subscribe(request: StreamingPullRequest, pollInterval: FiniteDuration): Source[com.google.pubsub.v1.pubsub.ReceivedMessage, Future[Cancellable]]

    Create a source that emits messages for a given subscription using a StreamingPullRequest.

    The materialized value can be used to cancel the source.

    request

    the subscription FQRS and ack deadline fields are mandatory for the request

    pollInterval

    the interval at which StreamingPullRequest messages are sent

  36. def subscribePolling(request: com.google.pubsub.v1.pubsub.PullRequest, pollInterval: FiniteDuration): Source[com.google.pubsub.v1.pubsub.ReceivedMessage, Future[Cancellable]]

    Create a source that emits messages for a given subscription using a synchronous PullRequest.

    The materialized value can be used to cancel the source.
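
    A sketch of starting and later cancelling a polling source, assuming a hypothetical subscription name and the `scaladsl` import path for GooglePubSub. The messages are only printed here and never acknowledged, so Pub/Sub would redeliver them; a real pipeline would end in one of the acknowledge operators.

```scala
import scala.concurrent.Future
import scala.concurrent.duration._
import org.apache.pekko.actor.{ ActorSystem, Cancellable }
import org.apache.pekko.stream.scaladsl.Sink
// Assumed package of the operator factory documented on this page.
import org.apache.pekko.stream.connectors.googlecloud.pubsub.grpc.scaladsl.GooglePubSub
import com.google.pubsub.v1.pubsub.PullRequest

object SubscribePollingSketch {
  // Hypothetical subscription name.
  val subscriptionFqrs = "projects/my-project/subscriptions/my-subscription"

  // Ask for at most 10 messages per pull.
  val request: PullRequest = PullRequest()
    .withSubscription(subscriptionFqrs)
    .withMaxMessages(10)

  /** Start polling once per second; `to` keeps the source's materialized
    * value, so the result is the handle that stops the polling.
    */
  def start()(implicit system: ActorSystem): Future[Cancellable] =
    GooglePubSub
      .subscribePolling(request, 1.second)
      .to(Sink.foreach(msg => println(msg.ackId)))
      .run()

  /** Cancel the polling source once its handle is available. */
  def stop(handle: Future[Cancellable])(implicit system: ActorSystem): Unit =
    handle.foreach(_.cancel())(system.dispatcher)
}
```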

    request

    the subscription FQRS field is mandatory for the request

    pollInterval

    the interval at which PullRequest messages are sent

  37. final def synchronized[T0](arg0: => T0): T0
    Definition Classes
    AnyRef
  38. def toString(): String
    Definition Classes
    AnyRef → Any
  39. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])
  40. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException]) @native()
  41. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])

Deprecated Value Members

  1. def finalize(): Unit
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.Throwable]) @Deprecated
    Deprecated

    (Since version 9)
