object GooglePubSub
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- def acknowledge(parallelism: Int, distribution: AckDeadlineDistribution): Sink[com.google.pubsub.v1.pubsub.AcknowledgeRequest, Future[Done]]
Create a sink that acknowledges messages and records processing latencies in the AckDeadlineDistribution for adaptive deadline computation.
- parallelism
controls how many acknowledgements can be in-flight at any given time
- distribution
shared distribution for adaptive deadline tracking
- Annotations
- @ApiMayChange()
- Since
2.0.0
- def acknowledge(parallelism: Int, flowControl: FlowControl): Sink[com.google.pubsub.v1.pubsub.AcknowledgeRequest, Future[Done]]
Create a sink that acknowledges messages and releases FlowControl permits.
- parallelism
controls how many acknowledgements can be in-flight at any given time
- flowControl
shared flow control instance
- Annotations
- @ApiMayChange()
- Since
2.0.0
- def acknowledge(parallelism: Int): Sink[com.google.pubsub.v1.pubsub.AcknowledgeRequest, Future[Done]]
Create a sink that accepts consumed message acknowledgements.
The materialized value completes on stream completion.
- parallelism
controls how many acknowledgements can be in-flight at any given time
- def acknowledgeFlow(distribution: AckDeadlineDistribution): Flow[com.google.pubsub.v1.pubsub.AcknowledgeRequest, com.google.pubsub.v1.pubsub.AcknowledgeRequest, NotUsed]
Create a flow that acknowledges messages and records processing latencies in the AckDeadlineDistribution for adaptive deadline computation.
- distribution
shared distribution for adaptive deadline tracking
- Annotations
- @ApiMayChange()
- Since
2.0.0
- def acknowledgeFlow(flowControl: FlowControl): Flow[com.google.pubsub.v1.pubsub.AcknowledgeRequest, com.google.pubsub.v1.pubsub.AcknowledgeRequest, NotUsed]
Create a flow that acknowledges messages and releases FlowControl permits.
- flowControl
shared flow control instance
- Annotations
- @ApiMayChange()
- Since
2.0.0
- def acknowledgeFlow(): Flow[com.google.pubsub.v1.pubsub.AcknowledgeRequest, com.google.pubsub.v1.pubsub.AcknowledgeRequest, NotUsed]
Create a flow that accepts consumed message acknowledgements.
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def autoExtendAckDeadlines(subscription: String, extensionInterval: FiniteDuration, distribution: AckDeadlineDistribution): Flow[com.google.pubsub.v1.pubsub.ReceivedMessage, com.google.pubsub.v1.pubsub.ReceivedMessage, NotUsed]
Create a flow that automatically extends ack deadlines using an adaptive deadline based on observed processing times, matching Google's official client library behavior.
Instead of a fixed deadline, the extension deadline is computed from the 99th percentile (configurable) of actual ack latencies. This means:
- If messages typically process in 2s, deadlines stay short (~10s) for fast redelivery on failure
- If messages take 45s, deadlines adapt upward to avoid unnecessary redelivery
The same AckDeadlineDistribution must be passed to the acknowledge/nack operators so that processing latencies are recorded.
Usage:
val dist = AckDeadlineDistribution(initialDeadlineSeconds = 10)
GooglePubSub.subscribe(request, 1.second, restartSettings)
  .via(GooglePubSub.autoExtendAckDeadlines(subscriptionFqrs, 3.seconds, dist))
  .mapAsync(4)(processMessage)
  .map(msg => AcknowledgeRequest(subscriptionFqrs, Seq(msg.ackId)))
  .to(GooglePubSub.acknowledge(parallelism = 1, dist))
- subscription
the fully qualified subscription resource string
- extensionInterval
how often to extend deadlines (should be less than the ack deadline)
- distribution
shared distribution that tracks processing times and computes adaptive deadlines
- Annotations
- @ApiMayChange()
- Since
2.0.0
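The percentile-based deadline described above can be illustrated in plain Scala. This is a minimal sketch, not the connector's AckDeadlineDistribution: the helper name `adaptiveDeadlineSeconds`, the nearest-rank percentile method, and the clamp to Pub/Sub's 10 to 600 second ack deadline range are all assumptions made for illustration.

```scala
object AdaptiveDeadlineSketch {
  // Hypothetical helper: NOT the connector's AckDeadlineDistribution.
  // Picks the given percentile of observed ack latencies (in seconds) using the
  // nearest-rank method, then clamps it to the 10..600 second range.
  def adaptiveDeadlineSeconds(
      latenciesSeconds: Seq[Int],
      percentile: Double = 99.0,
      initialDeadlineSeconds: Int = 10): Int =
    if (latenciesSeconds.isEmpty) initialDeadlineSeconds // nothing observed yet
    else {
      val sorted = latenciesSeconds.sorted
      // Nearest rank: position of the smallest sample that covers `percentile` % of all samples.
      val rank = math.ceil(percentile / 100.0 * sorted.size).toInt
      val index = math.min(math.max(rank, 1), sorted.size) - 1
      math.min(600, math.max(10, sorted(index)))
    }
}
```

With this model, a workload that always acks within 2 seconds stays at the 10 second floor, while one that takes 45 seconds moves the deadline up to 45.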
- def autoExtendAckDeadlines(subscription: String, extensionInterval: FiniteDuration, ackDeadlineSeconds: Int, maxAckExtensionPeriod: FiniteDuration): Flow[com.google.pubsub.v1.pubsub.ReceivedMessage, com.google.pubsub.v1.pubsub.ReceivedMessage, NotUsed]
Create a flow that automatically extends ack deadlines for messages passing through it.
A background timer periodically sends ModifyAckDeadline RPCs for all tracked messages, preventing Pub/Sub from redelivering them while they are being processed downstream. Messages are tracked from the moment they enter the flow until the stream completes, fails, or maxAckExtensionPeriod elapses. Messages are NOT removed from tracking upon acknowledgment; extending the deadline of an already-acknowledged message is a safe server-side no-op. For tracking that stops on acknowledgment, use the adaptive variant with AckDeadlineDistribution.
If the background lease-extension ticker fails (e.g. due to a network error in the ModifyAckDeadline RPC), the main stream will be aborted immediately with an org.apache.pekko.stream.connectors.googlecloud.pubsub.grpc.AckDeadlineExtensionException, even if the stream is idle.
- subscription
the fully qualified subscription resource string
- extensionInterval
how often to extend deadlines (should be less than the ack deadline)
- ackDeadlineSeconds
the new deadline to set on each extension
- maxAckExtensionPeriod
maximum duration to keep extending a message's deadline (default 60 minutes)
- Annotations
- @ApiMayChange()
- Since
2.0.0
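The tracking rule described for this variant (every message that entered the flow keeps being extended until maxAckExtensionPeriod elapses, whether or not it was acknowledged) can be modelled with a small pure function. This is a conceptual sketch only: `ackIdsToExtend`, the map of entry timestamps, and the strict `<` comparison at the boundary are assumptions, not the connector's internals.

```scala
import scala.concurrent.duration._

object LeaseTrackerSketch {
  // Hypothetical model of the tracking rule; not the connector's code.
  // `tracked` maps each ack ID to the time (epoch millis) at which it entered the flow.
  // Returns the ack IDs that a timer tick at `nowMillis` would still extend.
  def ackIdsToExtend(
      tracked: Map[String, Long],
      nowMillis: Long,
      maxAckExtensionPeriod: FiniteDuration): Set[String] =
    tracked.collect {
      case (ackId, enteredAtMillis)
          if nowMillis - enteredAtMillis < maxAckExtensionPeriod.toMillis =>
        ackId
    }.toSet
}
```

A message that entered 60 minutes ago drops out of the set when the period is 60 minutes, while one that entered a few minutes ago is still extended.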
- def autoExtendAckDeadlines(subscription: String, extensionInterval: FiniteDuration, ackDeadlineSeconds: Int): Flow[com.google.pubsub.v1.pubsub.ReceivedMessage, com.google.pubsub.v1.pubsub.ReceivedMessage, NotUsed]
Create a flow that automatically extends ack deadlines for messages passing through it.
A background timer periodically sends ModifyAckDeadline RPCs for all tracked messages, preventing Pub/Sub from redelivering them while they are being processed downstream. Messages are tracked from the moment they enter the flow until the stream completes, fails, or maxAckExtensionPeriod elapses (default 60 minutes, matching Google's client library). Messages are NOT removed from tracking upon acknowledgment; extending the deadline of an already-acknowledged message is a safe server-side no-op. For tracking that stops on acknowledgment, use the adaptive variant with AckDeadlineDistribution.
If the background lease-extension ticker fails (e.g. due to a network error in the ModifyAckDeadline RPC), the main stream will be aborted immediately with an org.apache.pekko.stream.connectors.googlecloud.pubsub.grpc.AckDeadlineExtensionException, even if the stream is idle.
Usage:
GooglePubSub.subscribe(request, 1.second)
  .via(GooglePubSub.autoExtendAckDeadlines(subscriptionFqrs, 8.seconds, 30))
  .mapAsync(4)(processMessage)
  .map(msg => AcknowledgeRequest(subscriptionFqrs, Seq(msg.ackId)))
  .to(GooglePubSub.acknowledge(parallelism = 1))
- subscription
the fully qualified subscription resource string
- extensionInterval
how often to extend deadlines (should be less than the ack deadline)
- ackDeadlineSeconds
the new deadline to set on each extension
- Annotations
- @ApiMayChange()
- Since
2.0.0
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @IntrinsicCandidate() @native()
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def flowControlGate(flowControl: FlowControl): Flow[com.google.pubsub.v1.pubsub.ReceivedMessage, com.google.pubsub.v1.pubsub.ReceivedMessage, NotUsed]
Create a flow that gates messages based on FlowControl, applying backpressure when the number of outstanding (unacknowledged) messages reaches the limit.
Pair with a FlowControl-aware acknowledge or nack operator to release permits.
- flowControl
shared flow control instance
- Annotations
- @ApiMayChange()
- Since
2.0.0
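The pairing between the gate and a FlowControl-aware acknowledge or nack operator amounts to acquiring a permit when a message passes the gate and releasing it when the message is settled. The sketch below models that idea with a plain counter; `Permits` is a hypothetical stand-in and says nothing about how the connector's FlowControl is actually built or configured.

```scala
object FlowControlSketch {
  // Hypothetical stand-in for a shared flow control instance: a bounded permit counter.
  final class Permits(maxOutstanding: Int) {
    private var outstanding = 0

    // Called where a message would pass the gate; false means "apply backpressure".
    def tryAcquire(): Boolean = synchronized {
      if (outstanding < maxOutstanding) { outstanding += 1; true }
      else false
    }

    // Called where a message is acknowledged or nacked, freeing one slot.
    def release(): Unit = synchronized {
      if (outstanding > 0) outstanding -= 1
    }

    def inFlight: Int = synchronized(outstanding)
  }
}
```

If nothing downstream ever releases permits, the gate stops admitting messages once the limit is reached, which is why the same instance has to be shared between the gate and the acknowledge or nack operator.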
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @IntrinsicCandidate() @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @IntrinsicCandidate() @native()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- def modifyAckDeadline(parallelism: Int): Sink[ModifyAckDeadlineRequest, Future[Done]]
Create a sink that accepts ack deadline modifications.
This can be used to extend the acknowledgement deadline for messages that require longer processing times, preventing Pub/Sub from redelivering them.
The materialized value completes on stream completion.
- parallelism
controls how many modifications can be in-flight at any given time
- Annotations
- @ApiMayChange()
- Since
2.0.0
- def modifyAckDeadlineDynamic(subscription: String, parallelism: Int = 1)(deadlineFn: (com.google.pubsub.v1.pubsub.ReceivedMessage) => Int): Flow[com.google.pubsub.v1.pubsub.ReceivedMessage, com.google.pubsub.v1.pubsub.ReceivedMessage, NotUsed]
Create a flow that modifies the ack deadline for each message using a dynamic function.
This allows per-message deadline control — for example, setting longer deadlines for messages that are expected to take longer to process.
Returning 0 from the function is equivalent to a nack (immediate redelivery).
- subscription
the fully qualified subscription resource string
- parallelism
controls how many modifications can be in-flight at any given time
- deadlineFn
function that computes the ack deadline in seconds for each message
- Annotations
- @ApiMayChange()
- Since
2.0.0
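A per-message deadline rule of the kind deadlineFn expects might look like the following. This is a sketch under stated assumptions: the `job-type` attribute, its values, and the chosen deadlines are invented for illustration, and the function works on a plain attribute map so that it runs without the connector on the classpath.

```scala
object DeadlineFnSketch {
  // Hypothetical rule set keyed on a made-up "job-type" message attribute.
  // Returns the ack deadline in seconds; 0 asks Pub/Sub to redeliver immediately.
  def deadlineSecondsFor(attributes: Map[String, String]): Int =
    attributes.get("job-type") match {
      case Some("video-transcode") => 300 // long-running work gets a long deadline
      case Some("poison")          => 0   // equivalent to a nack
      case _                       => 30  // default for everything else
    }
}
```

Assuming the generated ReceivedMessage exposes its payload as an optional `message` with an `attributes` map, the rule could be passed as `msg => DeadlineFnSketch.deadlineSecondsFor(msg.message.map(_.attributes).getOrElse(Map.empty))` in the second parameter list of modifyAckDeadlineDynamic.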
- def modifyAckDeadlineFlow(): Flow[ModifyAckDeadlineRequest, ModifyAckDeadlineRequest, NotUsed]
Create a flow that accepts ack deadline modifications.
This can be used to extend the acknowledgement deadline for messages that require longer processing times, preventing Pub/Sub from redelivering them.
- Annotations
- @ApiMayChange()
- Since
2.0.0
- def nack(parallelism: Int, distribution: AckDeadlineDistribution): Sink[com.google.pubsub.v1.pubsub.AcknowledgeRequest, Future[Done]]
Create a sink that nacks messages and records processing latencies in the AckDeadlineDistribution for adaptive deadline computation.
- parallelism
controls how many nacks can be in-flight at any given time
- distribution
shared distribution for adaptive deadline tracking
- Annotations
- @ApiMayChange()
- Since
2.0.0
- def nack(parallelism: Int, flowControl: FlowControl): Sink[com.google.pubsub.v1.pubsub.AcknowledgeRequest, Future[Done]]
Create a sink that nacks messages and releases FlowControl permits.
- parallelism
controls how many nacks can be in-flight at any given time
- flowControl
shared flow control instance
- Annotations
- @ApiMayChange()
- Since
2.0.0
- def nack(parallelism: Int): Sink[com.google.pubsub.v1.pubsub.AcknowledgeRequest, Future[Done]]
Create a sink that nacks messages by setting the ack deadline to 0, causing Pub/Sub to immediately redeliver them.
The materialized value completes on stream completion.
- parallelism
controls how many nacks can be in-flight at any given time
- Annotations
- @ApiMayChange()
- Since
2.0.0
- def nackFlow(distribution: AckDeadlineDistribution): Flow[com.google.pubsub.v1.pubsub.AcknowledgeRequest, com.google.pubsub.v1.pubsub.AcknowledgeRequest, NotUsed]
Create a flow that nacks messages and records processing latencies in the AckDeadlineDistribution for adaptive deadline computation.
- distribution
shared distribution for adaptive deadline tracking
- Annotations
- @ApiMayChange()
- Since
2.0.0
- def nackFlow(flowControl: FlowControl): Flow[com.google.pubsub.v1.pubsub.AcknowledgeRequest, com.google.pubsub.v1.pubsub.AcknowledgeRequest, NotUsed]
Create a flow that nacks messages and releases FlowControl permits.
- flowControl
shared flow control instance
- Annotations
- @ApiMayChange()
- Since
2.0.0
- def nackFlow(): Flow[com.google.pubsub.v1.pubsub.AcknowledgeRequest, com.google.pubsub.v1.pubsub.AcknowledgeRequest, NotUsed]
Create a flow that nacks messages by setting the ack deadline to 0, causing Pub/Sub to immediately redeliver them.
Accepts AcknowledgeRequest for consistency with the acknowledge API: the subscription and ack IDs are reused to build the underlying ModifyAckDeadlineRequest with ackDeadlineSeconds = 0.
- Annotations
- @ApiMayChange()
- Since
2.0.0
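The translation from an acknowledge-shaped request to a nack can be sketched as a simple mapping. The case classes below are stand-ins that mirror only the fields named in the description; the real types are the generated com.google.pubsub.v1.pubsub messages, and `toNack` is a hypothetical helper rather than a connector method.

```scala
object NackSketch {
  // Stand-in types carrying only the fields mentioned in the description above.
  final case class AckRequest(subscription: String, ackIds: Seq[String])
  final case class ModifyDeadlineRequest(
      subscription: String,
      ackIds: Seq[String],
      ackDeadlineSeconds: Int)

  // A nack reuses the subscription and ack IDs and sets the deadline to 0 seconds.
  def toNack(request: AckRequest): ModifyDeadlineRequest =
    ModifyDeadlineRequest(request.subscription, request.ackIds, ackDeadlineSeconds = 0)
}
```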
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @IntrinsicCandidate() @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @IntrinsicCandidate() @native()
- def publish(parallelism: Int): Flow[com.google.pubsub.v1.pubsub.PublishRequest, com.google.pubsub.v1.pubsub.PublishResponse, NotUsed]
Create a flow to publish messages to Google Cloud Pub/Sub.
The flow emits responses that contain the published message IDs.
- parallelism
controls how many messages can be in-flight at any given time
- def subscribe(request: StreamingPullRequest, pollInterval: FiniteDuration, restartSettings: RestartSettings): Source[com.google.pubsub.v1.pubsub.ReceivedMessage, NotUsed]
Create a source that emits messages for a given subscription using a StreamingPullRequest.
Automatically reconnects when the server closes the StreamingPull connection (due to idle timeout, rebalancing, etc.), matching the behavior of Google's official client library. All errors trigger reconnection with exponential backoff, including non-retryable errors such as PERMISSION_DENIED or NOT_FOUND. Use RestartSettings.withMaxRestarts to bound the total number of restart attempts; when exhausted the stream fails with the last error.
- request
the subscription FQRS and ack deadline fields are mandatory for the request
- pollInterval
interval between successive StreamingPullRequest messages
- restartSettings
settings for the exponential backoff reconnection behavior
- Annotations
- @ApiMayChange()
- Since
2.0.0
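To get a feel for how quickly reconnection attempts space out, the exponential backoff can be approximated as a doubling delay capped at a maximum. The sketch below assumes the usual doubling scheme and deliberately ignores the random jitter factor; `delayBeforeRestart` is a hypothetical helper, not part of RestartSettings.

```scala
import scala.concurrent.duration._

object BackoffSketch {
  // Delay before the n-th restart (n = 0, 1, 2, ...): minBackoff doubled n times,
  // capped at maxBackoff. Jitter is left out to keep the result deterministic.
  def delayBeforeRestart(
      n: Int,
      minBackoff: FiniteDuration,
      maxBackoff: FiniteDuration): FiniteDuration = {
    val uncappedMillis = minBackoff.toMillis.toDouble * math.pow(2, n.toDouble)
    if (uncappedMillis >= maxBackoff.toMillis.toDouble) maxBackoff
    else uncappedMillis.toLong.millis
  }
}
```

Because every error triggers a restart, including PERMISSION_DENIED and NOT_FOUND, a misconfigured subscription would keep retrying at the capped delay indefinitely unless a restart limit is set with RestartSettings.withMaxRestarts.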
- def subscribe(request: StreamingPullRequest, pollInterval: FiniteDuration): Source[com.google.pubsub.v1.pubsub.ReceivedMessage, Future[Cancellable]]
Create a source that emits messages for a given subscription using a StreamingPullRequest.
The materialized value can be used to cancel the source.
- request
the subscription FQRS and ack deadline fields are mandatory for the request
- pollInterval
interval between successive StreamingPullRequest messages
- def subscribePolling(request: com.google.pubsub.v1.pubsub.PullRequest, pollInterval: FiniteDuration): Source[com.google.pubsub.v1.pubsub.ReceivedMessage, Future[Cancellable]]
Create a source that emits messages for a given subscription using a synchronous PullRequest.
The materialized value can be used to cancel the source.
- request
the subscription FQRS field is mandatory for the request
- pollInterval
interval between successive PullRequest messages
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])