diff --git a/google/cloud/bigquery/_job_helpers.py b/google/cloud/bigquery/_job_helpers.py index 602a49eba..290439394 100644 --- a/google/cloud/bigquery/_job_helpers.py +++ b/google/cloud/bigquery/_job_helpers.py @@ -258,15 +258,16 @@ def _to_query_job( errors = query_response["errors"] query_job._properties["status"]["errors"] = errors - # Transform job state so that QueryJob doesn't try to restart the query. + # Avoid an extra call to `getQueryResults` if the query has finished. job_complete = query_response.get("jobComplete") if job_complete: - query_job._properties["status"]["state"] = "DONE" query_job._query_results = google.cloud.bigquery.query._QueryResults( query_response ) - else: - query_job._properties["status"]["state"] = "PENDING" + + # We want job.result() to refresh the job state, so the conversion is + # always "PENDING", even if the job is finished. + query_job._properties["status"]["state"] = "PENDING" return query_job diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index e92e9cb9e..7436b6013 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -17,11 +17,11 @@ import concurrent.futures import copy import re +import time import typing from typing import Any, Dict, Iterable, List, Optional, Union from google.api_core import exceptions -from google.api_core.future import polling as polling_future from google.api_core import retry as retries import requests @@ -1383,7 +1383,7 @@ def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None): def _reload_query_results( self, retry: "retries.Retry" = DEFAULT_RETRY, timeout: Optional[float] = None ): - """Refresh the cached query results. + """Refresh the cached query results unless already cached and complete. Args: retry (Optional[google.api_core.retry.Retry]): @@ -1392,6 +1392,8 @@ def _reload_query_results( The number of seconds to wait for the underlying HTTP transport before using ``retry``. """ + # Optimization: avoid a call to jobs.getQueryResults if it's already + # been fetched, e.g. from jobs.query first page of results. if self._query_results and self._query_results.complete: return @@ -1430,40 +1432,6 @@ def _reload_query_results( timeout=transport_timeout, ) - def _done_or_raise(self, retry=DEFAULT_RETRY, timeout=None): - """Check if the query has finished running and raise if it's not. - - If the query has finished, also reload the job itself. - """ - # If an explicit timeout is not given, fall back to the transport timeout - # stored in _blocking_poll() in the process of polling for job completion. - transport_timeout = timeout if timeout is not None else self._transport_timeout - - try: - self._reload_query_results(retry=retry, timeout=transport_timeout) - except exceptions.GoogleAPIError as exc: - # Reloading also updates error details on self, thus no need for an - # explicit self.set_exception() call if reloading succeeds. - try: - self.reload(retry=retry, timeout=transport_timeout) - except exceptions.GoogleAPIError: - # Use the query results reload exception, as it generally contains - # much more useful error information. - self.set_exception(exc) - finally: - return - - # Only reload the job once we know the query is complete. - # This will ensure that fields such as the destination table are - # correctly populated. 
-        if not self._query_results.complete:
-            raise polling_future._OperationNotComplete()
-        else:
-            try:
-                self.reload(retry=retry, timeout=transport_timeout)
-            except exceptions.GoogleAPIError as exc:
-                self.set_exception(exc)
-
     def result(  # type: ignore  # (incompatible with supertype)
         self,
         page_size: Optional[int] = None,
@@ -1528,6 +1496,10 @@ def result(  # type: ignore  # (incompatible with supertype)
                 If Non-``None`` and non-default ``job_retry`` is
                 provided and the job is not retryable.
         """
+        # Note: Since waiting for a query job to finish is more complex than
+        # refreshing the job state in a loop, we avoid calling the superclass
+        # implementation in this method.
+
         if self.dry_run:
             return _EmptyRowIterator(
                 project=self.project,
@@ -1548,46 +1520,124 @@ def result(  # type: ignore  # (incompatible with supertype)
                     " provided to the query that created this job."
                 )
 
-            first = True
+            restart_query_job = False
+
+            def is_job_done():
+                nonlocal restart_query_job
 
-            def do_get_result():
-                nonlocal first
+                if restart_query_job:
+                    restart_query_job = False
 
-                if first:
-                    first = False
-                else:
+                    # The original job has failed. Create a new one.
+                    #
                     # Note that we won't get here if retry_do_query is
                     # None, because we won't use a retry.
-
-                    # The orinal job is failed. Create a new one.
                     job = retry_do_query()
 
-                    # If it's already failed, we might as well stop:
-                    if job.done() and job.exception() is not None:
-                        raise job.exception()
-
                     # Become the new job:
                     self.__dict__.clear()
                     self.__dict__.update(job.__dict__)
 
-                    # This shouldn't be necessary, because once we have a good
-                    # job, it should stay good,and we shouldn't have to retry.
-                    # But let's be paranoid. :)
+                    # It's possible the job fails again and we'll have to
+                    # retry that too.
                     self._retry_do_query = retry_do_query
                     self._job_retry = job_retry
 
-                super(QueryJob, self).result(retry=retry, timeout=timeout)
-
-                # Since the job could already be "done" (e.g. got a finished job
-                # via client.get_job), the superclass call to done() might not
-                # set the self._query_results cache.
-                if self._query_results is None or not self._query_results.complete:
-                    self._reload_query_results(retry=retry, timeout=timeout)
+                # Refresh the job status with jobs.get because some of the
+                # exceptions thrown by jobs.getQueryResults, such as timeout
+                # and rateLimitExceeded errors, are ambiguous. We want to know
+                # if the query job failed and not just the call to
+                # jobs.getQueryResults.
+                if self.done(retry=retry, timeout=timeout):
+                    # If it's already failed, we might as well stop.
+                    job_failed_exception = self.exception()
+                    if job_failed_exception is not None:
+                        # Only try to restart the query job if the job failed for
+                        # a retriable reason. For example, don't restart the query
+                        # if the call to reload the job metadata within self.done()
+                        # timed out.
+                        #
+                        # `restart_query_job` must only be set to True after a
+                        # successful call to the `jobs.get` REST API in which we
+                        # determine that the job has failed.
+                        #
+                        # The `jobs.get` REST API
+                        # (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get)
+                        # is called via `self.done()` which calls
+                        # `self.reload()`.
+                        #
+                        # To determine if the job failed, the exception returned
+                        # by `self.exception()` is set during `self.reload()` via
+                        # `self._set_properties()`, which translates the
+                        # `Job.status.errorResult` field
+                        # (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.error_result)
+                        # into an exception that can be processed by the
+                        # `job_retry` predicate.
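+                        #
+                        # For example, when jobs.getQueryResults raises an
+                        # ambiguous rateLimitExceeded error, the first
+                        # `job_retry` pass re-enters is_job_done() and only
+                        # refreshes the job state via jobs.get. We set
+                        # restart_query_job and restart the query on the next
+                        # pass only if that refresh reports a failed job with
+                        # a retriable errorResult reason.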
+ restart_query_job = True + raise job_failed_exception + else: + # Make sure that the _query_results are cached so we + # can return a complete RowIterator. + # + # Note: As an optimization, _reload_query_results + # doesn't make any API calls if the query results are + # already cached and have jobComplete=True in the + # response from the REST API. This ensures we aren't + # making any extra API calls if the previous loop + # iteration fetched the finished job. + self._reload_query_results(retry=retry, timeout=timeout) + return True + + # Call jobs.getQueryResults with max results set to 0 just to + # wait for the query to finish. Unlike most methods, + # jobs.getQueryResults hangs as long as it can to ensure we + # know when the query has finished as soon as possible. + self._reload_query_results(retry=retry, timeout=timeout) + + # Even if the query is finished now according to + # jobs.getQueryResults, we'll want to reload the job status if + # it's not already DONE. + return False if retry_do_query is not None and job_retry is not None: - do_get_result = job_retry(do_get_result) - - do_get_result() + is_job_done = job_retry(is_job_done) + + # timeout can be a number of seconds, `None`, or a + # `google.api_core.future.polling.PollingFuture._DEFAULT_VALUE` + # sentinel object indicating a default timeout if we choose to add + # one some day. This value can come from our PollingFuture + # superclass and was introduced in + # https://github.com/googleapis/python-api-core/pull/462. + if isinstance(timeout, (float, int)): + remaining_timeout = timeout + else: + # Note: we may need to handle _DEFAULT_VALUE as a separate + # case someday, but even then the best we can do for queries + # is 72+ hours for hyperparameter tuning jobs: + # https://cloud.google.com/bigquery/quotas#query_jobs + # + # The timeout for a multi-statement query is 24+ hours. See: + # https://cloud.google.com/bigquery/quotas#multi_statement_query_limits + remaining_timeout = None + + if remaining_timeout is None: + # Since is_job_done() calls jobs.getQueryResults, which is a + # long-running API, don't delay the next request at all. + while not is_job_done(): + pass + else: + # Use a monotonic clock since we don't actually care about + # daylight savings or similar, just the elapsed time. + previous_time = time.monotonic() + + while not is_job_done(): + current_time = time.monotonic() + elapsed_time = current_time - previous_time + remaining_timeout = remaining_timeout - elapsed_time + previous_time = current_time + + if remaining_timeout < 0: + raise concurrent.futures.TimeoutError() except exceptions.GoogleAPICallError as exc: exc.message = _EXCEPTION_FOOTER_TEMPLATE.format( diff --git a/google/cloud/bigquery/retry.py b/google/cloud/bigquery/retry.py index 01b127972..c9898287f 100644 --- a/google/cloud/bigquery/retry.py +++ b/google/cloud/bigquery/retry.py @@ -36,10 +36,25 @@ _DEFAULT_RETRY_DEADLINE = 10.0 * 60.0 # 10 minutes -# Allow for a few retries after the API request times out. This relevant for -# rateLimitExceeded errors, which can be raised either by the Google load -# balancer or the BigQuery job server. -_DEFAULT_JOB_DEADLINE = 3.0 * _DEFAULT_RETRY_DEADLINE +# Ambiguous errors (e.g. internalError, backendError, rateLimitExceeded) retry +# until the full `_DEFAULT_RETRY_DEADLINE`. This is because the +# `jobs.getQueryResults` REST API translates a job failure into an HTTP error. 
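+# As a result, an error response from `jobs.getQueryResults` can mean either
+# that the API call failed or that the job itself failed; we can't tell which
+# without a follow-up call to `jobs.get`.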
+#
+# TODO(https://github.com/googleapis/python-bigquery/issues/1903): Investigate
+# if we can fail early for ambiguous errors in `QueryJob.result()`'s call to
+# the `jobs.getQueryResults` API.
+#
+# We need `_DEFAULT_JOB_DEADLINE` to be some multiple of
+# `_DEFAULT_RETRY_DEADLINE` to allow for a few retries after the retry
+# timeout is reached.
+#
+# Note: This multiple should actually be a multiple of
+# (2 * _DEFAULT_RETRY_DEADLINE). After an ambiguous exception, the first
+# call from `job_retry()` refreshes the job state without actually restarting
+# the query. The second `job_retry()` actually restarts the query. For a more
+# detailed explanation, see the comments where we set `restart_query_job = True`
+# in `QueryJob.result()`'s inner `is_job_done()` function.
+_DEFAULT_JOB_DEADLINE = 2.0 * (2.0 * _DEFAULT_RETRY_DEADLINE)
 
 
 def _should_retry(exc):
@@ -66,6 +81,11 @@ def _should_retry(exc):
     pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``.
 """
 
+# Note: Take care when updating DEFAULT_TIMEOUT to anything but None. We
+# briefly had a default timeout, but even setting it at more than twice the
+# theoretical server-side default timeout of 2 minutes was not enough for
+# complex queries. See:
+# https://github.com/googleapis/python-bigquery/issues/970#issuecomment-921934647
 DEFAULT_TIMEOUT = None
 """The default API timeout.
 
@@ -73,10 +93,32 @@ def _should_retry(exc):
     deadline on the retry object.
 """
 
-job_retry_reasons = "rateLimitExceeded", "backendError", "jobRateLimitExceeded"
+job_retry_reasons = (
+    "rateLimitExceeded",
+    "backendError",
+    "internalError",
+    "jobRateLimitExceeded",
+)
 
 
 def _job_should_retry(exc):
+    # Sometimes we have ambiguous errors, such as 'backendError' which could
+    # be due to an API problem or a job problem. For these, make sure we retry
+    # our is_job_done() function.
+    #
+    # Note: This won't restart the job unless we know for sure it's because of
+    # the job status and set restart_query_job = True in that loop. This means
+    # that we might end up calling this predicate twice for the same job
+    # but from different paths: (1) from jobs.getQueryResults RetryError and
+    # (2) from translating the job error from the body of a jobs.get response.
+    #
+    # Note: If we start retrying job types other than queries where we don't
+    # call the problematic getQueryResults API to check the status, we need
+    # to provide a different predicate, as there shouldn't be ambiguous
+    # errors in those cases.
+    if isinstance(exc, exceptions.RetryError):
+        exc = exc.cause
+
     if not hasattr(exc, "errors") or len(exc.errors) == 0:
         return False
 
diff --git a/tests/unit/job/test_query.py b/tests/unit/job/test_query.py
index 37ac7ba5e..0fee053e3 100644
--- a/tests/unit/job/test_query.py
+++ b/tests/unit/job/test_query.py
@@ -13,6 +13,7 @@
 # limitations under the License.
import concurrent +import concurrent.futures import copy import http import textwrap @@ -371,100 +372,6 @@ def test_cancelled(self): self.assertTrue(job.cancelled()) - def test__done_or_raise_w_timeout(self): - client = _make_client(project=self.PROJECT) - resource = self._make_resource(ended=False) - job = self._get_target_class().from_api_repr(resource, client) - - with mock.patch.object( - client, "_get_query_results" - ) as fake_get_results, mock.patch.object(job, "reload") as fake_reload: - job._done_or_raise(timeout=42) - - fake_get_results.assert_called_once() - call_args = fake_get_results.call_args[0][1] - self.assertEqual(call_args.timeout, 600.0) - - call_args = fake_reload.call_args[1] - self.assertEqual(call_args["timeout"], 42) - - def test__done_or_raise_w_timeout_and_longer_internal_api_timeout(self): - client = _make_client(project=self.PROJECT) - resource = self._make_resource(ended=False) - job = self._get_target_class().from_api_repr(resource, client) - job._done_timeout = 8.8 - - with mock.patch.object( - client, "_get_query_results" - ) as fake_get_results, mock.patch.object(job, "reload") as fake_reload: - job._done_or_raise(timeout=5.5) - - # The expected timeout used is simply the given timeout, as the latter - # is shorter than the job's internal done timeout. - expected_timeout = 5.5 - - fake_get_results.assert_called_once() - call_args = fake_get_results.call_args[0][1] - self.assertAlmostEqual(call_args.timeout, 600.0) - - call_args = fake_reload.call_args - self.assertAlmostEqual(call_args[1].get("timeout"), expected_timeout) - - def test__done_or_raise_w_query_results_error_reload_ok(self): - client = _make_client(project=self.PROJECT) - bad_request_error = exceptions.BadRequest("Error in query") - client._get_query_results = mock.Mock(side_effect=bad_request_error) - - resource = self._make_resource(ended=False) - job = self._get_target_class().from_api_repr(resource, client) - job._exception = None - - def fake_reload(self, *args, **kwargs): - self._properties["status"]["state"] = "DONE" - self.set_exception(copy.copy(bad_request_error)) - - fake_reload_method = types.MethodType(fake_reload, job) - - with mock.patch.object(job, "reload", new=fake_reload_method): - job._done_or_raise() - - assert isinstance(job._exception, exceptions.BadRequest) - - def test__done_or_raise_w_query_results_error_reload_error(self): - client = _make_client(project=self.PROJECT) - bad_request_error = exceptions.BadRequest("Error in query") - client._get_query_results = mock.Mock(side_effect=bad_request_error) - - resource = self._make_resource(ended=False) - job = self._get_target_class().from_api_repr(resource, client) - reload_error = exceptions.DataLoss("Oops, sorry!") - job.reload = mock.Mock(side_effect=reload_error) - job._exception = None - - job._done_or_raise() - - assert job._exception is bad_request_error - - def test__done_or_raise_w_job_query_results_ok_reload_error(self): - client = _make_client(project=self.PROJECT) - query_results = google.cloud.bigquery.query._QueryResults( - properties={ - "jobComplete": True, - "jobReference": {"projectId": self.PROJECT, "jobId": "12345"}, - } - ) - client._get_query_results = mock.Mock(return_value=query_results) - - resource = self._make_resource(ended=False) - job = self._get_target_class().from_api_repr(resource, client) - retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError) - job.reload = mock.Mock(side_effect=retry_error) - job._exception = None - - job._done_or_raise() - - assert job._exception 
is retry_error
-
     def test_query_plan(self):
         from google.cloud._helpers import _RFC3339_MICROS
         from google.cloud.bigquery.job import QueryPlanEntry
@@ -933,7 +840,12 @@ def test_search_stats(self):
         assert isinstance(job.search_stats, SearchStats)
         assert job.search_stats.mode == "INDEX_USAGE_MODE_UNSPECIFIED"
 
-    def test_result(self):
+    def test_result_reloads_job_state_until_done(self):
+        """Verify that result() doesn't return until state == 'DONE'.
+
+        This test verifies correctness for a possible sequence of API responses
+        that might cause internal customer issue b/332850329.
+        """
         from google.cloud.bigquery.table import RowIterator
 
         query_resource = {
@@ -970,7 +882,54 @@
             "rows": [{"f": [{"v": "abc"}]}],
         }
         conn = make_connection(
-            query_resource, query_resource_done, job_resource_done, query_page_resource
+            # QueryJob.result() makes a pair of jobs.get & jobs.getQueryResults
+            # REST API calls on each iteration to determine if the job has
+            # finished.
+            #
+            # jobs.get (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get)
+            # is necessary to make sure the job has really finished via
+            # `Job.status.state == "DONE"` and to get necessary properties for
+            # `RowIterator` like the destination table.
+            #
+            # jobs.getQueryResults
+            # (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults)
+            # with maxResults == 0 is technically optional,
+            # but it hangs for up to 10 seconds until the job has finished.
+            # This lets us know that the query has finished as close as
+            # possible to when the query actually finishes. It also gets
+            # properties necessary for `RowIterator` that aren't available on
+            # the job resource, such as the schema
+            # (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults#body.GetQueryResultsResponse.FIELDS.schema)
+            # of the results.
+            job_resource,
+            query_resource,
+            # The query wasn't finished in the last call to jobs.get, so try
+            # again with a call to both jobs.get & jobs.getQueryResults.
+            job_resource,
+            query_resource_done,
+            # Even though the previous jobs.getQueryResults response says
+            # the job is complete, we haven't downloaded the full job status
+            # yet.
+            #
+            # Important: per internal issue 332850329, this response has
+            # `Job.status.state = "RUNNING"`. This ensures we are protected
+            # against possible eventual consistency issues where
+            # `jobs.getQueryResults` says jobComplete == True, but our next
+            # call to `jobs.get` still doesn't have
+            # `Job.status.state == "DONE"`.
+            job_resource,
+            # Try again until `Job.status.state == "DONE"`.
+            #
+            # Note: the call to `jobs.getQueryResults` is missing here as
+            # an optimization. We already received a "completed" response, so
+            # we won't learn anything new by calling that API again.
+            job_resource,
+            job_resource_done,
+            # When we iterate over the `RowIterator` we return from
+            # `QueryJob.result()`, we make additional calls to
+            # `jobs.getQueryResults` but this time allowing the actual rows
+            # to be returned as well.
+            query_page_resource,
         )
         client = _make_client(self.PROJECT, connection=conn)
         job = self._get_target_class().from_api_repr(job_resource, client)
@@ -1013,8 +972,32 @@
             },
             timeout=None,
         )
+        # Ensure that we actually made the expected API calls in the sequence
+        # we described in the make_connection() call above.
+        #
+        # Note: The responses from jobs.get and jobs.getQueryResults can be
+        # deceptively similar, so this check ensures we actually made the
+        # requests we expected.
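+        #
+        # Note: assert_has_calls() only requires these calls to appear as a
+        # consecutive run within conn.api_request's recorded calls. The fixed
+        # list of responses given to make_connection() above is what bounds
+        # the total number of requests made.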
conn.api_request.assert_has_calls(
-            [query_results_call, query_results_call, reload_call, query_page_call]
+            [
+                # jobs.get & jobs.getQueryResults because the job just started.
+                reload_call,
+                query_results_call,
+                # jobs.get & jobs.getQueryResults because the query is still
+                # running.
+                reload_call,
+                query_results_call,
+                # We got a jobComplete response from the most recent call to
+                # jobs.getQueryResults, so now call jobs.get until we get
+                # `Job.status.state == "DONE"`. This tests a fix for internal
+                # issue b/332850329.
+                reload_call,
+                reload_call,
+                reload_call,
+                # jobs.getQueryResults without `maxResults` set to download
+                # the rows as we iterate over the `RowIterator`.
+                query_page_call,
+            ]
         )
 
     def test_result_dry_run(self):
@@ -1069,7 +1052,7 @@ def test_result_with_done_job_calls_get_query_results(self):
             method="GET",
             path=query_results_path,
             query_params={"maxResults": 0, "location": "EU"},
-            timeout=None,
+            timeout=google.cloud.bigquery.client._MIN_GET_QUERY_RESULTS_TIMEOUT,
         )
         query_results_page_call = mock.call(
             method="GET",
@@ -1107,7 +1090,10 @@ def test_result_with_done_jobs_query_response_doesnt_call_get_query_results(self
             request_config=None,
             query_response=query_resource_done,
         )
-        assert job.state == "DONE"
+
+        # We want job.result() to refresh the job state, so the conversion is
+        # always "PENDING", even if the job is finished.
+        assert job.state == "PENDING"
 
         result = job.result()
 
@@ -1156,7 +1142,9 @@ def test_result_with_done_jobs_query_response_and_page_size_invalidates_cache(se
             request_config=None,
             query_response=query_resource_done,
         )
-        assert job.state == "DONE"
+        # We want job.result() to refresh the job state, so the conversion is
+        # always "PENDING", even if the job is finished.
+        assert job.state == "PENDING"
 
         # Act
         result = job.result(page_size=3)
@@ -1230,7 +1218,7 @@ def test_result_with_max_results(self):
             query_page_request[1]["query_params"]["maxResults"], max_results
         )
 
-    def test_result_w_retry(self):
+    def test_result_w_custom_retry(self):
         from google.cloud.bigquery.table import RowIterator
 
         query_resource = {
@@ -1254,12 +1242,24 @@
         }
 
         connection = make_connection(
+            # For each API request, raise an exception that we know can
+            # be retried. Because of this, for each iteration we do:
+            # jobs.get (x2) & jobs.getQueryResults (x2)
+            exceptions.NotFound("not normally retriable"),
+            job_resource,
             exceptions.NotFound("not normally retriable"),
             query_resource,
+            # Query still not done, repeat both.
             exceptions.NotFound("not normally retriable"),
-            query_resource_done,
+            job_resource,
             exceptions.NotFound("not normally retriable"),
+            query_resource,
+            exceptions.NotFound("not normally retriable"),
+            # Query still not done, repeat both.
             job_resource_done,
+            exceptions.NotFound("not normally retriable"),
+            query_resource_done,
+            # Query finished!
         )
         client = _make_client(self.PROJECT, connection=connection)
         job = self._get_target_class().from_api_repr(job_resource, client)
@@ -1279,7 +1279,10 @@
             method="GET",
             path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
             query_params={"maxResults": 0, "location": "asia-northeast1"},
-            timeout=None,
+            # TODO(tswast): Why do we end up setting timeout to
+            # google.cloud.bigquery.client._MIN_GET_QUERY_RESULTS_TIMEOUT in
+            # some cases but not others?
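+            #
+            # Until that's resolved, mock.ANY matches whatever timeout value
+            # is used, so this assertion only pins down the method, path, and
+            # query_params.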
+ timeout=mock.ANY, ) reload_call = mock.call( method="GET", @@ -1289,7 +1292,26 @@ def test_result_w_retry(self): ) connection.api_request.assert_has_calls( - [query_results_call, query_results_call, reload_call] + [ + # See make_connection() call above for explanation of the + # expected API calls. + # + # Query not done. + reload_call, + reload_call, + query_results_call, + query_results_call, + # Query still not done. + reload_call, + reload_call, + query_results_call, + query_results_call, + # Query done! + reload_call, + reload_call, + query_results_call, + query_results_call, + ] ) def test_result_w_empty_schema(self): @@ -1316,41 +1338,60 @@ def test_result_w_empty_schema(self): self.assertEqual(result.location, "asia-northeast1") self.assertEqual(result.query_id, "xyz-abc") - def test_result_invokes_begins(self): + def test_result_w_timeout_doesnt_raise(self): + import google.cloud.bigquery.client + begun_resource = self._make_resource() - incomplete_resource = { - "jobComplete": False, + query_resource = { + "jobComplete": True, "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, "schema": {"fields": [{"name": "col1", "type": "STRING"}]}, } - query_resource = copy.deepcopy(incomplete_resource) - query_resource["jobComplete"] = True done_resource = copy.deepcopy(begun_resource) done_resource["status"] = {"state": "DONE"} - connection = make_connection( - begun_resource, - incomplete_resource, - query_resource, - done_resource, - query_resource, - ) + connection = make_connection(begun_resource, query_resource, done_resource) client = _make_client(project=self.PROJECT, connection=connection) job = self._make_one(self.JOB_ID, self.QUERY, client) + job._properties["jobReference"]["location"] = "US" - job.result() + with freezegun.freeze_time("1970-01-01 00:00:00", tick=False): + job.result( + # Test that fractional seconds are supported, but use a timeout + # that is representable as a floating point without rounding + # errors since it can be represented exactly in base 2. In this + # case 1.125 is 9 / 8, which is a fraction with a power of 2 in + # the denominator. 
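+                # Concretely, 1.125 == 1 + 1/8 == 0b1.001, which has an exact
+                # binary floating point representation.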
+ timeout=1.125, + ) - self.assertEqual(len(connection.api_request.call_args_list), 4) - begin_request = connection.api_request.call_args_list[0] - query_request = connection.api_request.call_args_list[2] - reload_request = connection.api_request.call_args_list[3] - self.assertEqual(begin_request[1]["method"], "POST") - self.assertEqual(query_request[1]["method"], "GET") - self.assertEqual(reload_request[1]["method"], "GET") + reload_call = mock.call( + method="GET", + path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}", + query_params={"location": "US"}, + timeout=1.125, + ) + get_query_results_call = mock.call( + method="GET", + path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}", + query_params={ + "maxResults": 0, + "location": "US", + }, + timeout=google.cloud.bigquery.client._MIN_GET_QUERY_RESULTS_TIMEOUT, + ) + connection.api_request.assert_has_calls( + [ + reload_call, + get_query_results_call, + reload_call, + ] + ) - def test_result_w_timeout(self): + def test_result_w_timeout_raises_concurrent_futures_timeout(self): import google.cloud.bigquery.client begun_resource = self._make_resource() + begun_resource["jobReference"]["location"] = "US" query_resource = { "jobComplete": True, "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, @@ -1361,26 +1402,35 @@ def test_result_w_timeout(self): connection = make_connection(begun_resource, query_resource, done_resource) client = _make_client(project=self.PROJECT, connection=connection) job = self._make_one(self.JOB_ID, self.QUERY, client) + job._properties["jobReference"]["location"] = "US" - with freezegun.freeze_time("1970-01-01 00:00:00", tick=False): - job.result(timeout=1.0) - - self.assertEqual(len(connection.api_request.call_args_list), 3) - begin_request = connection.api_request.call_args_list[0] - query_request = connection.api_request.call_args_list[1] - reload_request = connection.api_request.call_args_list[2] - self.assertEqual(begin_request[1]["method"], "POST") - self.assertEqual(query_request[1]["method"], "GET") - self.assertEqual( - query_request[1]["path"], - "/projects/{}/queries/{}".format(self.PROJECT, self.JOB_ID), + with freezegun.freeze_time( + "1970-01-01 00:00:00", auto_tick_seconds=1.0 + ), self.assertRaises(concurrent.futures.TimeoutError): + job.result(timeout=1.125) + + reload_call = mock.call( + method="GET", + path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}", + query_params={"location": "US"}, + timeout=1.125, ) - self.assertEqual(query_request[1]["timeout"], 120) - self.assertEqual( - query_request[1]["timeout"], - google.cloud.bigquery.client._MIN_GET_QUERY_RESULTS_TIMEOUT, + get_query_results_call = mock.call( + method="GET", + path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}", + query_params={ + "maxResults": 0, + "location": "US", + }, + timeout=google.cloud.bigquery.client._MIN_GET_QUERY_RESULTS_TIMEOUT, + ) + connection.api_request.assert_has_calls( + [ + reload_call, + get_query_results_call, + # Timeout before we can reload with the final job state. 
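+                #
+                # With auto_tick_seconds=1.0, every read of the frozen clock
+                # advances it by a full second, so the 1.125 second budget is
+                # spent before a third API call can be made.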
+ ] ) - self.assertEqual(reload_request[1]["method"], "GET") def test_result_w_page_size(self): # Arrange diff --git a/tests/unit/test__job_helpers.py b/tests/unit/test__job_helpers.py index 671b829f7..9f661dca7 100644 --- a/tests/unit/test__job_helpers.py +++ b/tests/unit/test__job_helpers.py @@ -246,7 +246,9 @@ def test__to_query_job_dry_run(): @pytest.mark.parametrize( ("completed", "expected_state"), ( - (True, "DONE"), + # Always pending so that we refresh the job state to get the + # destination table or job stats in case it's needed. + (True, "PENDING"), (False, "PENDING"), ), ) @@ -843,6 +845,7 @@ def test_query_and_wait_caches_completed_query_results_more_pages(): "jobId": "response-job-id", "location": "response-location", }, + "status": {"state": "DONE"}, }, { "rows": [ @@ -896,18 +899,10 @@ def test_query_and_wait_caches_completed_query_results_more_pages(): timeout=None, ) - # TODO(swast): Fetching job metadata isn't necessary in this case. - jobs_get_path = "/projects/response-project/jobs/response-job-id" - client._call_api.assert_any_call( - None, # retry - span_name="BigQuery.job.reload", - span_attributes={"path": jobs_get_path}, - job_ref=mock.ANY, - method="GET", - path=jobs_get_path, - query_params={"location": "response-location"}, - timeout=None, - ) + # Note: There is no get call to + # "/projects/response-project/jobs/response-job-id", because fetching job + # metadata isn't necessary in this case. The job already completed in + # jobs.query and we don't need the full job metadata in query_and_wait. # Fetch the remaining two pages. jobs_get_query_results_path = "/projects/response-project/queries/response-job-id" @@ -944,6 +939,7 @@ def test_query_and_wait_incomplete_query(): Client._list_rows_from_query_results, client ) client._call_api.side_effect = ( + # jobs.query { "jobReference": { "projectId": "response-project", @@ -952,6 +948,16 @@ def test_query_and_wait_incomplete_query(): }, "jobComplete": False, }, + # jobs.get + { + "jobReference": { + "projectId": "response-project", + "jobId": "response-job-id", + "location": "response-location", + }, + "status": {"state": "RUNNING"}, + }, + # jobs.getQueryResults with max_results=0 { "jobReference": { "projectId": "response-project", @@ -968,13 +974,18 @@ def test_query_and_wait_incomplete_query(): ], }, }, + # jobs.get { "jobReference": { "projectId": "response-project", "jobId": "response-job-id", "location": "response-location", }, + "status": {"state": "DONE"}, }, + # jobs.getQueryResults + # Note: No more jobs.getQueryResults with max_results=0 because the + # previous call to jobs.getQueryResults returned with jobComplete=True. 
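+        #
+        # This first page of rows includes a pageToken, so one more
+        # jobs.getQueryResults call follows to fetch the second page.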
{ "rows": [ {"f": [{"v": "Whillma Phlyntstone"}, {"v": "27"}]}, @@ -987,6 +998,7 @@ def test_query_and_wait_incomplete_query(): "totalRows": 2, "pageToken": "page-2", }, + # jobs.getQueryResults { "rows": [ {"f": [{"v": "Pearl Slaghoople"}, {"v": "53"}]}, diff --git a/tests/unit/test_job_retry.py b/tests/unit/test_job_retry.py index d7049c5ca..43ddae1dc 100644 --- a/tests/unit/test_job_retry.py +++ b/tests/unit/test_job_retry.py @@ -24,7 +24,7 @@ from google.cloud.bigquery.client import Client from google.cloud.bigquery import _job_helpers -from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY +import google.cloud.bigquery.retry from .helpers import make_connection @@ -126,6 +126,168 @@ def api_request(method, path, query_params=None, data=None, **kw): assert job.job_id == orig_job_id +def test_query_retry_with_default_retry_and_ambiguous_errors_only_retries_with_failed_job( + client, monkeypatch +): + """ + Some errors like 'rateLimitExceeded' can be ambiguous. Make sure we only + retry the job when we know for sure that the job has failed for a retriable + reason. We can only be sure after a "successful" call to jobs.get to fetch + the failed job status. + """ + job_counter = 0 + + def make_job_id(*args, **kwargs): + nonlocal job_counter + job_counter += 1 + return f"{job_counter}" + + monkeypatch.setattr(_job_helpers, "make_job_id", make_job_id) + + project = client.project + job_reference_1 = {"projectId": project, "jobId": "1", "location": "test-loc"} + job_reference_2 = {"projectId": project, "jobId": "2", "location": "test-loc"} + NUM_API_RETRIES = 2 + + # This error is modeled after a real customer exception in + # https://github.com/googleapis/python-bigquery/issues/707. + internal_error = google.api_core.exceptions.InternalServerError( + "Job failed just because...", + errors=[ + {"reason": "internalError"}, + ], + ) + responses = [ + # jobs.insert + {"jobReference": job_reference_1, "status": {"state": "PENDING"}}, + # jobs.get + {"jobReference": job_reference_1, "status": {"state": "RUNNING"}}, + # jobs.getQueryResults x2 + # + # Note: internalError is ambiguous in jobs.getQueryResults. The + # problem could be at the Google Frontend level or it could be because + # the job has failed due to some transient issues and the BigQuery + # REST API is translating the job failed status into failure HTTP + # codes. + # + # TODO(GH#1903): We shouldn't retry nearly this many times when we get + # ambiguous errors from jobs.getQueryResults. + # See: https://github.com/googleapis/python-bigquery/issues/1903 + internal_error, + internal_error, + # jobs.get -- the job has failed + { + "jobReference": job_reference_1, + "status": {"state": "DONE", "errorResult": {"reason": "internalError"}}, + }, + # jobs.insert + {"jobReference": job_reference_2, "status": {"state": "PENDING"}}, + # jobs.get + {"jobReference": job_reference_2, "status": {"state": "RUNNING"}}, + # jobs.getQueryResults + {"jobReference": job_reference_2, "jobComplete": True}, + # jobs.get + {"jobReference": job_reference_2, "status": {"state": "DONE"}}, + ] + + conn = client._connection = make_connection() + conn.api_request.side_effect = responses + + with freezegun.freeze_time( + # Note: because of exponential backoff and a bit of jitter, + # NUM_API_RETRIES will get less accurate the greater the value. + # We add 1 because we know there will be at least some additional + # calls to fetch the time / sleep before the retry deadline is hit. 
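+        #
+        # Concretely: 600.0 / 2 + 1 == 301.0 seconds per clock tick, so two
+        # ticks are enough to exceed the 600 second retry deadline.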
+ auto_tick_seconds=( + google.cloud.bigquery.retry._DEFAULT_RETRY_DEADLINE / NUM_API_RETRIES + ) + + 1, + ): + job = client.query("select 1") + job.result() + + conn.api_request.assert_has_calls( + [ + # jobs.insert + mock.call( + method="POST", + path="/projects/PROJECT/jobs", + data={ + "jobReference": {"jobId": "1", "projectId": "PROJECT"}, + "configuration": { + "query": {"useLegacySql": False, "query": "select 1"} + }, + }, + timeout=None, + ), + # jobs.get + mock.call( + method="GET", + path="/projects/PROJECT/jobs/1", + query_params={"location": "test-loc"}, + timeout=None, + ), + # jobs.getQueryResults x2 + mock.call( + method="GET", + path="/projects/PROJECT/queries/1", + query_params={"maxResults": 0, "location": "test-loc"}, + timeout=None, + ), + mock.call( + method="GET", + path="/projects/PROJECT/queries/1", + query_params={"maxResults": 0, "location": "test-loc"}, + timeout=None, + ), + # jobs.get -- verify that the job has failed + mock.call( + method="GET", + path="/projects/PROJECT/jobs/1", + query_params={"location": "test-loc"}, + timeout=None, + ), + # jobs.insert + mock.call( + method="POST", + path="/projects/PROJECT/jobs", + data={ + "jobReference": { + # Make sure that we generated a new job ID. + "jobId": "2", + "projectId": "PROJECT", + }, + "configuration": { + "query": {"useLegacySql": False, "query": "select 1"} + }, + }, + timeout=None, + ), + # jobs.get + mock.call( + method="GET", + path="/projects/PROJECT/jobs/2", + query_params={"location": "test-loc"}, + timeout=None, + ), + # jobs.getQueryResults + mock.call( + method="GET", + path="/projects/PROJECT/queries/2", + query_params={"maxResults": 0, "location": "test-loc"}, + timeout=None, + ), + # jobs.get + mock.call( + method="GET", + path="/projects/PROJECT/jobs/2", + query_params={"location": "test-loc"}, + timeout=None, + ), + ] + ) + + # With job_retry_on_query, we're testing 4 scenarios: # - Pass None retry to `query`. # - Pass None retry to `result`. @@ -187,8 +349,8 @@ def api_request(method, path, query_params=None, data=None, **kw): with pytest.raises(google.api_core.exceptions.RetryError): job.result() - # We never got a successful job, so the job id never changed: - assert job.job_id == orig_job_id + # We retried the job at least once, so we should have generated a new job ID. + assert job.job_id != orig_job_id # We failed because we couldn't succeed after 120 seconds. # But we can try again: @@ -301,8 +463,8 @@ def test_query_and_wait_retries_job_for_DDL_queries(): job_config=None, page_size=None, max_results=None, - retry=DEFAULT_JOB_RETRY, - job_retry=DEFAULT_JOB_RETRY, + retry=google.cloud.bigquery.retry.DEFAULT_RETRY, + job_retry=google.cloud.bigquery.retry.DEFAULT_JOB_RETRY, ) assert len(list(rows)) == 4