Receive messages from a subscription with exactly once delivery enabled.
Explore further
For detailed documentation that includes this code sample, see the following:
Code sample
C++
Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C++ API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
return subscriber.Subscribe(
[&](pubsub::Message const& m, pubsub::ExactlyOnceAckHandler h) {
std::cout << "Received message " << m << "\n";
std::move(h).ack().then([id = m.message_id()](auto f) {
auto status = f.get();
std::cout << "Message id " << id
<< " ack() completed with status=" << status << "\n";
});
PleaseIgnoreThisSimplifiesTestingTheSamples();
});
};
C#
Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C# API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.PubSub.V1.SubscriberClient;
public class ExactlyOnceDeliverySubscriberAsyncSample
{
public async Task<IEnumerable<string>> ExactlyOnceDeliverySubscriberAsync(string projectId, string subscriptionId)
{
// subscriptionId should be the ID of an exactly-once delivery subscription.
SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
// To get the status of ACKnowledge (ACK) or Not ACKnowledge (NACK) request in exactly once delivery subscriptions,
// create a subscription handler that inherits from Google.Cloud.PubSub.V1.SubscriptionHandler.
// For more information see Google.Cloud.PubSub.V1.SubscriptionHandler reference docs here:
// https://cloud.google.com/dotnet/docs/reference/Google.Cloud.PubSub.V1/latest/Google.Cloud.PubSub.V1.SubscriptionHandler
var subscriptionHandler = new SampleSubscriptionHandler();
Task subscriptionTask = subscriber.StartAsync(subscriptionHandler);
// The subscriber will be running until it is stopped.
await Task.Delay(5000);
await subscriber.StopAsync(CancellationToken.None);
// Let's make sure that the start task finished successfully after the call to stop.
await subscriptionTask;
return subscriptionHandler.SuccessfulAckedIds;
}
// Sample handler to handle messages and ACK/NACK responses.
public class SampleSubscriptionHandler : SubscriptionHandler
{
public ConcurrentBag<string> SuccessfulAckedIds { get; } = new ConcurrentBag<string>();
/// <summary>
/// The function that processes received messages. It should be thread-safe.
/// Return <see cref="Reply.Ack"/> to ACKnowledge the message (meaning it won't be received again).
/// Return <see cref="Reply.Nack"/> to Not ACKnowledge the message (meaning it will be received again).
/// From the point of view of message acknowledgement, throwing an exception is equivalent to returning <see cref="Reply.Nack"/>.
/// </summary>
public override async Task<Reply> HandleMessage(PubsubMessage message, CancellationToken cancellationToken)
{
string text = message.Data.ToStringUtf8();
Console.WriteLine($"Message {message.MessageId}: {text}");
return await Task.FromResult(Reply.Ack);
}
/// <summary>
/// This method will receive responses for all acknowledge requests.
/// </summary>
public override void HandleAckResponses(IReadOnlyList<AckNackResponse> responses)
{
foreach (var response in responses)
{
if (response.Status == AcknowledgementStatus.Success)
{
SuccessfulAckedIds.Add(response.MessageId);
}
string result = response.Status switch
{
AcknowledgementStatus.Success => $"MessageId {response.MessageId} successfully acknowledged.",
AcknowledgementStatus.PermissionDenied => $"MessageId {response.MessageId} failed to acknowledge due to a permission denied error.",
AcknowledgementStatus.FailedPrecondition => $"MessageId {response.MessageId} failed to acknowledge due to a failed precondition.",
AcknowledgementStatus.InvalidAckId => $"MessageId {response.MessageId} failed to acknowledge due an invalid or expired AckId.",
AcknowledgementStatus.Other => $"MessageId {response.MessageId} failed to acknowledge due to an unknown reason.",
_ => $"Unknown acknowledgement status for messageId {response.MessageId}."
};
Console.WriteLine(result);
}
}
}
}
Go
Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
import (
"context"
"fmt"
"io"
"time"
"cloud.google.com/go/pubsub"
)
// receiveMessagesWithExactlyOnceDeliveryEnabled instantiates a subscriber client.
// This differs from regular subscribing since you must call msg.AckWithResult()
// or msg.NackWithResult() instead of the regular Ack/Nack methods.
// When exactly once delivery is enabled on the subscription, the message is
// guaranteed to not be delivered again if the ack result succeeds.
func receiveMessagesWithExactlyOnceDeliveryEnabled(w io.Writer, projectID, subID string) error {
// projectID := "my-project-id"
// subID := "my-sub"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
defer client.Close()
sub := client.Subscription(subID)
// Set MinExtensionPeriod high to avoid any unintentional
// acknowledgment expirations (e.g. due to network events).
// This can lead to high tail latency in case of client crashes.
sub.ReceiveSettings.MinExtensionPeriod = 600 * time.Second
// Receive messages for 10 seconds, which simplifies testing.
// Comment this out in production, since `Receive` should
// be used as a long running operation.
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
r := msg.AckWithResult()
// Block until the result is returned and a pubsub.AcknowledgeStatus
// is returned for the acked message.
status, err := r.Get(ctx)
if err != nil {
fmt.Fprintf(w, "MessageID: %s failed when calling result.Get: %v", msg.ID, err)
}
switch status {
case pubsub.AcknowledgeStatusSuccess:
fmt.Fprintf(w, "Message successfully acked: %s", msg.ID)
case pubsub.AcknowledgeStatusInvalidAckID:
fmt.Fprintf(w, "Message failed to ack with response of Invalid. ID: %s", msg.ID)
case pubsub.AcknowledgeStatusPermissionDenied:
fmt.Fprintf(w, "Message failed to ack with response of Permission Denied. ID: %s", msg.ID)
case pubsub.AcknowledgeStatusFailedPrecondition:
fmt.Fprintf(w, "Message failed to ack with response of Failed Precondition. ID: %s", msg.ID)
case pubsub.AcknowledgeStatusOther:
fmt.Fprintf(w, "Message failed to ack with response of Other. ID: %s", msg.ID)
default:
}
})
if err != nil {
return fmt.Errorf("got err from sub.Receive: %w", err)
}
return nil
}
Java
Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
import com.google.cloud.pubsub.v1.AckReplyConsumerWithResponse;
import com.google.cloud.pubsub.v1.AckResponse;
import com.google.cloud.pubsub.v1.MessageReceiverWithAckResponse;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SubscribeWithExactlyOnceConsumerWithResponseExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
subscribeWithExactlyOnceConsumerWithResponseExample(projectId, subscriptionId);
}
public static void subscribeWithExactlyOnceConsumerWithResponseExample(
String projectId, String subscriptionId) {
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver using `AckReplyConsumerWithResponse`
// instead of `AckReplyConsumer` to get a future that tracks the result of the ack call.
// When exactly once delivery is enabled on the subscription, the message is guaranteed
// to not be delivered again if the ack future succeeds.
MessageReceiverWithAckResponse receiverWithResponse =
(PubsubMessage message, AckReplyConsumerWithResponse consumerWithResponse) -> {
try {
// Handle incoming message, then ack the message, and receive an ack response.
System.out.println("Message received: " + message.getData().toStringUtf8());
Future<AckResponse> ackResponseFuture = consumerWithResponse.ack();
// Retrieve the completed future for the ack response from the server.
AckResponse ackResponse = ackResponseFuture.get();
switch (ackResponse) {
case SUCCESSFUL:
// Success code means that this MessageID will not be delivered again.
System.out.println("Message successfully acked: " + message.getMessageId());
break;
case INVALID:
System.out.println(
"Message failed to ack with a response of Invalid. Id: "
+ message.getMessageId());
break;
case PERMISSION_DENIED:
System.out.println(
"Message failed to ack with a response of Permission Denied. Id: "
+ message.getMessageId());
break;
case FAILED_PRECONDITION:
System.out.println(
"Message failed to ack with a response of Failed Precondition. Id: "
+ message.getMessageId());
break;
case OTHER:
System.out.println(
"Message failed to ack with a response of Other. Id: "
+ message.getMessageId());
break;
default:
break;
}
} catch (InterruptedException | ExecutionException e) {
System.out.println(
"MessageId: " + message.getMessageId() + " failed when retrieving future");
} catch (Throwable t) {
System.out.println("Throwable caught" + t.getMessage());
}
};
Subscriber subscriber = null;
try {
subscriber = Subscriber.newBuilder(subscriptionName, receiverWithResponse).build();
// Start the subscriber.
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
// Shut down the subscriber after 30s. Stop receiving messages.
subscriber.stopAsync();
}
}
}
Node.js
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function listenForMessagesWithExactlyOnceDelivery(
subscriptionNameOrId,
timeout
) {
// References an existing subscription
const subscription = pubSubClient.subscription(subscriptionNameOrId);
// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = async message => {
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
messageCount++;
// Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
// the result of the acknowledge call. When exactly-once delivery is enabled
// on the subscription, the message is guaranteed not to be delivered again
// if the ack Promise resolves.
try {
// When the Promise resolves, the value is always AckResponses.Success,
// signaling that the ack was accepted. Note that you may call this
// method on a subscription without exactly-once delivery, but it will
// always return AckResponses.Success.
await message.ackWithResponse();
console.log(`Ack for message ${message.id} successful.`);
} catch (e) {
// In all other cases, the error passed on reject will explain why. This
// is only for permanent failures; transient errors are retried automatically.
const ackError = e;
console.log(
`Ack for message ${message.id} failed with error: ${ackError.errorCode}`
);
}
};
// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
Node.js
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// Imports the Google Cloud client library
import {Message, PubSub, AckError} from '@google-cloud/pubsub';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function listenForMessagesWithExactlyOnceDelivery(
subscriptionNameOrId: string,
timeout: number
) {
// References an existing subscription
const subscription = pubSubClient.subscription(subscriptionNameOrId);
// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = async (message: Message) => {
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
messageCount++;
// Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
// the result of the acknowledge call. When exactly-once delivery is enabled
// on the subscription, the message is guaranteed not to be delivered again
// if the ack Promise resolves.
try {
// When the Promise resolves, the value is always AckResponses.Success,
// signaling that the ack was accepted. Note that you may call this
// method on a subscription without exactly-once delivery, but it will
// always return AckResponses.Success.
await message.ackWithResponse();
console.log(`Ack for message ${message.id} successful.`);
} catch (e) {
// In all other cases, the error passed on reject will explain why. This
// is only for permanent failures; transient errors are retried automatically.
const ackError = e as AckError;
console.log(
`Ack for message ${message.id} failed with error: ${ackError.errorCode}`
);
}
};
// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
PHP
Before trying this sample, follow the PHP setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub PHP API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
use Google\Cloud\PubSub\PubSubClient;
/**
* Subscribe and pull messages from a subscription
* with `Exactly Once Delivery` enabled.
*
* @param string $projectId
* @param string $subscriptionId
*/
function subscribe_exactly_once_delivery(
string $projectId,
string $subscriptionId
): void {
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$subscription = $pubsub->subscription($subscriptionId);
$messages = $subscription->pull();
foreach ($messages as $message) {
// When exactly once delivery is enabled on the subscription,
// the message is guaranteed to not be delivered again if the ack succeeds.
// Passing the `returnFailures` flag retries any temporary failures received
// while acking the msg and also returns any permanently failed msgs.
// Passing this flag on a subscription with exactly once delivery disabled
// will always return an empty array.
$failedMsgs = $subscription->acknowledge($message, ['returnFailures' => true]);
if (empty($failedMsgs)) {
printf('Acknowledged message: %s' . PHP_EOL, $message->data());
} else {
// Either log or store the $failedMsgs to be retried later
}
}
}
Python
Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Python API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
print(f"Received {message}.")
# Use `ack_with_response()` instead of `ack()` to get a future that tracks
# the result of the acknowledge call. When exactly-once delivery is enabled
# on the subscription, the message is guaranteed to not be delivered again
# if the ack future succeeds.
ack_future = message.ack_with_response()
try:
# Block on result of acknowledge call.
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
ack_future.result(timeout=timeout)
print(f"Ack for message {message.message_id} successful.")
except sub_exceptions.AcknowledgeError as e:
print(
f"Ack for message {message.message_id} failed with error: {e.error_code}"
)
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
Ruby
Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
require "google/cloud/pubsub"
# Shows how to register callback to acknowledge method and access the result passed in
class PubsubSubscriberExactlyOnceDelivery
def subscriber_exactly_once_delivery project_id:, topic_id:, subscription_id:
pubsub = Google::Cloud::Pubsub.new project_id: project_id
topic = pubsub.topic topic_id
subscription = pubsub.subscription subscription_id
subscriber = subscription.listen do |received_message|
puts "Received message: #{received_message.data}"
# Pass in callback to access the acknowledge result.
# For subscription with Exactly once delivery disabled the result will be success always.
received_message.acknowledge! do |result|
puts "Acknowledge result's status: #{result.status}"
end
end
subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
end
def self.run
# TODO(developer): Replace these variables before running the sample.
project_id = "your-project-id"
topic_id = "your-topic-id"
subscription_id = "id-for-new-subcription" # subscription with exactly once delivery enabled
PubsubSubscriberExactlyOnceDelivery.new.subscriber_exactly_once_delivery project_id: project_id,
topic_id: topic_id,
subscription_id: subscription_id
end
end
if $PROGRAM_NAME == __FILE__
PubsubSubscriberExactlyOnceDelivery.run
end
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.