
Commit

Fix BigQueryCreateExternalTableOperator when using a format different to CSV (#33540)

* Fix BigQueryCreateExternalTableOperator when using a format different to CSV

* fix python object name
hussein-awala committed Aug 20, 2023
1 parent da8004d commit 46fa5a2
Showing 2 changed files with 69 additions and 21 deletions.
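
Before this change, the operator always attached a csvOptions block to the external table definition, even when source_format was not CSV; after it, CSV-specific options are only sent for CSV sources. A minimal usage sketch of the case the fix targets, creating an external table over Parquet files (project, dataset, bucket, and object names are hypothetical):

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator

with DAG(dag_id="example_parquet_external_table", start_date=datetime(2023, 8, 20), schedule=None) as dag:
    # With the fix, no csvOptions block is included for a PARQUET source.
    create_external_table = BigQueryCreateExternalTableOperator(
        task_id="create_parquet_external_table",
        destination_project_dataset_table="my-project.my_dataset.my_table",  # hypothetical
        bucket="my-bucket",                 # hypothetical GCS bucket
        source_objects=["dir1/*.parquet"],  # hypothetical object pattern
        source_format="PARQUET",
        autodetect=True,                    # let BigQuery infer the schema
    )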
31 changes: 17 additions & 14 deletions airflow/providers/google/cloud/operators/bigquery.py
@@ -1797,6 +1797,22 @@ def execute(self, context: Context) -> None:
default_project_id=bq_hook.project_id or "",
)

external_data_configuration = {
"source_uris": source_uris,
"source_format": self.source_format,
"autodetect": self.autodetect,
"compression": self.compression,
"maxBadRecords": self.max_bad_records,
}
if self.source_format == "CSV":
external_data_configuration["csvOptions"] = {
"fieldDelimiter": self.field_delimiter,
"skipLeadingRows": self.skip_leading_rows,
"quote": self.quote_character,
"allowQuotedNewlines": self.allow_quoted_newlines,
"allowJaggedRows": self.allow_jagged_rows,
}

table_resource = {
"tableReference": {
"projectId": project_id,
@@ -1805,20 +1821,7 @@ def execute(self, context: Context) -> None:
},
"labels": self.labels,
"schema": {"fields": schema_fields},
"externalDataConfiguration": {
"source_uris": source_uris,
"source_format": self.source_format,
"maxBadRecords": self.max_bad_records,
"autodetect": self.autodetect,
"compression": self.compression,
"csvOptions": {
"fieldDelimiter": self.field_delimiter,
"skipLeadingRows": self.skip_leading_rows,
"quote": self.quote_character,
"allowQuotedNewlines": self.allow_quoted_newlines,
"allowJaggedRows": self.allow_jagged_rows,
},
},
"externalDataConfiguration": external_data_configuration,
"location": self.location,
"encryptionConfiguration": self.encryption_configuration,
}
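
In short, the hunks above move the format-agnostic settings into external_data_configuration and attach csvOptions only when source_format is "CSV". A self-contained sketch of that behavior (the option values are illustrative):

def build_external_config(source_format: str, source_uris: list[str]) -> dict:
    # Format-agnostic settings are always present.
    config = {
        "source_uris": source_uris,
        "source_format": source_format,
        "autodetect": True,
        "compression": "NONE",
        "maxBadRecords": 0,
    }
    # CSV-only options are attached conditionally, mirroring the new operator code.
    if source_format == "CSV":
        config["csvOptions"] = {
            "fieldDelimiter": ",",
            "skipLeadingRows": 1,
            "quote": '"',
            "allowQuotedNewlines": False,
            "allowJaggedRows": False,
        }
    return config

# CSV sources still carry csvOptions; any other format no longer does.
assert "csvOptions" in build_external_config("CSV", ["gs://my-bucket/dir1/*.csv"])
assert "csvOptions" not in build_external_config("PARQUET", ["gs://my-bucket/dir1/*.parquet"])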
59 changes: 52 additions & 7 deletions tests/providers/google/cloud/operators/test_bigquery.py
@@ -77,8 +77,10 @@
TEST_DELETE_CONTENTS = True
TEST_TABLE_ID = "test-table-id"
TEST_GCS_BUCKET = "test-bucket"
TEST_GCS_DATA = ["dir1/*.csv"]
TEST_SOURCE_FORMAT = "CSV"
TEST_GCS_CSV_DATA = ["dir1/*.csv"]
TEST_SOURCE_CSV_FORMAT = "CSV"
TEST_GCS_PARQUET_DATA = ["dir1/*.parquet"]
TEST_SOURCE_PARQUET_FORMAT = "PARQUET"
DEFAULT_DATE = datetime(2015, 1, 1)
TEST_DAG_ID = "test-bigquery-operators"
TEST_TABLE_RESOURCES = {"tableReference": {"tableId": TEST_TABLE_ID}, "expirationTime": 1234567}
@@ -246,15 +248,15 @@ def test_create_existing_table(self, mock_hook, caplog, if_exists, is_conflict,

class TestBigQueryCreateExternalTableOperator:
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_execute(self, mock_hook):
def test_execute_with_csv_format(self, mock_hook):
operator = BigQueryCreateExternalTableOperator(
task_id=TASK_ID,
destination_project_dataset_table=f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
schema_fields=[],
bucket=TEST_GCS_BUCKET,
gcs_schema_bucket=TEST_GCS_BUCKET,
source_objects=TEST_GCS_DATA,
source_format=TEST_SOURCE_FORMAT,
source_objects=TEST_GCS_CSV_DATA,
source_format=TEST_SOURCE_CSV_FORMAT,
autodetect=True,
)

@@ -276,9 +278,9 @@ def test_execute(self, mock_hook):
"schema": {"fields": []},
"externalDataConfiguration": {
"source_uris": [
f"gs://{TEST_GCS_BUCKET}/{source_object}" for source_object in TEST_GCS_DATA
f"gs://{TEST_GCS_BUCKET}/{source_object}" for source_object in TEST_GCS_CSV_DATA
],
"source_format": TEST_SOURCE_FORMAT,
"source_format": TEST_SOURCE_CSV_FORMAT,
"maxBadRecords": 0,
"autodetect": True,
"compression": "NONE",
@@ -295,6 +297,49 @@ def test_execute(self, mock_hook):
}
)

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_execute_with_parquet_format(self, mock_hook):
operator = BigQueryCreateExternalTableOperator(
task_id=TASK_ID,
destination_project_dataset_table=f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
schema_fields=[],
bucket=TEST_GCS_BUCKET,
gcs_schema_bucket=TEST_GCS_BUCKET,
source_objects=TEST_GCS_PARQUET_DATA,
source_format=TEST_SOURCE_PARQUET_FORMAT,
autodetect=True,
)

mock_hook.return_value.split_tablename.return_value = (
TEST_GCP_PROJECT_ID,
TEST_DATASET,
TEST_TABLE_ID,
)

operator.execute(context=MagicMock())
mock_hook.return_value.create_empty_table.assert_called_once_with(
table_resource={
"tableReference": {
"projectId": TEST_GCP_PROJECT_ID,
"datasetId": TEST_DATASET,
"tableId": TEST_TABLE_ID,
},
"labels": None,
"schema": {"fields": []},
"externalDataConfiguration": {
"source_uris": [
f"gs://{TEST_GCS_BUCKET}/{source_object}" for source_object in TEST_GCS_PARQUET_DATA
],
"source_format": TEST_SOURCE_PARQUET_FORMAT,
"maxBadRecords": 0,
"autodetect": True,
"compression": "NONE",
},
"location": None,
"encryptionConfiguration": None,
}
)


class TestBigQueryDeleteDatasetOperator:
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
