Skip to content

Commit

Permalink
Modify BigQueryCreateExternalTableOperator to use updated hook function (#24363)
Browse files Browse the repository at this point in the history

* Fixed BigQueryCreateExternalTableOperator and its unit test (#24160)
  • Loading branch information
cswpy committed Jul 12, 2022
1 parent 626d9db commit c618da4
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 33 deletions.
50 changes: 34 additions & 16 deletions airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1143,23 +1143,41 @@ def execute(self, context: 'Context') -> None:

source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]

table = bq_hook.create_external_table(
external_project_dataset_table=self.destination_project_dataset_table,
schema_fields=schema_fields,
source_uris=source_uris,
source_format=self.source_format,
autodetect=self.autodetect,
compression=self.compression,
skip_leading_rows=self.skip_leading_rows,
field_delimiter=self.field_delimiter,
max_bad_records=self.max_bad_records,
quote_character=self.quote_character,
allow_quoted_newlines=self.allow_quoted_newlines,
allow_jagged_rows=self.allow_jagged_rows,
src_fmt_configs=self.src_fmt_configs,
labels=self.labels,
encryption_configuration=self.encryption_configuration,
project_id, dataset_id, table_id = bq_hook.split_tablename(
table_input=self.destination_project_dataset_table,
default_project_id=bq_hook.project_id or '',
)

table_resource = {
"tableReference": {
"projectId": project_id,
"datasetId": dataset_id,
"tableId": table_id,
},
"labels": self.labels,
"schema": {"fields": schema_fields},
"externalDataConfiguration": {
"source_uris": source_uris,
"source_format": self.source_format,
"maxBadRecords": self.max_bad_records,
"autodetect": self.autodetect,
"compression": self.compression,
"csvOptions": {
"fieldDelimeter": self.field_delimiter,
"skipLeadingRows": self.skip_leading_rows,
"quote": self.quote_character,
"allowQuotedNewlines": self.allow_quoted_newlines,
"allowJaggedRows": self.allow_jagged_rows,
},
},
"location": self.location,
"encryptionConfiguration": self.encryption_configuration,
}

table = bq_hook.create_empty_table(
table_resource=table_resource,
)

BigQueryTableLink.persist(
context=context,
task_instance=self,
Expand Down
52 changes: 35 additions & 17 deletions tests/providers/google/cloud/operators/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,31 +190,49 @@ class TestBigQueryCreateExternalTableOperator(unittest.TestCase):
def test_execute(self, mock_hook):
operator = BigQueryCreateExternalTableOperator(
task_id=TASK_ID,
destination_project_dataset_table=f'{TEST_DATASET}.{TEST_TABLE_ID}',
destination_project_dataset_table=f'{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}',
schema_fields=[],
bucket=TEST_GCS_BUCKET,
source_objects=TEST_GCS_DATA,
source_format=TEST_SOURCE_FORMAT,
autodetect=True,
)

mock_hook.return_value.split_tablename.return_value = (
TEST_GCP_PROJECT_ID,
TEST_DATASET,
TEST_TABLE_ID,
)

operator.execute(context=MagicMock())
mock_hook.return_value.create_external_table.assert_called_once_with(
external_project_dataset_table=f'{TEST_DATASET}.{TEST_TABLE_ID}',
schema_fields=[],
source_uris=[f'gs://{TEST_GCS_BUCKET}/{source_object}' for source_object in TEST_GCS_DATA],
source_format=TEST_SOURCE_FORMAT,
autodetect=True,
compression='NONE',
skip_leading_rows=0,
field_delimiter=',',
max_bad_records=0,
quote_character=None,
allow_quoted_newlines=False,
allow_jagged_rows=False,
src_fmt_configs={},
labels=None,
encryption_configuration=None,
mock_hook.return_value.create_empty_table.assert_called_once_with(
table_resource={
"tableReference": {
"projectId": TEST_GCP_PROJECT_ID,
"datasetId": TEST_DATASET,
"tableId": TEST_TABLE_ID,
},
"labels": None,
"schema": {"fields": []},
"externalDataConfiguration": {
"source_uris": [
f'gs://{TEST_GCS_BUCKET}/{source_object}' for source_object in TEST_GCS_DATA
],
"source_format": TEST_SOURCE_FORMAT,
"maxBadRecords": 0,
"autodetect": True,
"compression": 'NONE',
"csvOptions": {
"fieldDelimeter": ',',
"skipLeadingRows": 0,
"quote": None,
"allowQuotedNewlines": False,
"allowJaggedRows": False,
},
},
"location": None,
"encryptionConfiguration": None,
}
)


Expand Down

0 comments on commit c618da4

Please sign in to comment.