Skip to content

Commit

Permalink
Fix error when creating an external table using a table resource (#17998)
Browse files Browse the repository at this point in the history
  • Loading branch information
tnyz committed Sep 13, 2021
1 parent d119ae8 commit 8ae2bb9
Showing 1 changed file with 23 additions and 33 deletions.
56 changes: 23 additions & 33 deletions airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import attr
from google.api_core.exceptions import Conflict
from google.cloud.bigquery import TableReference

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator, BaseOperatorLink
Expand Down Expand Up @@ -1189,6 +1188,11 @@ def execute(self, context) -> None:
location=self.location,
impersonation_chain=self.impersonation_chain,
)
if self.table_resource:
bq_hook.create_empty_table(
table_resource=self.table_resource,
)
return

if not self.schema_fields and self.schema_object and self.source_format != 'DATASTORE_BACKUP':
gcs_hook = GCSHook(
Expand All @@ -1200,36 +1204,24 @@ def execute(self, context) -> None:
else:
schema_fields = self.schema_fields

if schema_fields and self.table_resource:
self.table_resource["externalDataConfiguration"]["schema"] = schema_fields

if self.table_resource:
tab_ref = TableReference.from_string(self.destination_project_dataset_table)
bq_hook.create_empty_table(
table_resource=self.table_resource,
project_id=tab_ref.project,
table_id=tab_ref.table_id,
dataset_id=tab_ref.dataset_id,
)
else:
source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]

bq_hook.create_external_table(
external_project_dataset_table=self.destination_project_dataset_table,
schema_fields=schema_fields,
source_uris=source_uris,
source_format=self.source_format,
compression=self.compression,
skip_leading_rows=self.skip_leading_rows,
field_delimiter=self.field_delimiter,
max_bad_records=self.max_bad_records,
quote_character=self.quote_character,
allow_quoted_newlines=self.allow_quoted_newlines,
allow_jagged_rows=self.allow_jagged_rows,
src_fmt_configs=self.src_fmt_configs,
labels=self.labels,
encryption_configuration=self.encryption_configuration,
)
source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]

bq_hook.create_external_table(
external_project_dataset_table=self.destination_project_dataset_table,
schema_fields=schema_fields,
source_uris=source_uris,
source_format=self.source_format,
compression=self.compression,
skip_leading_rows=self.skip_leading_rows,
field_delimiter=self.field_delimiter,
max_bad_records=self.max_bad_records,
quote_character=self.quote_character,
allow_quoted_newlines=self.allow_quoted_newlines,
allow_jagged_rows=self.allow_jagged_rows,
src_fmt_configs=self.src_fmt_configs,
labels=self.labels,
encryption_configuration=self.encryption_configuration,
)


class BigQueryDeleteDatasetOperator(BaseOperator):
Expand Down Expand Up @@ -1497,7 +1489,6 @@ def __init__(
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
) -> None:

self.dataset_id = dataset_id
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
Expand Down Expand Up @@ -1646,7 +1637,6 @@ def __init__(
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
) -> None:

warnings.warn(
"This operator is deprecated. Please use BigQueryUpdateDatasetOperator.",
DeprecationWarning,
Expand Down

0 comments on commit 8ae2bb9

Please sign in to comment.