Skip to content

Commit

Permalink
Enable asynchronous job submission in BigQuery hook (#21385)
Browse files Browse the repository at this point in the history
* Add nowait flag to the insert_job method
* When nowait is True, the execution won't wait till the job results are available.
* By default, the job execution will wait till job results are available.
  • Loading branch information
phanikumv committed Feb 11, 2022
1 parent b6892c4 commit 833087f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
10 changes: 8 additions & 2 deletions airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1498,6 +1498,7 @@ def insert_job(
job_id: Optional[str] = None,
project_id: Optional[str] = None,
location: Optional[str] = None,
nowait: bool = False,
) -> BigQueryJob:
"""
Executes a BigQuery job. Waits for the job to complete and returns job id.
Expand All @@ -1514,6 +1515,7 @@ def insert_job(
characters. If not provided then uuid will be generated.
:param project_id: Google Cloud Project where the job is running
:param location: location the job is running
:param nowait: specify whether to insert job without waiting for the result
"""
location = location or self.location
job_id = job_id or self._custom_job_id(configuration)
Expand Down Expand Up @@ -1541,8 +1543,12 @@ def insert_job(
raise AirflowException(f"Unknown job type. Supported types: {supported_jobs.keys()}")
job = job.from_api_repr(job_data, client)
self.log.info("Inserting job %s", job.job_id)
# Start the job and wait for it to complete and get the result.
job.result()
if nowait:
# Initiate the job and don't wait for it to complete.
job._begin()
else:
# Start the job and wait for it to complete and get the result.
job.result()
return job

def run_with_configuration(self, configuration: dict) -> str:
Expand Down
21 changes: 12 additions & 9 deletions tests/providers/google/cloud/hooks/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
TABLE_REFERENCE = TableReference.from_api_repr(TABLE_REFERENCE_REPR)


class _BigQueryBaseTestClass(unittest.TestCase):
def setUp(self) -> None:
class _BigQueryBaseTestClass:
def setup_method(self) -> None:
class MockedBigQueryHook(BigQueryHook):
def _get_credentials_and_project_id(self):
return CREDENTIALS, PROJECT_ID
Expand Down Expand Up @@ -898,9 +898,10 @@ def test_run_query_with_arg(self, mock_insert):
_, kwargs = mock_insert.call_args
assert kwargs["configuration"]['labels'] == {'label1': 'test1', 'label2': 'test2'}

@pytest.mark.parametrize('nowait', [True, False])
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.QueryJob")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
def test_insert_job(self, mock_client, mock_query_job):
def test_insert_job(self, mock_client, mock_query_job, nowait):
job_conf = {
"query": {
"query": "SELECT * FROM test",
Expand All @@ -910,10 +911,7 @@ def test_insert_job(self, mock_client, mock_query_job):
mock_query_job._JOB_TYPE = "query"

self.hook.insert_job(
configuration=job_conf,
job_id=JOB_ID,
project_id=PROJECT_ID,
location=LOCATION,
configuration=job_conf, job_id=JOB_ID, project_id=PROJECT_ID, location=LOCATION, nowait=nowait
)

mock_client.assert_called_once_with(
Expand All @@ -928,7 +926,12 @@ def test_insert_job(self, mock_client, mock_query_job):
},
mock_client.return_value,
)
mock_query_job.from_api_repr.return_value.result.assert_called_once_with()
if nowait:
mock_query_job.from_api_repr.return_value._begin.assert_called_once()
mock_query_job.from_api_repr.return_value.result.assert_not_called()
else:
mock_query_job.from_api_repr.return_value._begin.assert_not_called()
mock_query_job.from_api_repr.return_value.result.assert_called_once()

def test_dbapi_get_uri(self):
assert self.hook.get_uri().startswith('bigquery://')
Expand Down Expand Up @@ -2014,7 +2017,7 @@ def test_create_external_table_labels(self, mock_create):
)

_, kwargs = mock_create.call_args
self.assertDictEqual(kwargs['table_resource']['labels'], labels)
assert kwargs['table_resource']['labels'] == labels

@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.create_empty_table")
def test_create_external_table_description(self, mock_create):
Expand Down

0 comments on commit 833087f

Please sign in to comment.