Skip to content

Commit

Permalink
resolve template fields init checks for bigquery (#37586)
Browse files: browse the repository at this point in the history
  • Loading branch information
okirialbert committed Feb 21, 2024
1 parent 2699f8c commit 2cb96a8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 17 deletions.
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ repos:
exclude: |
(?x)^(
^.*__init__\.py$|
- ^airflow\/providers\/google\/cloud\/operators\/bigquery\.py$|
^airflow\/providers\/amazon\/aws\/transfers\/gcs_to_s3\.py$|
^airflow\/providers\/databricks\/operators\/databricks\.py$|
^airflow\/providers\/google\/cloud\/transfers\/bigquery_to_mysql\.py$|
Expand Down
31 changes: 15 additions & 16 deletions airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ def __init__(
) -> None:
super().__init__(sql=sql, **kwargs)
self.gcp_conn_id = gcp_conn_id
- self.sql = sql
self.use_legacy_sql = use_legacy_sql
self.location = location
self.impersonation_chain = impersonation_chain
Expand Down Expand Up @@ -798,9 +797,6 @@ def __init__(
**kwargs,
) -> None:
super().__init__(table=table, checks=checks, partition_clause=partition_clause, **kwargs)
- self.table = table
- self.checks = checks
- self.partition_clause = partition_clause
self.gcp_conn_id = gcp_conn_id
self.use_legacy_sql = use_legacy_sql
self.location = location
Expand Down Expand Up @@ -964,7 +960,7 @@ def __init__(
self.dataset_id = dataset_id
self.table_id = table_id
self.job_project_id = job_project_id
- self.max_results = int(max_results)
+ self.max_results = max_results
self.selected_fields = selected_fields
self.gcp_conn_id = gcp_conn_id
self.location = location
Expand Down Expand Up @@ -1000,7 +996,7 @@ def generate_query(self, hook: BigQueryHook) -> str:
query += "*"
query += (
f" from `{self.table_project_id or hook.project_id}.{self.dataset_id}"
- f".{self.table_id}` limit {self.max_results}"
+ f".{self.table_id}` limit {int(self.max_results)}"
)
return query

Expand All @@ -1027,7 +1023,7 @@ def execute(self, context: Context):
self.table_project_id or hook.project_id,
self.dataset_id,
self.table_id,
- self.max_results,
+ int(self.max_results),
)
if not self.selected_fields:
schema: dict[str, list] = hook.get_schema(
Expand All @@ -1041,7 +1037,7 @@ def execute(self, context: Context):
rows = hook.list_rows(
dataset_id=self.dataset_id,
table_id=self.table_id,
- max_results=self.max_results,
+ max_results=int(self.max_results),
selected_fields=self.selected_fields,
location=self.location,
project_id=self.table_project_id or hook.project_id,
Expand Down Expand Up @@ -1217,6 +1213,7 @@ def __init__(
location: str | None = None,
encryption_configuration: dict | None = None,
impersonation_chain: str | Sequence[str] | None = None,
+ job_id: str | list[str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -1242,7 +1239,7 @@ def __init__(
self.encryption_configuration = encryption_configuration
self.hook: BigQueryHook | None = None
self.impersonation_chain = impersonation_chain
- self.job_id: str | list[str] | None = None
+ self.job_id = job_id

def execute(self, context: Context):
if self.hook is None:
Expand Down Expand Up @@ -1478,7 +1475,7 @@ def __init__(
self.gcs_schema_object = gcs_schema_object
self.gcp_conn_id = gcp_conn_id
self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
- self.time_partitioning = {} if time_partitioning is None else time_partitioning
+ self.time_partitioning = time_partitioning or {}
self.labels = labels
self.view = view
self.materialized_view = materialized_view
Expand Down Expand Up @@ -1693,6 +1690,13 @@ def __init__(

super().__init__(**kwargs)

+ self.table_resource = table_resource
+ self.bucket = bucket or ""
+ self.source_objects = source_objects or []
+ self.schema_object = schema_object or None
+ self.gcs_schema_bucket = gcs_schema_bucket or ""
+ self.destination_project_dataset_table = destination_project_dataset_table or ""

# BQ config
kwargs_passed = any(
[
Expand Down Expand Up @@ -1750,12 +1754,7 @@ def __init__(
self.field_delimiter = field_delimiter
self.table_resource = None
else:
- self.table_resource = table_resource
- self.bucket = ""
- self.source_objects = []
- self.schema_object = None
- self.gcs_schema_bucket = ""
- self.destination_project_dataset_table = ""
+ pass

if table_resource and kwargs_passed:
raise ValueError("You provided both `table_resource` and exclusive keywords arguments.")
Expand Down

0 comments on commit 2cb96a8

Please sign in to comment.