Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use an allowlist instead of denylist to determine when query_and_wait uses jobs.query API #1869

Merged
merged 1 commit into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
fix: use an allowlist instead of denylist to determine when `query_an…
…d_wait` uses `jobs.query` API
  • Loading branch information
tswast committed Mar 25, 2024
commit 6e19fd511346a848fc1b603a1844f35cdb0f9ef1
53 changes: 36 additions & 17 deletions google/cloud/bigquery/_job_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,9 +400,13 @@ def query_and_wait(
:class:`~google.cloud.bigquery.job.QueryJobConfig`
class.
"""
request_body = _to_query_request(
query=query, job_config=job_config, location=location, timeout=api_timeout
)

# Some API parameters aren't supported by the jobs.query API. In these
# cases, fallback to a jobs.insert call.
if not _supported_by_jobs_query(job_config):
if not _supported_by_jobs_query(request_body):
return _wait_or_cancel(
query_jobs_insert(
client=client,
Expand All @@ -424,9 +428,6 @@ def query_and_wait(
)

path = _to_query_path(project)
request_body = _to_query_request(
query=query, job_config=job_config, location=location, timeout=api_timeout
)

if page_size is not None and max_results is not None:
request_body["maxResults"] = min(page_size, max_results)
Expand Down Expand Up @@ -506,20 +507,38 @@ def do_query():
return do_query()


def _supported_by_jobs_query(job_config: Optional[job.QueryJobConfig]) -> bool:
def _supported_by_jobs_query(request_body: Dict[str, Any]) -> bool:
"""True if jobs.query can be used. False if jobs.insert is needed."""
if job_config is None:
return True

return (
# These features aren't supported by jobs.query.
job_config.clustering_fields is None
and job_config.destination is None
and job_config.destination_encryption_configuration is None
and job_config.range_partitioning is None
and job_config.table_definitions is None
and job_config.time_partitioning is None
)
request_keys = frozenset(request_body.keys())

# Per issue: https://github.com/googleapis/python-bigquery/issues/1867
# use an allowlist here instead of a denylist because the backend API allows
# unsupported parameters without any warning or failure. Instead, keep this
# set in sync with those in QueryRequest:
# https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#QueryRequest
keys_allowlist = {
"kind",
"query",
"maxResults",
"defaultDataset",
"timeoutMs",
"dryRun",
"preserveNulls",
"useQueryCache",
"useLegacySql",
"parameterMode",
"queryParameters",
"location",
"formatOptions",
"connectionProperties",
"labels",
"maximumBytesBilled",
"requestId",
"createSession",
}

unsupported_keys = request_keys - keys_allowlist
return len(unsupported_keys) == 0


def _wait_or_cancel(
Expand Down
15 changes: 13 additions & 2 deletions tests/unit/test__job_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import pytest

from google.cloud.bigquery.client import Client
from google.cloud.bigquery import enums
from google.cloud.bigquery import _job_helpers
from google.cloud.bigquery.job import copy_ as job_copy
from google.cloud.bigquery.job import extract as job_extract
Expand Down Expand Up @@ -1141,12 +1142,22 @@ def test_make_job_id_w_job_id_overrides_prefix():
False,
id="destination_encryption_configuration",
),
# priority="BATCH" is not supported. See:
# https://github.com/googleapis/python-bigquery/issues/1867
pytest.param(
job_query.QueryJobConfig(
priority=enums.QueryPriority.BATCH,
),
False,
id="priority=BATCH",
),
),
)
def test_supported_by_jobs_query(
def test_supported_by_jobs_query_from_queryjobconfig(
job_config: Optional[job_query.QueryJobConfig], expected: bool
):
assert _job_helpers._supported_by_jobs_query(job_config) == expected
request_body = _job_helpers._to_query_request(job_config, query="SELECT 1")
assert _job_helpers._supported_by_jobs_query(request_body) == expected


def test_wait_or_cancel_no_exception():
Expand Down