
Publishing sometimes fails when ordering key is enabled #1084

Closed
pamarc opened this issue Feb 21, 2024 · 4 comments · Fixed by #1134
Labels
api: pubsub Issues related to the googleapis/python-pubsub API.

pamarc commented Feb 21, 2024

Environment details

  • OS type and version: Debian Bookworm (Docker slim image)
  • Python version: 3.9.18
  • pip version: 23.1
  • google-cloud-pubsub version: 2.18.4
  • grpcio: 1.59.0

Description

Hello there,

We’ve been encountering publishing issues for a while now and have run out of ideas on how to fix them. The behaviour is the following: we publish messages into a few topics that are configured with an ordering key. Multiple times a week (or even a day), the publishing process fails and the client publisher stops (it seems to be stuck?), as no messages are published. Google Cloud Monitoring usually reports a ‘deadline exceeded’, and indeed we do see timeouts when that happens (but no exception on the client side). This first observation is a bit odd, given that we don’t publish massive loads of messages (just a few messages every second).

After (belatedly!) reading the documentation, it seems this behaviour is expected with ordering keys.

Code example

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import (
    BatchSettings,
    LimitExceededBehavior,
    PublisherOptions,
    PublishFlowControl,
)

# TODO(developer): Choose an existing topic.
project_id = "your-project-id"
topic_id = "your-topic-id"

publisher_options = PublisherOptions(
    enable_message_ordering=True,
    flow_control=PublishFlowControl(
        message_limit=2000,
        limit_exceeded_behavior=LimitExceededBehavior.BLOCK,
    ),
)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
batch_settings = BatchSettings(
    max_bytes=1 * 1000 * 1000,  # 1 MB per batch
    max_latency=0.01,  # wait at most 10 ms before sending a batch
    max_messages=1000,
)
publisher = pubsub_v1.PublisherClient(
    batch_settings,
    publisher_options=publisher_options,
    client_options=client_options,
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)


def make_resume_callback(ordering_key):
    # Bind the ordering key per message, so each callback resumes the
    # key that actually failed.
    def resume_if_error(future):
        try:
            print(future.result())
        except RuntimeError:
            # Resume publish on an ordering key that has had
            # unrecoverable errors.
            publisher.resume_publish(topic_path, ordering_key)

    return resume_if_error


for text, ordering_key in [
    ("message1", "key1"),
    ("message2", "key2"),
    ("message3", "key1"),
    ("message4", "key2"),
]:
    # Data must be a bytestring.
    data = text.encode("utf-8")
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
    future.add_done_callback(make_resume_callback(ordering_key))

We wanted to avoid calling result() in the main thread, since it blocks, and use a callback instead. This does not help, though: we still see the same issue.
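For reference, a minimal sketch of the blocking alternative we wanted to avoid (reusing the names from the example above; the 60-second timeout is an arbitrary choice):

from concurrent.futures import TimeoutError

future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
try:
    # Blocks the calling thread until the publish settles or the timeout hits.
    print(future.result(timeout=60))
except TimeoutError:
    # A stuck publish at least surfaces here instead of hanging silently.
    print(f"publish on ordering key {ordering_key!r} timed out")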

Can anyone help?

pamarc commented Apr 4, 2024

Hi @mukund-ananthu, can you give me details on how your fix will help with this issue? Thanks :)

mukund-ananthu commented

Hi @pamarc ,

Sure. For publishing with ordering keys:

Prior to the fix, the retry deadline was set to infinite, but default_timeout was still set to its default of 60s. This default_timeout is used as the timeout for the wrapped function (publish, in this case). It was observed that both deadline and default_timeout correspond to timeouts across multiple retries, instead of deadline corresponding to the timeout across multiple retries and default_timeout corresponding to the timeout of each individual RPC call (the initial call or any retry). We are following up with the gapic-generator-python library to add that distinction.
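As a rough illustration, here is a minimal sketch of that composition using google.api_core primitives (simplified; the real wiring lives in the generated gapic layer, and the numeric timeout being treated as a total budget is the behaviour described above):

from google.api_core import exceptions
from google.api_core.retry import Retry, if_exception_type
from google.api_core.timeout import TimeToDeadlineTimeout


def publish_rpc(request, timeout=None):
    # Hypothetical stand-in for the low-level publish RPC.
    ...


# A numeric default_timeout of 60 acts as a *total* budget: each attempt is
# handed the time remaining since the first call, shrinking towards zero.
timeout_wrapped = TimeToDeadlineTimeout(timeout=60)(publish_rpc)

# The retry deadline was infinite, so the retry loop never gives up. Once the
# 60s budget is spent, every attempt fails immediately with DeadlineExceeded
# before reaching the backend, and the loop spins forever.
retry_wrapped = Retry(
    predicate=if_exception_type(
        exceptions.DeadlineExceeded, exceptions.ServiceUnavailable
    ),
    deadline=float("inf"),
)(timeout_wrapped)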

As a result, for retryable errors, there is an initial set of retries in which the client library makes requests to the backend within that 60s time frame. If the retries have not succeeded by then, the publish call times out due to default_timeout=60s. Thereafter, the client library no longer makes calls to the backend; the lower-level functions instead return a deadline-exceeded error for each retry attempt. Since the retry deadline is still infinite, and the retry wraps the timeout, the retry loop repeatedly invokes the wrapped publish function, receives a deadline-exceeded error, and retries forever without returning control to the calling code. Control is therefore stuck indefinitely: no response is ever returned, and no further calls are made to the backend.

With this fix, instead of timing out after 60s and making no further calls to the backend, the client library continues to make calls to the backend indefinitely. When the transient issue in the backend is resolved, a retry from the client library should eventually succeed. From the client library's perspective, the bug was that it stopped making the required retries to the backend after 60s.
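For concreteness, a hedged sketch of what the fix amounts to (paraphrased, not the exact code of #1134; the helper name and _INFINITE constant are illustrative):

from google.api_core import gapic_v1
from google.api_core.retry import Retry

_INFINITE = 2.0**32  # effectively infinite, in seconds


def widen_for_ordering(retry, timeout, base_retry: Retry):
    # Hypothetical helper mirroring the fix: when publishing with ordering
    # keys and the caller kept the defaults, widen both knobs together.
    if retry is gapic_v1.method.DEFAULT:
        # Pre-fix behaviour: only the retry deadline was made infinite.
        retry = base_retry.with_deadline(_INFINITE)
    if timeout is gapic_v1.method.DEFAULT:
        # The fix: the 60s default timeout no longer caps the whole retry
        # loop, so retries keep reaching the backend indefinitely.
        timeout = _INFINITE
    return retry, timeout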


mukund-ananthu commented Apr 4, 2024

Also, w.r.t. resume_publish: it is only required when the client library enters a paused state. This occurs when the backend returns a non-retryable error code: “If a non-retryable error occurs, the client library doesn't publish the message and stops publishing other messages with the same ordering key.” For non-retryable error codes, the client library already returns the error to the calling code, and subsequent publish attempts result in an error being returned. Given that this was not the behaviour observed here, it does not seem to be related to the issue.

If you do want to write application logic to handle such paused states in the case of non-retryable error codes, then when, where, and whether resume_publish should be called depends on how you want to respond to the particular error code, and it may not always be apt to do it asynchronously in a callback. For example, if you receive a not-found error as a result of, say, publishing to a non-existent topic, it may be more appropriate to call resume_publish only after the cause of the error is resolved (the topic is created), as in the sketch below.
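A minimal sketch of that pattern (hypothetical project and topic names; assumes message ordering is enabled on the publisher):

from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
)
topic_path = publisher.topic_path("your-project-id", "your-topic-id")

future = publisher.publish(topic_path, b"payload", ordering_key="key1")
try:
    future.result()
except NotFound:
    # Non-retryable error: publishing on this ordering key is now paused.
    # Resolve the cause first (here, create the missing topic)...
    publisher.create_topic(name=topic_path)
    # ...and only then resume publishing for that ordering key.
    publisher.resume_publish(topic_path, "key1")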


pamarc commented Apr 8, 2024

@mukund-ananthu thanks a lot for this detailed explanation; it is very clear.
