From e265db6a6a37d13056dcaac240c2cf3975dfd644 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Wed, 27 Mar 2024 09:58:18 -0500 Subject: [PATCH] fix: use an allowlist instead of denylist to determine when `query_and_wait` uses `jobs.query` API (#1869) --- google/cloud/bigquery/_job_helpers.py | 53 ++++++++++++++++++--------- tests/unit/test__job_helpers.py | 15 +++++++- 2 files changed, 49 insertions(+), 19 deletions(-) diff --git a/google/cloud/bigquery/_job_helpers.py b/google/cloud/bigquery/_job_helpers.py index 0692c9b65..602a49eba 100644 --- a/google/cloud/bigquery/_job_helpers.py +++ b/google/cloud/bigquery/_job_helpers.py @@ -400,9 +400,13 @@ def query_and_wait( :class:`~google.cloud.bigquery.job.QueryJobConfig` class. """ + request_body = _to_query_request( + query=query, job_config=job_config, location=location, timeout=api_timeout + ) + # Some API parameters aren't supported by the jobs.query API. In these # cases, fallback to a jobs.insert call. - if not _supported_by_jobs_query(job_config): + if not _supported_by_jobs_query(request_body): return _wait_or_cancel( query_jobs_insert( client=client, @@ -424,9 +428,6 @@ def query_and_wait( ) path = _to_query_path(project) - request_body = _to_query_request( - query=query, job_config=job_config, location=location, timeout=api_timeout - ) if page_size is not None and max_results is not None: request_body["maxResults"] = min(page_size, max_results) @@ -506,20 +507,38 @@ def do_query(): return do_query() -def _supported_by_jobs_query(job_config: Optional[job.QueryJobConfig]) -> bool: +def _supported_by_jobs_query(request_body: Dict[str, Any]) -> bool: """True if jobs.query can be used. False if jobs.insert is needed.""" - if job_config is None: - return True - - return ( - # These features aren't supported by jobs.query. - job_config.clustering_fields is None - and job_config.destination is None - and job_config.destination_encryption_configuration is None - and job_config.range_partitioning is None - and job_config.table_definitions is None - and job_config.time_partitioning is None - ) + request_keys = frozenset(request_body.keys()) + + # Per issue: https://github.com/googleapis/python-bigquery/issues/1867 + # use an allowlist here instead of a denylist because the backend API allows + # unsupported parameters without any warning or failure. Instead, keep this + # set in sync with those in QueryRequest: + # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#QueryRequest + keys_allowlist = { + "kind", + "query", + "maxResults", + "defaultDataset", + "timeoutMs", + "dryRun", + "preserveNulls", + "useQueryCache", + "useLegacySql", + "parameterMode", + "queryParameters", + "location", + "formatOptions", + "connectionProperties", + "labels", + "maximumBytesBilled", + "requestId", + "createSession", + } + + unsupported_keys = request_keys - keys_allowlist + return len(unsupported_keys) == 0 def _wait_or_cancel( diff --git a/tests/unit/test__job_helpers.py b/tests/unit/test__job_helpers.py index c30964c57..671b829f7 100644 --- a/tests/unit/test__job_helpers.py +++ b/tests/unit/test__job_helpers.py @@ -22,6 +22,7 @@ import pytest from google.cloud.bigquery.client import Client +from google.cloud.bigquery import enums from google.cloud.bigquery import _job_helpers from google.cloud.bigquery.job import copy_ as job_copy from google.cloud.bigquery.job import extract as job_extract @@ -1141,12 +1142,22 @@ def test_make_job_id_w_job_id_overrides_prefix(): False, id="destination_encryption_configuration", ), + # priority="BATCH" is not supported. See: + # https://github.com/googleapis/python-bigquery/issues/1867 + pytest.param( + job_query.QueryJobConfig( + priority=enums.QueryPriority.BATCH, + ), + False, + id="priority=BATCH", + ), ), ) -def test_supported_by_jobs_query( +def test_supported_by_jobs_query_from_queryjobconfig( job_config: Optional[job_query.QueryJobConfig], expected: bool ): - assert _job_helpers._supported_by_jobs_query(job_config) == expected + request_body = _job_helpers._to_query_request(job_config, query="SELECT 1") + assert _job_helpers._supported_by_jobs_query(request_body) == expected def test_wait_or_cancel_no_exception():