fix: avoid unnecessary API call in QueryJob.result() when job is already finished #1900

Merged: 14 commits, Apr 18, 2024
Changes from 1 commit
add comments explaining expected sequences of API calls
tswast committed Apr 17, 2024
commit 4ee4975094163a5ae8139676afeb8c2eada350a3
42 changes: 37 additions & 5 deletions google/cloud/bigquery/job/query.py
@@ -1550,13 +1550,31 @@ def is_job_done():
# jobs.getQueryResults.
if self.done(retry=retry, timeout=timeout):
# If it's already failed, we might as well stop.
- if self.exception() is not None:
job_failed_exception = self.exception()
if job_failed_exception is not None:
# Only try to restart the query job if the job failed for
# a retriable reason. For example, don't restart the query
# if the call to reload the job metadata within self.done()
# timed out.
#
# The `restart_query_job` must only be called after a
# successful call to the `jobs.get` REST API and we
# determine that the job has failed.
#
# The `jobs.get` REST API
# (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get)
# is called via `self.done()` which calls
# `self.reload()`.
#
# To determine if the job failed, the exception returned by
# `self.exception()` is set during `self.reload()` via
# `self._set_properties()`, which translates the
# `Job.status.errorResult` field
# (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.error_result)
# into an exception that can be processed by the
# `job_retry` predicate.
restart_query_job = True
- raise self.exception()
raise job_failed_exception
else:
# Make sure that the _query_results are cached so we
# can return a complete RowIterator.
Expand Down Expand Up @@ -1584,9 +1602,23 @@ def is_job_done():
if retry_do_query is not None and job_retry is not None:
is_job_done = job_retry(is_job_done)

- # timeout can be `None` or an object from our superclass
- # indicating a default timeout.
- remaining_timeout = timeout if isinstance(timeout, (float, int)) else None
# timeout can be a number of seconds, `None`, or a
# `google.api_core.future.polling.PollingFuture._DEFAULT_VALUE`
# sentinel object indicating a default timeout if we choose to add
# one some day. This value can come from our PollingFuture
# superclass and was introduced in
# https://github.com/googleapis/python-api-core/pull/462.
if isinstance(timeout, (float, int)):
remaining_timeout = timeout
else:
# Note: we may need to handle _DEFAULT_VALUE as a separate
# case someday, but even then the best we can do for queries
# is 72+ hours for hyperparameter tuning jobs:
# https://cloud.google.com/bigquery/quotas#query_jobs
#
# The timeout for a multi-statement query is 24+ hours. See:
# https://cloud.google.com/bigquery/quotas#multi_statement_query_limits
remaining_timeout = None

if remaining_timeout is None:
# Since is_job_done() calls jobs.getQueryResults, which is a
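
To make the control flow described in those comments concrete, here is a minimal, runnable sketch of the polling shape. `FakeJob`, `RetriableJobError`, and this simplified `is_job_done` are hypothetical stand-ins for illustration only; the real logic lives in `QueryJob.result()`.

# Hypothetical stand-ins; not the library's actual classes.
class RetriableJobError(Exception):
    """Stands in for an exception built from Job.status.errorResult."""


class FakeJob:
    def __init__(self, api_responses):
        # Each response is a (state, error) pair, as if from jobs.get.
        self._responses = iter(api_responses)
        self.state = "PENDING"
        self.error = None

    def done(self):
        # Corresponds to self.reload(), which calls the jobs.get REST API
        # and translates Job.status.errorResult into an exception.
        self.state, self.error = next(self._responses)
        return self.state == "DONE"

    def exception(self):
        return RetriableJobError(self.error) if self.error else None


def is_job_done(job):
    # Mirrors the inner function in QueryJob.result(): a job failure is
    # only raised after a *successful* jobs.get call shows the job is done.
    if job.done():
        job_failed_exception = job.exception()
        if job_failed_exception is not None:
            raise job_failed_exception
        return True
    return False


job = FakeJob([("RUNNING", None), ("DONE", None)])
while not is_job_done(job):
    pass
print(job.state)  # DONE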
28 changes: 24 additions & 4 deletions google/cloud/bigquery/retry.py
@@ -36,10 +36,25 @@

_DEFAULT_RETRY_DEADLINE = 10.0 * 60.0 # 10 minutes

- # Allow for a few retries after the API request times out. This is relevant for
- # rateLimitExceeded errors, which can be raised either by the Google load
- # balancer or the BigQuery job server.
- _DEFAULT_JOB_DEADLINE = 4.0 * _DEFAULT_RETRY_DEADLINE
# Ambiguous errors (e.g. internalError, backendError, rateLimitExceeded) retry
# until the full `_DEFAULT_RETRY_DEADLINE`. This is because the
# `jobs.getQueryResults` REST API translates a job failure into an HTTP error.
#
# TODO(https://github.com/googleapis/python-bigquery/issues/1903): Investigate
# if we can fail early for ambiguous errors in `QueryJob.result()`'s call to
# the `jobs.getQueryResults` API.
#
# We need `_DEFAULT_JOB_DEADLINE` to be some multiple of
# `_DEFAULT_RETRY_DEADLINE` to allow for a few retries after the retry
# timeout is reached.
#
# Note: This multiple should actually be a multiple of
# (2 * _DEFAULT_RETRY_DEADLINE). After an ambiguous exception, the first
# call from `job_retry()` refreshes the job state without actually restarting
# the query. The second `job_retry()` actually restarts the query. For a more
# detailed explanation, see the comments where we set `restart_query_job = True`
# in `QueryJob.result()`'s inner `is_job_done()` function.
_DEFAULT_JOB_DEADLINE = 2.0 * (2.0 * _DEFAULT_RETRY_DEADLINE)


def _should_retry(exc):
@@ -66,6 +81,11 @@ def _should_retry(exc):
pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``.
"""

# Note: Take care when updating DEFAULT_TIMEOUT to anything but None. We
# briefly had a default timeout, but even setting it at more than twice the
# theoretical server-side default timeout of 2 minutes was not enough for
# complex queries. See:
# https://github.com/googleapis/python-bigquery/issues/970#issuecomment-921934647
DEFAULT_TIMEOUT = None
"""The default API timeout.

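
For concreteness, the arithmetic behind the new constant (a sketch using only values from this diff, not library code):

# Worked arithmetic for _DEFAULT_JOB_DEADLINE above.
_DEFAULT_RETRY_DEADLINE = 10.0 * 60.0  # 600 seconds (10 minutes)

# After an ambiguous error, one job_retry cycle can cost up to
# 2 * _DEFAULT_RETRY_DEADLINE: one retry budget to refresh the job
# state and one to actually restart the query.
cycle_budget = 2.0 * _DEFAULT_RETRY_DEADLINE  # 1200 seconds

# Allowing two such cycles gives the overall job deadline.
print(2.0 * cycle_budget)  # 2400.0 seconds, i.e. 40 minutes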
91 changes: 87 additions & 4 deletions tests/unit/job/test_query.py
@@ -840,7 +840,12 @@ def test_search_stats(self):
assert isinstance(job.search_stats, SearchStats)
assert job.search_stats.mode == "INDEX_USAGE_MODE_UNSPECIFIED"

- def test_result(self):
def test_result_reloads_job_state_until_done(self):
"""Verify that result() doesn't return until state == 'DONE'.

This test verifies correctness for a possible sequence of API responses
that might cause internal customer issue b/332850329.
"""
from google.cloud.bigquery.table import RowIterator

query_resource = {
@@ -877,11 +882,53 @@ def test_result_reloads_job_state_until_done(self):
"rows": [{"f": [{"v": "abc"}]}],
}
conn = make_connection(
# QueryJob.result() makes a pair of jobs.get & jobs.getQueryResults
# REST API calls each iteration to determine if the job has finished
# or not.
#
# jobs.get (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get)
# is necessary to make sure the job has really finished via
# `Job.status.state == "DONE"` and to get necessary properties for
# `RowIterator` like the destination table.
#
# jobs.getQueryResults
# (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults)
# with maxResults == 0 is technically optional,
# but it hangs for up to 10 seconds until the job has finished. This
# lets us learn that the query has finished as close as possible to
# when it actually finishes. It also gets properties necessary for
# `RowIterator` that aren't available on the job resource, such as
# the schema
# (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults#body.GetQueryResultsResponse.FIELDS.schema)
# of the results.
job_resource,
Contributor:

Not sure I am tracking the relationship between the make_connection inputs versus the assert_has_calls checks.

Can you explain how these tests are supposed to work?

Contributor Author (tswast):

make_connection is a convention in google-cloud-bigquery unit tests that actually predates our use of the "mock" package. It mocks out the responses to REST API calls, previously with a fake implementation of our "Connection" class from the _http module and now with a true mock object. For every request that our test makes, there should be a corresponding response. As with Mock.side_effect, any exceptions in this list will be raised instead.

I'm guessing your question also relates to "Why this particular set of requests / responses?". I've added some comments explaining why we're expecting this sequence of API calls. I've also updated this test to more explicitly check for a possible cause of customer issue b/332850329.

query_resource,
# The query wasn't finished in the last call to jobs.get, so try
# again with a call to both jobs.get & jobs.getQueryResults.
job_resource,
query_resource_done,
# Even though the previous jobs.getQueryResults response says
# the job is complete, we haven't downloaded the full job status
# yet.
#
# Important: per internal issue 332850329, this response has
# `Job.status.state = "RUNNING"`. This ensures we are protected
# against possible eventual consistency issues where
# `jobs.getQueryResults` says jobComplete == True, but our next
# call to `jobs.get` still doesn't have
# `Job.status.state == "DONE"`.
job_resource,
# Try again until `Job.status.state == "DONE"`.
#
# Note: the call to `jobs.getQueryResults` is missing here as
# an optimization. We already received a "completed" response, so
# we won't learn anything new by calling that API again.
job_resource,
job_resource_done,
# When we iterate over the `RowIterator` we return from
# `QueryJob.result()`, we make additional calls to
# `jobs.getQueryResults` but this time allowing the actual rows
# to be returned as well.
query_page_resource,
)
client = _make_client(self.PROJECT, connection=conn)
@@ -925,13 +972,30 @@
},
timeout=None,
)
# Ensure that we actually made the expected API calls in the
# sequence set up in the make_connection() call above.
#
# Note: The responses from jobs.get and jobs.getQueryResults can be
# deceptively similar, so this check ensures we actually made the
# requests we expected.
conn.api_request.assert_has_calls(
[
# jobs.get & jobs.getQueryResults because the job just started.
reload_call,
query_results_call,
# jobs.get & jobs.getQueryResults because the query is still
# running.
reload_call,
query_results_call,
# We got a jobComplete response from the most recent call to
# jobs.getQueryResults, so now call jobs.get until we get
# `Job.status.state == "DONE"`. This tests a fix for internal
# issue b/332850329.
reload_call,
reload_call,
reload_call,
# jobs.getQueryResults without `maxResults` set to download
# the rows as we iterate over the `RowIterator`.
query_page_call,
]
)
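
A rough sketch of the make_connection convention tswast describes in the review thread above, assuming only the standard library's unittest.mock; the real helper in the google-cloud-bigquery test suite may differ in detail, and the paths and response bodies below are made up for illustration.

from unittest import mock


def make_connection(*responses):
    conn = mock.Mock(name="Connection")
    # As with Mock.side_effect: each api_request() call consumes the
    # next item, and any exception in the list is raised instead of
    # being returned.
    conn.api_request = mock.Mock(side_effect=list(responses))
    return conn


conn = make_connection(
    {"status": {"state": "RUNNING"}},  # first jobs.get response
    {"status": {"state": "DONE"}},  # second jobs.get response
)
conn.api_request(method="GET", path="/projects/p/jobs/j")
conn.api_request(method="GET", path="/projects/p/jobs/j")

# assert_has_calls then verifies the requests were made in order.
conn.api_request.assert_has_calls(
    [
        mock.call(method="GET", path="/projects/p/jobs/j"),
        mock.call(method="GET", path="/projects/p/jobs/j"),
    ]
)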
@@ -1154,7 +1218,7 @@ def test_result_with_max_results(self):
query_page_request[1]["query_params"]["maxResults"], max_results
)

- def test_result_w_retry(self):
def test_result_w_custom_retry(self):
from google.cloud.bigquery.table import RowIterator

query_resource = {
@@ -1178,16 +1242,24 @@ def test_result_w_custom_retry(self):
}

connection = make_connection(
# Also, for each API request, first raise an exception that we know
# can be retried. Because of this, each iteration makes every call
# twice: jobs.get (x2) & jobs.getQueryResults (x2).
exceptions.NotFound("not normally retriable"),
job_resource,
exceptions.NotFound("not normally retriable"),
query_resource,
# Query still not done, repeat both.
exceptions.NotFound("not normally retriable"),
job_resource,
exceptions.NotFound("not normally retriable"),
query_resource_done,
- query_resource,
exceptions.NotFound("not normally retriable"),
# Query still not done, repeat both.
job_resource_done,
exceptions.NotFound("not normally retriable"),
query_resource_done,
# Query finished!
)
client = _make_client(self.PROJECT, connection=connection)
job = self._get_target_class().from_api_repr(job_resource, client)
@@ -1207,7 +1279,10 @@
method="GET",
path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
query_params={"maxResults": 0, "location": "asia-northeast1"},
- timeout=None,
# TODO(tswast): Why do we end up setting timeout to
# google.cloud.bigquery.client._MIN_GET_QUERY_RESULTS_TIMEOUT in
# some cases but not others?
timeout=mock.ANY,
)
reload_call = mock.call(
method="GET",
@@ -1218,16 +1293,24 @@ def test_result_w_custom_retry(self):

connection.api_request.assert_has_calls(
[
# See make_connection() call above for explanation of the
# expected API calls.
#
# Query not done.
reload_call,
Contributor:

Same thing here. Can I get some clarity on what we are doing and looking for?

Contributor Author (tswast):

Added some explanation here as well as above in the make_connection() call.

reload_call,
query_results_call,
query_results_call,
# Query still not done.
reload_call,
reload_call,
query_results_call,
query_results_call,
# Query done!
reload_call,
reload_call,
query_results_call,
query_results_call,
]
)

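
The custom retry this test exercises treats NotFound as retryable, hence the "not normally retriable" messages above. A hedged sketch of building such a retry from the library's own DEFAULT_RETRY; the exact object constructed in the test may differ.

from google.api_core import exceptions
from google.cloud.bigquery.retry import DEFAULT_RETRY

# Treat NotFound as retryable for this sketch, even though it is not
# retried by the default predicate.
custom_retry = DEFAULT_RETRY.with_deadline(60.0).with_predicate(
    lambda exc: isinstance(exc, exceptions.NotFound)
)

# job.result(retry=custom_retry) would then retry each jobs.get and
# jobs.getQueryResults request after the injected NotFound error.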