fix: avoid unnecessary API call in QueryJob.result() when job is already finished #1900

Merged: 14 commits, Apr 18, 2024

Changes from 1 commit
fix most unit tests
tswast committed Apr 12, 2024
commit 07839b5b1c742208911519113b5910d536990ed6
24 changes: 20 additions & 4 deletions google/cloud/bigquery/job/query.py
@@ -1440,6 +1440,12 @@ def _done_or_raise(self, retry=DEFAULT_RETRY, timeout=None):
# stored in _blocking_poll() in the process of polling for job completion.
transport_timeout = timeout if timeout is not None else self._transport_timeout

# We could have been given a generic object() sentinel to represent a
# default timeout.
transport_timeout = (
transport_timeout if isinstance(timeout, (int, float)) else None
)

try:
self._reload_query_results(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
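The sentinel handling added in this hunk can be illustrated with a small, self-contained sketch; normalize_timeout and DEFAULT_SENTINEL below are hypothetical names for illustration only, not google-cloud-bigquery APIs. The idea is that the polling machinery may pass a generic object() sentinel instead of a numeric timeout, and anything that is not a real number should be treated as "no transport timeout".

```python
# Illustrative sketch only -- normalize_timeout and DEFAULT_SENTINEL are
# hypothetical names, not part of the library.
DEFAULT_SENTINEL = object()  # stand-in for the polling layer's default marker


def normalize_timeout(timeout, fallback=None):
    """Return a numeric transport timeout, or None for sentinels/None."""
    candidate = timeout if timeout is not None else fallback
    return candidate if isinstance(candidate, (int, float)) else None


assert normalize_timeout(10.0) == 10.0
assert normalize_timeout(DEFAULT_SENTINEL) is None      # sentinel -> no timeout
assert normalize_timeout(None, fallback=5.0) == 5.0     # fall back to stored value
```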
@@ -1576,7 +1582,12 @@ def is_job_done():
self._retry_do_query = retry_do_query
self._job_retry = job_retry

if self.done():
# Refresh the job status with jobs.get because some of the
# exceptions thrown by jobs.getQueryResults like timeout and
# rateLimitExceeded errors are ambiguous. We want to know if
# the query job failed and not just the call to
# jobs.getQueryResults.
if self.done(retry=retry, timeout=timeout):
# If it's already failed, we might as well stop.
if self.exception() is not None:
# Only try to restart the query job if the job failed for
@@ -1590,16 +1601,21 @@
# it's already been fetched, e.g. from jobs.query first
# page of results.
if (
self._query_results is not None
and self._query_results.complete
self._query_results is None
or not self._query_results.complete
):
return True
self._reload_query_results(retry=retry, timeout=timeout)
return True

# Call jobs.getQueryResults with max results set to 0 just to
# wait for the query to finish. Unlike most methods,
# jobs.getQueryResults hangs as long as it can to ensure we
# know when the query has finished as soon as possible.
self._reload_query_results(retry=retry, timeout=timeout)
Comment from the PR author:
Uh oh: if jobs.getQueryResults fails because the job failed, it can throw an exception, but restart_query_job will still be False.

We don't want restart_query_job = True either, because this can raise an ambiguous exception such as quota exceeded, where we don't know whether the quota was exceeded at the job level (so the job failed) or at a higher level (the Google Frontend, GFE), in which case the job might actually still be running or even have succeeded.

The PR author added:
This isn't the worst way to fail, but it would be nice to make the jobs.get call above when an exception is raised here, so that we get a chance to retry the job if the job itself failed.

The PR author added:
Filed #1903 to track improvements to handling ambiguous errors. Commit 12fa9fb fixes an issue where we weren't actually retrying after an ambiguous failure, even though we thought we were.
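To make the pattern discussed in this thread concrete, here is a rough, hypothetical sketch of the disambiguation: when jobs.getQueryResults raises, consult jobs.get before deciding whether the job itself failed (and may be restarted by job_retry) or only the polling call failed. The callables get_query_results and get_job are placeholders, not the library's API.

```python
# Hypothetical sketch, not the library's implementation.
from google.api_core import exceptions


def poll_once(get_query_results, get_job):
    """Return True when the query is done; raise only for real job failures."""
    try:
        get_query_results()  # jobs.getQueryResults; may hang until completion
        return True
    except exceptions.GoogleAPIError:
        # The error is ambiguous: it may describe the job itself, or only this
        # call (e.g. a frontend-level quota error while the job keeps running).
        job = get_job()  # jobs.get gives the authoritative job status
        if job.state == "DONE" and job.error_result is not None:
            raise  # the job really failed; let job_retry decide on a restart
        return False  # job still running (or succeeded); keep polling
```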


# Even if the query is finished now according to
# jobs.getQueryResults, we'll want to reload the job status if
# it's not already DONE.
return False

if retry_do_query is not None and job_retry is not None:
64 changes: 30 additions & 34 deletions tests/unit/job/test_query.py
@@ -970,7 +970,12 @@ def test_result(self):
"rows": [{"f": [{"v": "abc"}]}],
}
conn = make_connection(
query_resource, query_resource_done, job_resource_done, query_page_resource
job_resource,
A reviewer (Contributor) commented:
Not sure I am tracking the relationship between the make_connection inputs versus the assert_has_calls checks.

Can you explain how these tests are supposed to work?

The PR author replied:
make_connection is a convention in the google-cloud-bigquery unit tests that actually predates our use of the "mock" package. It mocks out the responses to REST API calls, previously with a fake implementation of our "Connection" class from the _http module and now with a true mock object. For every request that our test makes, there should be a corresponding response. As with Mock.side_effect, any exceptions in this list are raised instead of returned.

I'm guessing your question also relates to "Why this particular set of requests / responses?". I've added some comments explaining why we're expecting this sequence of API calls. I've also updated this test to more explicitly check for a possible cause of customer issue b/332850329.
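For readers unfamiliar with that convention, a simplified sketch of what a make_connection-style helper can look like follows; the real helper lives in the test suite's helpers module and differs in detail, so treat this as an assumption-laden illustration.

```python
# Simplified sketch of the make_connection test convention; the real helper
# in the google-cloud-bigquery test suite is more involved.
from unittest import mock

from google.api_core.exceptions import NotFound
from google.cloud.bigquery import _http


def make_connection(*responses):
    """Mock Connection whose api_request answers one request per response.

    As with Mock.side_effect, exception instances in *responses are raised
    instead of returned, so a test can script the exact sequence of API
    results (and failures) the client will observe.
    """
    conn = mock.create_autospec(_http.Connection, instance=True)
    # A trailing NotFound catches tests that make more calls than planned.
    conn.api_request.side_effect = list(responses) + [NotFound("miss")]
    return conn
```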

query_resource,
job_resource,
query_resource_done,
job_resource_done,
query_page_resource,
)
client = _make_client(self.PROJECT, connection=conn)
job = self._get_target_class().from_api_repr(job_resource, client)
@@ -1014,7 +1019,14 @@ def test_result(self):
timeout=None,
)
conn.api_request.assert_has_calls(
[query_results_call, query_results_call, reload_call, query_page_call]
[
reload_call,
query_results_call,
reload_call,
query_results_call,
reload_call,
query_page_call,
]
)

def test_result_dry_run(self):
@@ -1254,9 +1266,13 @@ def test_result_w_retry(self):
}

connection = make_connection(
exceptions.NotFound("not normally retriable"),
job_resource,
exceptions.NotFound("not normally retriable"),
query_resource,
exceptions.NotFound("not normally retriable"),
job_resource,
exceptions.NotFound("not normally retriable"),
query_resource_done,
exceptions.NotFound("not normally retriable"),
job_resource_done,
@@ -1289,7 +1305,18 @@
)

connection.api_request.assert_has_calls(
[query_results_call, query_results_call, reload_call]
[
reload_call,
(chalmerlowe marked this conversation as resolved.)
A reviewer (Contributor) commented:
Same thing here. Can I get some clarity on what we are doing and looking for?

The PR author replied:
Added some explanation here as well as above in the make_connection() call.

reload_call,
query_results_call,
query_results_call,
reload_call,
reload_call,
query_results_call,
query_results_call,
reload_call,
reload_call,
]
)

def test_result_w_empty_schema(self):
@@ -1316,37 +1343,6 @@ def test_result_w_empty_schema(self):
self.assertEqual(result.location, "asia-northeast1")
self.assertEqual(result.query_id, "xyz-abc")

def test_result_invokes_begins(self):
Comment from the PR author:
As of #967, released in google-cloud-bigquery 3.0.0, the _begin method is no longer used for query jobs.

begun_resource = self._make_resource()
incomplete_resource = {
"jobComplete": False,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
}
query_resource = copy.deepcopy(incomplete_resource)
query_resource["jobComplete"] = True
done_resource = copy.deepcopy(begun_resource)
done_resource["status"] = {"state": "DONE"}
connection = make_connection(
begun_resource,
incomplete_resource,
query_resource,
done_resource,
query_resource,
)
client = _make_client(project=self.PROJECT, connection=connection)
job = self._make_one(self.JOB_ID, self.QUERY, client)

job.result()

self.assertEqual(len(connection.api_request.call_args_list), 4)
begin_request = connection.api_request.call_args_list[0]
query_request = connection.api_request.call_args_list[2]
reload_request = connection.api_request.call_args_list[3]
self.assertEqual(begin_request[1]["method"], "POST")
self.assertEqual(query_request[1]["method"], "GET")
self.assertEqual(reload_request[1]["method"], "GET")

def test_result_w_timeout(self):
import google.cloud.bigquery.client

14 changes: 13 additions & 1 deletion tests/unit/test__job_helpers.py
@@ -946,6 +946,15 @@ def test_query_and_wait_incomplete_query():
"jobComplete": False,
},
# jobs.get
{
"jobReference": {
"projectId": "response-project",
"jobId": "response-job-id",
"location": "response-location",
},
"status": {"state": "RUNNING"},
},
# jobs.getQueryResults with max_results=0
{
"jobReference": {
"projectId": "response-project",
@@ -962,15 +971,18 @@
],
},
},
# jobs.getQueryResults with max_results=0
# jobs.get
{
"jobReference": {
"projectId": "response-project",
"jobId": "response-job-id",
"location": "response-location",
},
"status": {"state": "DONE"},
},
# jobs.getQueryResults
# Note: No more jobs.getQueryResults with max_results=0 because the
# previous call to jobs.getQueryResults returned with jobComplete=True.
{
"rows": [
{"f": [{"v": "Whillma Phlyntstone"}, {"v": "27"}]},
4 changes: 2 additions & 2 deletions tests/unit/test_job_retry.py
@@ -187,8 +187,8 @@ def api_request(method, path, query_params=None, data=None, **kw):
with pytest.raises(google.api_core.exceptions.RetryError):
job.result()

# We never got a successful job, so the job id never changed:
assert job.job_id == orig_job_id
# We retried the job at least once, so we should have generated a new job ID.
assert job.job_id != orig_job_id

# We failed because we couldn't succeed after 120 seconds.
# But we can try again:
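For context on this assertion change, a hedged usage example of job-level retries: when a query job fails with a reason the job_retry predicate accepts, the client re-issues the query as a brand-new job, so the job ID changes. The predicate below is an assumption for illustration; choose whichever exceptions you consider retriable at the job level.

```python
# Illustrative usage only (assumes credentials and a real project).
from google.api_core import exceptions
from google.api_core.retry import Retry, if_exception_type
from google.cloud import bigquery

client = bigquery.Client()

# For this example, treat internal server errors as retriable at the *job*
# level; each job-level retry issues a new job with a new job_id.
job_retry = Retry(predicate=if_exception_type(exceptions.InternalServerError))

job = client.query("SELECT 1", job_retry=job_retry)
original_job_id = job.job_id
rows = job.result(job_retry=job_retry)
# If the job itself had to be retried, job.job_id now differs from original_job_id.
```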