Skip to content

Commit

Permalink
Added location parameter to BigQueryCheckOperator (#8273)
Browse files Browse the repository at this point in the history
  • Loading branch information
edejong committed Apr 18, 2020
1 parent dd9f04e commit 79c99b1
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 3 deletions.
1 change: 1 addition & 0 deletions airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2445,6 +2445,7 @@ def next(self) -> Union[List, None]:
query_results = (self.service.jobs().getQueryResults(
projectId=self.project_id,
jobId=self.job_id,
location=self.location,
pageToken=self.page_token).execute(num_retries=self.num_retries))

if 'rows' in query_results and query_results['rows']:
Expand Down
9 changes: 8 additions & 1 deletion airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ class BigQueryCheckOperator(CheckOperator):
:param use_legacy_sql: Whether to use legacy SQL (true)
or standard SQL (false).
:type use_legacy_sql: bool
:param location: The geographic location of the job. Required except for
US and EU. See details at
https://cloud.google.com/bigquery/docs/locations#specifying_your_location
:type location: str
"""

template_fields = ('sql', 'gcp_conn_id',)
Expand All @@ -88,6 +92,7 @@ def __init__(self,
gcp_conn_id: str = 'google_cloud_default',
bigquery_conn_id: Optional[str] = None,
use_legacy_sql: bool = True,
location=None,
*args, **kwargs) -> None:
super().__init__(sql=sql, *args, **kwargs)
if bigquery_conn_id:
Expand All @@ -99,10 +104,12 @@ def __init__(self,
self.gcp_conn_id = gcp_conn_id
self.sql = sql
self.use_legacy_sql = use_legacy_sql
self.location = location

def get_db_hook(self):
return BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
use_legacy_sql=self.use_legacy_sql)
use_legacy_sql=self.use_legacy_sql,
location=self.location)


class BigQueryValueCheckOperator(ValueCheckOperator):
Expand Down
8 changes: 6 additions & 2 deletions tests/providers/google/cloud/hooks/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
TABLE_ID = "bq_table"
VIEW_ID = 'bq_view'
JOB_ID = 1234
LOCATION = 'europe-north1'


class TestBigQueryHookMethods(unittest.TestCase):
Expand Down Expand Up @@ -1612,14 +1613,16 @@ def test_next(self, mock_get_service, mock_get_creds_and_proj_id):
bq_hook = hook.BigQueryHook()
bq_cursor = bq_hook.get_cursor()
bq_cursor.job_id = JOB_ID
bq_cursor.location = LOCATION

result = bq_cursor.next()
self.assertEqual(['one', 1], result)

result = bq_cursor.next()
self.assertEqual(['two', 2], result)

mock_get_query_results.assert_called_once_with(jobId=JOB_ID, pageToken=None, projectId='bq-project')
mock_get_query_results.assert_called_once_with(jobId=JOB_ID, location=LOCATION, pageToken=None,
projectId='bq-project')
mock_execute.assert_called_once_with(num_retries=bq_cursor.num_retries)

@mock.patch(
Expand All @@ -1640,7 +1643,8 @@ def test_next_no_rows(self, mock_flush_results, mock_get_service, mock_get_creds
result = bq_cursor.next()

self.assertIsNone(result)
mock_get_query_results.assert_called_once_with(jobId=JOB_ID, pageToken=None, projectId='bq-project')
mock_get_query_results.assert_called_once_with(jobId=JOB_ID, location=None, pageToken=None,
projectId='bq-project')
mock_execute.assert_called_once_with(num_retries=bq_cursor.num_retries)
assert mock_flush_results.call_count == 1

Expand Down

0 comments on commit 79c99b1

Please sign in to comment.