Skip to content

Commit

Permalink
feat: add support for exactly once subscriptions (#1572)
Browse files Browse the repository at this point in the history
Adds support for exactly-once delivery subscriptions. Please see the samples for information on how to interact with exactly-once subscriptions properly (specifically, using the `*WithResponse()` methods).

Other client library folks - Mahesh needs to review this, so please don't merge until that happens.

Fixes #1571 🦕
  • Loading branch information
feywind committed Sep 22, 2022
1 parent d72db50 commit 998de35
Show file tree
Hide file tree
Showing 22 changed files with 2,241 additions and 130 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree
| Create Push Subscription | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createPushSubscription.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createPushSubscription.js,samples/README.md) |
| Create Subscription | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscription.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscription.js,samples/README.md) |
| Create Subscription With Dead Letter Policy | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithDeadLetterPolicy.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithDeadLetterPolicy.js,samples/README.md) |
| Create an exactly-once delivery subscription | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithExactlyOnceDelivery.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithExactlyOnceDelivery.js,samples/README.md) |
| Create Subscription With Filtering | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithFiltering.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithFiltering.js,samples/README.md) |
| Create Subscription with ordering enabled | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithOrdering.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithOrdering.js,samples/README.md) |
| Create Topic | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopic.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopic.js,samples/README.md) |
Expand All @@ -148,6 +149,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree
| Listen For Avro Records | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForAvroRecords.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForAvroRecords.js,samples/README.md) |
| Listen For Errors | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForErrors.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForErrors.js,samples/README.md) |
| Listen For Messages | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForMessages.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForMessages.js,samples/README.md) |
| Listen with exactly-once delivery | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForMessagesWithExactlyOnceDelivery.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForMessagesWithExactlyOnceDelivery.js,samples/README.md) |
| Listen For Protobuf Messages | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForProtobufMessages.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForProtobufMessages.js,samples/README.md) |
| Listen For Messages With Custom Attributes | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenWithCustomAttributes.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenWithCustomAttributes.js,samples/README.md) |
| Modify Push Configuration | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/modifyPushConfig.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/modifyPushConfig.js,samples/README.md) |
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"extend": "^3.0.2",
"google-auth-library": "^8.0.2",
"google-gax": "^3.3.0",
"heap-js": "^2.2.0",
"is-stream-ended": "^0.1.4",
"lodash.snakecase": "^4.1.1",
"p-defer": "^3.0.0"
Expand Down
40 changes: 40 additions & 0 deletions samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ guides.
* [Create Push Subscription](#create-push-subscription)
* [Create Subscription](#create-subscription)
* [Create Subscription With Dead Letter Policy](#create-subscription-with-dead-letter-policy)
* [Create an exactly-once delivery subscription](#create-an-exactly-once-delivery-subscription)
* [Create Subscription With Filtering](#create-subscription-with-filtering)
* [Create Subscription with ordering enabled](#create-subscription-with-ordering-enabled)
* [Create Topic](#create-topic)
Expand All @@ -45,6 +46,7 @@ guides.
* [Listen For Avro Records](#listen-for-avro-records)
* [Listen For Errors](#listen-for-errors)
* [Listen For Messages](#listen-for-messages)
* [Listen with exactly-once delivery](#listen-with-exactly-once-delivery)
* [Listen For Protobuf Messages](#listen-for-protobuf-messages)
* [Listen For Messages With Custom Attributes](#listen-for-messages-with-custom-attributes)
* [Modify Push Configuration](#modify-push-configuration)
Expand Down Expand Up @@ -199,6 +201,25 @@ __Usage:__



### Create an exactly-once delivery subscription

Demonstrates how to create a subscription for exactly-once delivery.

View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithExactlyOnceDelivery.js).

[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithExactlyOnceDelivery.js,samples/README.md)

__Usage:__


`node createSubscriptionWithExactlyOnceDelivery.js <topic-name-or-id> <subscription-name-or-id>`


-----




### Create Subscription With Filtering

Creates a new subscription with filtering.
Expand Down Expand Up @@ -560,6 +581,25 @@ __Usage:__



### Listen with exactly-once delivery

Listen for messages on an exactly-once delivery subscription.

View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForMessagesWithExactlyOnceDelivery.js).

[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForMessagesWithExactlyOnceDelivery.js,samples/README.md)

__Usage:__


`node listenForMessagesWithExactlyOnceDelivery.js <subscription-name-or-id>`


-----




### Listen For Protobuf Messages

Listens for messages in protobuf encoding from a subscription.
Expand Down
77 changes: 77 additions & 0 deletions samples/createSubscriptionWithExactlyOnceDelivery.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This application demonstrates how to perform basic operations on
* schemas with the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// This is a generated sample. Please see typescript/README.md for more info.

'use strict';

// sample-metadata:
// title: Create an exactly-once delivery subscription
// description: Demonstrates how to create a subscription for exactly-once delivery.
// usage: node createSubscriptionWithExactlyOnceDelivery.js <topic-name-or-id> <subscription-name-or-id>

// [START pubsub_create_subscription_with_exactly_once_delivery]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
topicNameOrId,
subscriptionNameOrId
) {
// Creates a new subscription
await pubSubClient
.topic(topicNameOrId)
.createSubscription(subscriptionNameOrId, {
enableExactlyOnceDelivery: true,
});
console.log(
`Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
);
console.log(
'To process messages, remember to check the return value of ackWithResponse().'
);
}
// [END pubsub_create_subscription_with_exactly_once_delivery]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'
) {
createSubscriptionWithExactlyOnceDelivery(
topicNameOrId,
subscriptionNameOrId
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
103 changes: 103 additions & 0 deletions samples/listenForMessagesWithExactlyOnceDelivery.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This application demonstrates how to perform basic operations on
* schemas with the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// This is a generated sample. Please see typescript/README.md for more info.

'use strict';

// sample-metadata:
// title: Listen with exactly-once delivery
// description: Listen for messages on an exactly-once delivery subscription.
// usage: node listenForMessagesWithExactlyOnceDelivery.js <subscription-name-or-id>

// [START pubsub_subscriber_exactly_once]
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenForMessagesWithExactlyOnceDelivery(
subscriptionNameOrId,
timeout
) {
// References an existing subscription
const subscription = pubSubClient.subscription(subscriptionNameOrId);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = async message => {
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
messageCount++;

// Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
// the result of the acknowledge call. When exactly-once delivery is enabled
// on the subscription, the message is guaranteed not to be delivered again
// if the ack Promise resolves.
try {
// When the Promise resolves, the value is always AckResponses.Success,
// signaling that the ack was accepted. Note that you may call this
// method on a subscription without exactly-once delivery, but it will
// always return AckResponses.Success.
await message.ackWithResponse();
console.log(`Ack for message ${message.id} successful.`);
} catch (e) {
// In all other cases, the error passed on reject will explain why. This
// is only for permanent failures; transient errors are retried automatically.
const ackError = e;
console.log(
`Ack for message ${message.id} failed with error: ${ackError.errorCode}`
);
}
};

// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);

setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
// [END pubsub_subscriber_exactly_once]

function main(
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID',
timeout = 60
) {
listenForMessagesWithExactlyOnceDelivery(
subscriptionNameOrId,
Number(timeout)
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
39 changes: 39 additions & 0 deletions samples/system-test/subscriptions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -505,4 +505,43 @@ describe('subscriptions', () => {
.get();
assert.strictEqual(subscription.metadata?.enableMessageOrdering, true);
});

it('should create an exactly-once delivery sub and listen on it.', async () => {
const testId = 'eos';
const topic = await createTopic(testId);
const subName = reserveSub(testId);
const output = execSync(
`${commandFor('createSubscriptionWithExactlyOnceDelivery')} ${
topic.name
} ${subName}`
);
assert.include(
output,
`Created subscription ${subName} with exactly-once delivery.`
);

const [subscription] = await pubsub
.topic(topic.name)
.subscription(subName)
.get();
assert.strictEqual(subscription.metadata?.enableExactlyOnceDelivery, true);

const message = Buffer.from('test message');
const messageIds = [
await topic.publishMessage({
data: message,
}),
await topic.publishMessage({
data: message,
}),
];

const output2 = execSync(
`${commandFor('listenForMessagesWithExactlyOnceDelivery')} ${subName} 15`
);

for (const id of messageIds) {
assert.include(output2, `Ack for message ${id} successful`);
}
});
});
73 changes: 73 additions & 0 deletions samples/typescript/createSubscriptionWithExactlyOnceDelivery.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This application demonstrates how to perform basic operations on
* schemas with the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Create an exactly-once delivery subscription
// description: Demonstrates how to create a subscription for exactly-once delivery.
// usage: node createSubscriptionWithExactlyOnceDelivery.js <topic-name-or-id> <subscription-name-or-id>

// [START pubsub_create_subscription_with_exactly_once_delivery]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
topicNameOrId: string,
subscriptionNameOrId: string
) {
// Creates a new subscription
await pubSubClient
.topic(topicNameOrId)
.createSubscription(subscriptionNameOrId, {
enableExactlyOnceDelivery: true,
});
console.log(
`Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
);
console.log(
'To process messages, remember to check the return value of ackWithResponse().'
);
}
// [END pubsub_create_subscription_with_exactly_once_delivery]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'
) {
createSubscriptionWithExactlyOnceDelivery(
topicNameOrId,
subscriptionNameOrId
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));

0 comments on commit 998de35

Please sign in to comment.