Skip to content

Commit

Permalink
Fix insert_job method of BigQueryHook (#9899)
Browse files Browse the repository at this point in the history
The `insert_job` method should submit the job and then block until the result is available, so that callers receive a completed job rather than a merely-started one.
Closes: #9897
  • Loading branch information
turbaszek committed Jul 21, 2020
1 parent c8c52e6 commit 1cfdebf
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 8 deletions.
10 changes: 2 additions & 8 deletions airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1481,6 +1481,8 @@ def insert_job(
if not job:
raise AirflowException(f"Unknown job type. Supported types: {supported_jobs.keys()}")
job = job.from_api_repr(job_data, client)
# Start the job and wait for it to complete and get the result.
job.result()
return job

def run_with_configuration(self, configuration: Dict) -> str:
Expand All @@ -1501,8 +1503,6 @@ def run_with_configuration(self, configuration: Dict) -> str:
DeprecationWarning
)
job = self.insert_job(configuration=configuration, project_id=self.project_id)
# Start the job and wait for it to complete and get the result.
job.result()
self.running_job_id = job.job_id
return job.job_id

Expand Down Expand Up @@ -1746,8 +1746,6 @@ def run_load(self, # pylint: disable=too-many-locals,too-many-arguments,invalid
configuration['load']['allowJaggedRows'] = allow_jagged_rows

job = self.insert_job(configuration=configuration, project_id=self.project_id)
# Start the job and wait for it to complete and get the result.
job.result()
self.running_job_id = job.job_id
return job.job_id

Expand Down Expand Up @@ -1843,8 +1841,6 @@ def run_copy(self, # pylint: disable=invalid-name
] = encryption_configuration

job = self.insert_job(configuration=configuration, project_id=self.project_id)
# Start the job and wait for it to complete and get the result.
job.result()
self.running_job_id = job.job_id
return job.job_id

Expand Down Expand Up @@ -2167,8 +2163,6 @@ def run_query(self,
] = encryption_configuration

job = self.insert_job(configuration=configuration, project_id=self.project_id)
# Start the job and wait for it to complete and get the result.
job.result()
self.running_job_id = job.job_id
return job.job_id

Expand Down
36 changes: 36 additions & 0 deletions tests/providers/google/cloud/hooks/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,42 @@ def test_run_query_with_arg(self, mock_insert):
{'label1': 'test1', 'label2': 'test2'}
)

# Regression test for #9897: insert_job must not only build and submit the
# job but also block on its result before returning.
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.QueryJob")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
def test_insert_job(self, mock_client, mock_query_job):
    """Verify insert_job builds a QueryJob from the API repr and waits on it.

    Mocks the BigQuery client and the QueryJob class, then checks that:
    - the client is created for the expected project/location,
    - the job payload (configuration + jobReference) is passed to
      ``from_api_repr`` together with the mocked client,
    - ``result()`` is called exactly once on the constructed job,
      i.e. the hook waits for the job to complete.
    """
    # Minimal query-job configuration; the "query" top-level key is what
    # the hook uses to pick the job class (matched via _JOB_TYPE below).
    job_conf = {
        "query": {
            "query": "SELECT * FROM test",
            "useLegacySql": "False",
        }
    }
    # Make the mocked QueryJob class advertise the "query" job type so the
    # hook's supported-jobs lookup resolves to it.
    mock_query_job._JOB_TYPE = "query"

    self.hook.insert_job(
        configuration=job_conf,
        job_id=JOB_ID,
        project_id=PROJECT_ID,
        location=LOCATION,
    )

    # The client must be built for the project/location the caller passed.
    mock_client.assert_called_once_with(
        project_id=PROJECT_ID,
        location=LOCATION,
    )

    # The hook should wrap the configuration in a full job resource with a
    # jobReference and hand it to QueryJob.from_api_repr with the client.
    mock_query_job.from_api_repr.assert_called_once_with(
        {
            'configuration': job_conf,
            'jobReference': {
                'jobId': JOB_ID,
                'projectId': PROJECT_ID,
                'location': LOCATION
            }
        },
        mock_client.return_value
    )
    # Core of the fix under test: the hook waits for job completion.
    mock_query_job.from_api_repr.return_value.result.assert_called_once_with()


class TestBigQueryTableSplitter(unittest.TestCase):
def test_internal_need_default_project(self):
Expand Down

0 comments on commit 1cfdebf

Please sign in to comment.