Google Pub/Sub Connector 1.0 Reference

Configurations


Configuration

Parameters

Name Type Description Default Value Required

Name

String

Name for this configuration. Connectors reference the configuration with this name.

x

Connection

Connection types for this configuration.

x

Default Project Id

String

Globally unique identifier for your project. Used in operations. It can be overridden at the config level.

Default Subscription Name

String

Default subscription for your project. Used in operations. It can be overridden at the config level.

Default Topic Name

String

Default topic for your project. Used in operations. It can be overridden at the config level.

Name

String

ID used to reference this configuration.

x

Expiration Policy

Configures the minimum amount of time that a dynamic configuration instance can remain idle before Mule considers it eligible for expiration.

Connection Types

PubSub Connection
Parameters
Name Type Description Default Value Required

Private key ID

String

ID of the private key for the service account. It is a part of the service-credentials created in PubSub.

x

Client ID

String

ID of the client for the service account. It is a part of the service-credentials created in PubSub.

x

Client email

String

Client email for the service account. It is a part of the service-credentials created in PubSub.

x

Private key

String

Private key for the service account. It is a part of the service-credentials created in PubSub.

x

Message Count Batch Size

Number

Maximum number of messages in the batch. After the maximum number of messages is reached, the elements are wrapped up in a batch and sent.

10

Request Size Threshold

Number

Maximum memory size of the messages in the batch. After the maximum memory size is reached, the elements are wrapped up in a batch and sent.

1

Request Size Threshold Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Data unit for the Request Size Threshold field.

KB

Publish Delay Threshold

Number

Maximum delay threshold to use for batching. After the maximum amount of time has elapsed (counting from the first element added), the elements are wrapped up in a batch and sent. This value must not be set too high, especially for the MILLISECONDS time unit. Otherwise, calls might appear to never complete.

10

Publish Delay Threshold Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Publish Delay Threshold field.

MILLISECONDS

Enable batching

Boolean

Flag for enabling or disabling publish request batching.

false

Limit Exceeded Behavior

Enumeration, one of:

  • THROW_EXCEPTION

  • BLOCK

  • IGNORE

If not set to IGNORE, flow control enables you to control the publishing behavior by specifying limits for Max Outstanding Request Size and Max Outstanding Element Count.

  • THROW_EXCEPTION

    If the memory size or the amount of outstanding elements (messages where no Ack() or Nack() was performed) exceeds the specified limits, the publisher throws a FlowControlException.

  • BLOCK

    If the memory size or the amount of outstanding elements (messages where no Ack() or Nack() was performed) exceeds the specified limits, the publisher does not publish more messages.

  • IGNORE

    The publisher does not account for specified limits and does not control the message publishing rate.

Max Outstanding Request Size

Number

Maximum amount of memory accumulated by the request before enforcing flow control.

100

Max Outstanding Request Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Data unit for the Max Outstanding Request Size field.

MB

Max Outstanding Element Count

Number

Maximum number of outstanding elements to keep in memory before enforcing flow control.

100

Reconnection

Associated Sources

Operations

Create Snapshot

<pubsub:create-snapshot>

Creates a snapshot from the requested subscription. Use snapshots in seek operations, which manage message acknowledgments in bulk. This operation enables you to set the acknowledgment state of messages in an existing subscription to the state captured by a snapshot.

Parameters

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Snapshot Name

String

Project-wide unique identifier for a snapshot.

x

Labels

Object

Key-value pairs that help users to organize Google Cloud resources. Attaching labels to resources filters them based on the labels.

Config Ref

ConfigurationProvider

Name of the configuration used to execute this component.

x

Project ID

String

Globally unique identifier for your project.

Subscription Name

String

Project-wide unique identifier for the subscription.

Target Variable

String

Name of the variable that stores the operation’s output.

Target Value

String

Expression that evaluates the operation’s output. The outcome of the expression is stored in the Target Variable field.

#[payload]

Reconnection Strategy

Retry strategy in case of connectivity errors.

Output

Type

Snapshot

For Configurations

Throws

  • PUBSUB:ALREADY_EXISTS

  • PUBSUB:BAD_GATEWAY

  • PUBSUB:CANCELLED

  • PUBSUB:CONNECTIVITY

  • PUBSUB:DEADLINE_EXCEEDED

  • PUBSUB:FAILED_PRECONDITION

  • PUBSUB:INVALID_ARGUMENT

  • PUBSUB:NOT_FOUND

  • PUBSUB:PERMISSION_DENIED

  • PUBSUB:RETRY_EXHAUSTED

  • PUBSUB:UNAVAILABLE

Delete Snapshot

<pubsub:delete-snapshot>

Deletes an existing snapshot.

Parameters

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Config Ref

ConfigurationProvider

Name of the configuration used to execute this component.

x

Project ID

String

Globally unique identifier for your project.

Snapshot Name

String

Project-wide unique identifier for a snapshot.

x

Reconnection Strategy

Retry strategy in case of connectivity errors.

For Configurations

Throws

  • PUBSUB:BAD_GATEWAY

  • PUBSUB:CANCELLED

  • PUBSUB:CONNECTIVITY

  • PUBSUB:DEADLINE_EXCEEDED

  • PUBSUB:FAILED_PRECONDITION

  • PUBSUB:INVALID_ARGUMENT

  • PUBSUB:NOT_FOUND

  • PUBSUB:PERMISSION_DENIED

  • PUBSUB:RETRY_EXHAUSTED

  • PUBSUB:UNAVAILABLE

Get Snapshot

<pubsub:get-snapshot>

Retrieves a snapshot.

Parameters

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Config Ref

ConfigurationProvider

Name of the configuration used to execute this component.

x

Project ID

String

Globally unique identifier for your project.

Snapshot Name

String

Project-wide unique identifier for a snapshot.

x

Target Variable

String

Name of the variable that stores the operation’s output.

Target Value

String

Expression that evaluates the operation’s output. The outcome of the expression is stored in the Target Variable field.

#[payload]

Reconnection Strategy

Retry strategy in case of connectivity errors.

Output

Type

Snapshot

For Configurations

Throws

  • PUBSUB:BAD_GATEWAY

  • PUBSUB:CANCELLED

  • PUBSUB:CONNECTIVITY

  • PUBSUB:DEADLINE_EXCEEDED

  • PUBSUB:FAILED_PRECONDITION

  • PUBSUB:INVALID_ARGUMENT

  • PUBSUB:NOT_FOUND

  • PUBSUB:PERMISSION_DENIED

  • PUBSUB:RETRY_EXHAUSTED

  • PUBSUB:UNAVAILABLE

Get Snapshot List

<pubsub:get-snapshot-list>

Retrieves a list of existing snapshots from a selected project.

Parameters

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Project Id

String

Globally unique identifier for your project.

Config Ref

ConfigurationProvider

Name of the configuration used to execute this component.

x

Streaming Strategy

Configures how Mule processes streams. The default is to use repeatable streams.

Target Variable

String

Name of the variable that stores the operation’s output.

Target Value

String

Expression that evaluates the operation’s output. The outcome of the expression is stored in the Target Variable field.

#[payload]

Reconnection Strategy

Retry strategy in case of connectivity errors.

Output

Type

Array of Snapshot

For Configurations

Throws

  • PUBSUB:BAD_GATEWAY

  • PUBSUB:CANCELLED

  • PUBSUB:DEADLINE_EXCEEDED

  • PUBSUB:FAILED_PRECONDITION

  • PUBSUB:INVALID_ARGUMENT

  • PUBSUB:NOT_FOUND

  • PUBSUB:PERMISSION_DENIED

  • PUBSUB:UNAVAILABLE

Patch Snapshot

<pubsub:patch-snapshot>

Updates an existing snapshot. The name and the topic of the snapshot are not mutable fields and cannot be updated.

Parameters

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Snapshot

Snapshot object with updated fields (labels or expireTime).

#[payload]

Update Mask

String

Indicates which fields in the provided snapshot to update. Must be specified and non-empty. This is a comma-separated list of fully qualified names of fields.

x

Config Ref

ConfigurationProvider

Name of the configuration used to execute this component.

x

Target Variable

String

Name of the variable that stores the operation’s output.

Target Value

String

Expression that evaluates the operation’s output. The outcome of the expression is stored in the Target Variable field.

#[payload]

Reconnection Strategy

Retry strategy in case of connectivity errors.

Output

Type

Snapshot

For Configurations

Throws

  • PUBSUB:BAD_GATEWAY

  • PUBSUB:CANCELLED

  • PUBSUB:CONNECTIVITY

  • PUBSUB:DEADLINE_EXCEEDED

  • PUBSUB:FAILED_PRECONDITION

  • PUBSUB:INVALID_ARGUMENT

  • PUBSUB:NOT_FOUND

  • PUBSUB:PERMISSION_DENIED

  • PUBSUB:RETRY_EXHAUSTED

  • PUBSUB:UNAVAILABLE

Publish Message

<pubsub:publish-message>

Publishes a single message.

This operation is non-blocking. When batching is enabled, the message is stored only locally until any of the batching restrictions are reached.

This operation may lead to high throughput. When using the Ordering Key field, processing messages in the order they were called is not guaranteed, especially in threaded environments. If you must maintain a strict order, set Max Concurrency to 1 in your source flow settings and add a delay in the for-each loops that contain order critical Publish calls.

This operation returns the messageId, which is the unique identifier of the sent message in the topic.

Parameters

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Ordering Key

String

If specified, this field identifies related messages in which the publish order must be respected when the subscriber enables message ordering.

Config Ref

ConfigurationProvider

Name of the configuration used to execute this component.

x

Project ID

String

Globally unique identifier for your project.

Topic Name

String

Project-wide unique identifier for the topic.

Message

Binary

Message content published to the topic. If not empty, the message must contain at least one attribute.

Attributes

Object

Attributes for this message. If empty, the message must contain non-empty data. Can be used to filter messages on the subscription.

Target Variable

String

Name of the variable that stores the operation’s output.

Target Value

String

Expression that evaluates the operation’s output. The outcome of the expression is stored in the Target Variable field.

#[payload]

Reconnection Strategy

Retry strategy in case of connectivity errors.

Output

Type

String

For Configurations

Throws

  • PUBSUB:BAD_GATEWAY

  • PUBSUB:CANCELLED

  • PUBSUB:CONNECTIVITY

  • PUBSUB:DEADLINE_EXCEEDED

  • PUBSUB:FAILED_PRECONDITION

  • PUBSUB:INVALID_ARGUMENT

  • PUBSUB:MAX_OUTSTANDING_BATCH_SIZE_REACHED

  • PUBSUB:MAX_OUTSTANDING_ELEMENT_COUNT_REACHED

  • PUBSUB:NOT_FOUND

  • PUBSUB:PERMISSION_DENIED

  • PUBSUB:RETRY_EXHAUSTED

  • PUBSUB:UNAVAILABLE

Seek Messages

<pubsub:seek-messages>

Supports the bulk acknowledging or un-acknowledging of messages to a given snapshot or point of time based on the provided seek target.

Seeking to a point in time marks every message received by Pub/Sub before the time as acknowledged, and all messages received after the time as unacknowledged. You can replay and reprocess previously acknowledged messages when seeking to a time in the past or purge messages when seeking to a time in the future.

Seeking to a snapshot enables you to redeliver only the messages in the snapshot that match the filter of the subscription making the seek request.

Once a snapshot is created, it retains all messages that were unacknowledged in the source subscription at the time of the snapshot’s creation and any messages published to the topic thereafter. You can replay these unacknowledged messages by using a snapshot to seek to any of the topic’s subscriptions.

If you seek to a snapshot using a subscription with a filter, the Pub/Sub service redelivers only the messages in the snapshot that match the filter of the subscription making the seek request.

Parameters

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Config Ref

ConfigurationProvider

Name of the configuration used to execute this component.

x

Project ID

String

Globally unique identifier for your project.

Subscription Name

String

Project-wide unique identifier for the subscription.

Snapshot Name

String

Snapshot to seek to. The snapshot’s topic must be the same as that of the provided subscription.

Timestamp

DateTime

Specific time to seek to. Messages retained in the subscription that were published before this time are marked as acknowledged, and messages retained in the subscription that were published after this time are marked as unacknowledged.


This operation affects only messages retained in the subscription (configured by the combination of Message Retention Duration and Retain Acked Messages). For example, if the time corresponds to a point before the message retention window (or to a point before the system’s notion of the subscription creation time), only retained messages are marked as unacknowledged, and already-expunged messages are not restored.


A timestamp is in RFC3339 UTC Zulu format, with nanosecond resolution and up to nine fractional digits, such as 2014-10-02T15:01:23Z and 2014-10-02T15:01:23.045123456Z.

Reconnection Strategy

Retry strategy in case of connectivity errors.

For Configurations

Throws

  • PUBSUB:BAD_GATEWAY

  • PUBSUB:CANCELLED

  • PUBSUB:CONNECTIVITY

  • PUBSUB:DEADLINE_EXCEEDED

  • PUBSUB:FAILED_PRECONDITION

  • PUBSUB:INVALID_ARGUMENT

  • PUBSUB:NOT_FOUND

  • PUBSUB:PERMISSION_DENIED

  • PUBSUB:RETRY_EXHAUSTED

  • PUBSUB:UNAVAILABLE

Sources

On message listener

<pubsub:message-listener>

Asynchronous message listener that consumes messages from one subscriber.

Parameters

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Consumer count

Number

Provides a specified amount of executor service for processing messages.

5

Config Ref

ConfigurationProvider

Name of the configuration used to execute this component.

x

Primary Node Only

Boolean

Determines whether to execute this source on only the primary node when running Mule instances in a cluster.

Streaming Strategy

Configures how Mule processes streams. The default is to use repeatable streams.

Redelivery Policy

Defines a policy for processing the redelivery of the same message.

Project ID

String

Globally unique identifier for your project.

Subscription Name

String

Project-wide unique identifier for the subscription.

Limit Exceeded Behavior

Enumeration, one of:

  • THROW_EXCEPTION

  • BLOCK

  • IGNORE

If not set to IGNORE, flow control enables you to control the publishing behavior by specifying limits for Max Outstanding Request Size and Max Outstanding Element Count.

  • THROW_EXCEPTION

    If the memory size or the amount of outstanding elements (messages where no Ack() or Nack() was performed) exceeds the specified limits, the receiver throws a FlowControlException.

  • BLOCK

    If the memory size or the amount of outstanding elements (messages where no Ack() or Nack() was performed) exceeds the specified limits, the receiver does not receive more messages.

  • IGNORE

    The receiver does not account for specified limits and does not control the message delivery rate.

Max Outstanding Request Size

Number

Maximum amount of memory accumulated by the request before enforcing flow control.

100

Max Outstanding Request Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Data unit for the Max Outstanding Request Size field.

MB

Max Outstanding Element Count

Number

Maximum number of outstanding elements to keep in the memory before enforcing flow control.

100

Reconnection Strategy

Output

Type

Any

Attributes Type

Object

For Configurations

Types

Reconnection

Configures a reconnection strategy for an operation.

Field Type Description Default Value Required

Fails Deployment

Boolean

What to do if, when an app is deployed, a connectivity test does not pass after exhausting the associated reconnection strategy:

  • true

    Allow the deployment to fail.

  • false

    Ignore the results of the connectivity test.

Reconnection Strategy

Reconnection strategy to use.

Reconnect

Configures a standard reconnection strategy, which specifies how often to reconnect and how many reconnection attempts the connector source or operation can make.

Field Type Description Default Value Required

Frequency

Number

How often to attempt to reconnect, in milliseconds.

Blocking

Boolean

If false, the reconnection strategy will run in a separate, non-blocking thread.

Count

Number

How many reconnection attempts the Mule app can make.

Reconnect Forever

Configures a forever reconnection strategy by which the connector source or operation attempts to reconnect at a specified frequency for as long as the Mule app runs.

Field Type Description Default Value Required

Frequency

Number

How often to attempt to reconnect, in milliseconds.

Blocking

Boolean

If false, the reconnection strategy will run in a separate, non-blocking thread.

Expiration Policy

Field Type Description Default Value Required

Max Idle Time

Number

Configures the maximum amount of time that a dynamic configuration instance can remain idle before Mule considers it eligible for expiration.

Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Max Idle Time field.

Repeatable In Memory Stream

Configures the in-memory streaming strategy by which the request fails if the data exceeds the MAX buffer size. Always run performance tests to find the optimal buffer size for your specific use case.

Field Type Description Default Value Required

Initial Buffer Size

Number

Initial amount of memory to allocate to the data stream. If the streamed data exceeds this value, the buffer expands by Buffer Size Increment, with an upper limit of Max In Memory Size value.

Buffer Size Increment

Number

Amount by which the buffer size expands if it exceeds its initial size. Setting a value of 0 or lower specifies that the buffer can’t expand.

Max Buffer Size

Number

Maximum size of the buffer. If the buffer size exceeds this value, Mule raises a STREAM_MAXIMUM_SIZE_EXCEEDED error. A value of less than or equal to 0 means no limit.

Buffer Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit for the Initial Buffer Size, Buffer Size Increment, and Buffer Unit fields.

Repeatable File Store Stream

Configures the repeatable file-store streaming strategy by which Mule keeps a portion of the stream content in memory. If the stream content is larger than the configured buffer size, Mule backs up the buffer’s content to disk and then clears the memory.

Field Type Description Default Value Required

In Memory Size

Number

Maximum amount of memory that the stream can use for data. If the amount of memory exceeds this value, Mule buffers the content to disk. To optimize performance:

  • Configure a larger buffer size to avoid the number of times Mule needs to write the buffer on disk. This increases performance, but it also limits the number of concurrent requests your application can process, because it requires additional memory.

  • Configure a smaller buffer size to decrease memory load at the expense of response time.

Buffer Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit for the In Memory Size field.

Redelivery Policy

Configures the redelivery policy for executing requests that generate errors. You can add a redelivery policy to any source in a flow.

Field Type Description Default Value Required

Max Redelivery Count

Number

Maximum number of times that a redelivered request can be processed unsuccessfully before returning a REDELIVERY_EXHAUSTED error.

Message Digest Algorithm

String

Secure hashing algorithm to use if the Use Secure Hash field is true. If the payload of the message is a Java object, Mule ignores this value and returns the value that the payload’s hashCode() returned.

Message Identifier

Defines which strategy is used to identify the messages.

Object Store

ObjectStore

Configures the object store that stores the redelivery counter for each message.

Redelivery Policy Message Identifier

Configures how to identify a redelivered message and how to find out when the message was redelivered.

Field Type Description Default Value Required

Use Secure Hash

Boolean

If true, Mule uses a secure hash algorithm to identify a redelivered message.

Id Expression

String

One or more expressions that determine when a message was redelivered. You can set this property only if the Use Secure Hash field is false.

Snapshot

Snapshot resource. You can use snapshots in seek operations, which manage message acknowledgments in bulk. This type enables you to set the acknowledgment state of messages in an existing subscription to the state captured by a snapshot.

Field Type Description Default Value Required

Name

String

Name of the snapshot.

Topic

String

Name of the topic from which the snapshot retains messages.

Expire Time

DateTime

Time length for which the snapshot is guaranteed to exist. The lifetime for a newly-created snapshot is based on the oldest unacked message in the source subscription’s backlog, however, the snapshot can exist for a maximum of seven days.


Consider a subscription with an oldest unacked message that is three days old. If a snapshot is created from this subscription, the snapshot will expire in four days. The service will refuse to create a snapshot that will expire in less than one hour after creation.


A timestamp is in RFC3339 UTC Zulu format, with nanosecond resolution and up to nine fractional digits, such as 2014-10-02T15:01:23Z and 2014-10-02T15:01:23.045123456Z.

Labels

Any

Object containing a list of key-value pairs, such as { "name": "wrench", "mass": "1.3kg", "count": "3" }.

Repeatable In Memory Iterable

Field Type Description Default Value Required

Initial Buffer Size

Number

The number of instances to initially keep in memory to consume the stream and provide random access to it. If the stream contains more data than can fit into this buffer, then the buffer expands according to the Buffer Size Increment field, with an upper limit of Max In Memory Size.

100

Buffer Size Increment

Number

This is by how much the buffer size expands if it exceeds its initial size. Setting a value of zero or lower means that the buffer should not expand, meaning that a STREAM_MAXIMUM_SIZE_EXCEEDED error is raised when the buffer gets full.

100

Max Buffer Size

Number

The maximum amount of memory to use. If more than that is used then a STREAM_MAXIMUM_SIZE_EXCEEDED error is raised. A value lower than or equal to zero means no limit.

Repeatable File Store Iterable

Field Type Description Default Value Required

In Memory Objects

Number

The maximum amount of instances to keep in memory. If more than that is required, content on the disk is buffered.

Buffer Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit for the In Memory Objects field.