Skip to content

Commit

Permalink
Add project_id to client inside BigQuery hook update_table method (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
owlphi committed Dec 14, 2020
1 parent 4d3300c commit 1c1ef7e
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ def update_table(

table = Table.from_api_repr(table_resource)
self.log.info('Updating table: %s', table_resource["tableReference"])
table_object = self.get_client().update_table(table=table, fields=fields)
table_object = self.get_client(project_id=project_id).update_table(table=table, fields=fields)
self.log.info('Table %s.%s.%s updated successfully', project_id, dataset_id, table_id)
return table_object.to_api_repr()

Expand Down
52 changes: 52 additions & 0 deletions tests/providers/google/cloud/hooks/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,58 @@ def test_create_external_table_with_kms(self, mock_create):
exists_ok=True,
)

@mock.patch("airflow.providers.google.cloud.hooks.bigquery.Table")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.Client")
def test_update_table(self, mock_client, mock_table):
description_patched = 'Test description.'
expiration_time_patched = 2524608000000
friendly_name_patched = 'Test friendly name.'
labels_patched = {'label1': 'test1', 'label2': 'test2'}
schema_patched = [
{'name': 'id', 'type': 'STRING', 'mode': 'REQUIRED'},
{'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'balance', 'type': 'FLOAT', 'mode': 'NULLABLE'},
{'name': 'new_field', 'type': 'STRING', 'mode': 'NULLABLE'},
]
time_partitioning_patched = {'expirationMs': 10000000}
require_partition_filter_patched = True
view_patched = {
'query': "SELECT * FROM `test-project-id.test_dataset_id.test_table_prefix*` LIMIT 500",
'useLegacySql': False,
}

body = {
"tableReference": {
"projectId": PROJECT_ID,
"datasetId": DATASET_ID,
"tableId": TABLE_ID,
},
"description": description_patched,
"expirationTime": expiration_time_patched,
"friendlyName": friendly_name_patched,
"labels": labels_patched,
"schema": {"fields": schema_patched},
"timePartitioning": time_partitioning_patched,
"view": view_patched,
"requirePartitionFilter": require_partition_filter_patched,
}

fields = list(body.keys())

self.hook.update_table(
table_resource=body,
fields=fields,
dataset_id=DATASET_ID,
table_id=TABLE_ID,
project_id=PROJECT_ID,
)

mock_table.from_api_repr.assert_called_once_with(body)

mock_client.return_value.update_table.assert_called_once_with(
table=mock_table.from_api_repr.return_value, fields=fields
)

@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.insert_job")
def test_run_query_with_kms(self, mock_insert):
encryption_configuration = {"kms_key_name": "projects/p/locations/l/keyRings/k/cryptoKeys/c"}
Expand Down

0 comments on commit 1c1ef7e

Please sign in to comment.