Skip to content

Commit

Permalink
Google Cloud Providers - Introduce GoogleCloudBaseOperator (#29680)
Browse files Browse the repository at this point in the history
This base class for all Cloud operators arose from the need
to override deepcopy and fix crashes when attempting to copy
the DEFAULT sentinel constant stored as a parameter across multiple
operators that use generated API clients.

Co-authored-by: Kwon Soonmok <[email protected]>
  • Loading branch information
IKholopov and Soonmok committed Feb 25, 2023
1 parent 717426e commit 1e7c064
Show file tree
Hide file tree
Showing 49 changed files with 428 additions and 387 deletions.
28 changes: 14 additions & 14 deletions airflow/providers/google/cloud/operators/automl.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
TableSpec,
)

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook
from airflow.providers.google.cloud.links.automl import (
AutoMLDatasetLink,
Expand All @@ -41,14 +40,15 @@
AutoMLModelPredictLink,
AutoMLModelTrainLink,
)
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator

if TYPE_CHECKING:
from airflow.utils.context import Context

MetaData = Sequence[Tuple[str, str]]


class AutoMLTrainModelOperator(BaseOperator):
class AutoMLTrainModelOperator(GoogleCloudBaseOperator):
"""
Creates Google Cloud AutoML model.
Expand Down Expand Up @@ -144,7 +144,7 @@ def execute(self, context: Context):
return result


class AutoMLPredictOperator(BaseOperator):
class AutoMLPredictOperator(GoogleCloudBaseOperator):
"""
Runs prediction operation on Google Cloud AutoML.
Expand Down Expand Up @@ -236,7 +236,7 @@ def execute(self, context: Context):
return PredictResponse.to_dict(result)


class AutoMLBatchPredictOperator(BaseOperator):
class AutoMLBatchPredictOperator(GoogleCloudBaseOperator):
"""
Perform a batch prediction on Google Cloud AutoML.
Expand Down Expand Up @@ -345,7 +345,7 @@ def execute(self, context: Context):
return result


class AutoMLCreateDatasetOperator(BaseOperator):
class AutoMLCreateDatasetOperator(GoogleCloudBaseOperator):
"""
Creates a Google Cloud AutoML dataset.
Expand Down Expand Up @@ -437,7 +437,7 @@ def execute(self, context: Context):
return result


class AutoMLImportDataOperator(BaseOperator):
class AutoMLImportDataOperator(GoogleCloudBaseOperator):
"""
Imports data to a Google Cloud AutoML dataset.
Expand Down Expand Up @@ -530,7 +530,7 @@ def execute(self, context: Context):
)


class AutoMLTablesListColumnSpecsOperator(BaseOperator):
class AutoMLTablesListColumnSpecsOperator(GoogleCloudBaseOperator):
"""
Lists column specs in a table.
Expand Down Expand Up @@ -640,7 +640,7 @@ def execute(self, context: Context):
return result


class AutoMLTablesUpdateDatasetOperator(BaseOperator):
class AutoMLTablesUpdateDatasetOperator(GoogleCloudBaseOperator):
"""
Updates a dataset.
Expand Down Expand Up @@ -727,7 +727,7 @@ def execute(self, context: Context):
return Dataset.to_dict(result)


class AutoMLGetModelOperator(BaseOperator):
class AutoMLGetModelOperator(GoogleCloudBaseOperator):
"""
Get Google Cloud AutoML model.
Expand Down Expand Up @@ -814,7 +814,7 @@ def execute(self, context: Context):
return model


class AutoMLDeleteModelOperator(BaseOperator):
class AutoMLDeleteModelOperator(GoogleCloudBaseOperator):
"""
Delete Google Cloud AutoML model.
Expand Down Expand Up @@ -890,7 +890,7 @@ def execute(self, context: Context):
operation.result()


class AutoMLDeployModelOperator(BaseOperator):
class AutoMLDeployModelOperator(GoogleCloudBaseOperator):
"""
Deploys a model. If a model is already deployed, deploying it with the same parameters
has no effect. Deploying with different parameters (as e.g. changing node_number) will
Expand Down Expand Up @@ -980,7 +980,7 @@ def execute(self, context: Context):
self.log.info("Model deployed.")


class AutoMLTablesListTableSpecsOperator(BaseOperator):
class AutoMLTablesListTableSpecsOperator(GoogleCloudBaseOperator):
"""
Lists table specs in a dataset.
Expand Down Expand Up @@ -1080,7 +1080,7 @@ def execute(self, context: Context):
return result


class AutoMLListDatasetOperator(BaseOperator):
class AutoMLListDatasetOperator(GoogleCloudBaseOperator):
"""
Lists AutoML Datasets in project.
Expand Down Expand Up @@ -1162,7 +1162,7 @@ def execute(self, context: Context):
return result


class AutoMLDeleteDatasetOperator(BaseOperator):
class AutoMLDeleteDatasetOperator(GoogleCloudBaseOperator):
"""
Deletes a dataset and all of its contents.
Expand Down
31 changes: 16 additions & 15 deletions airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob
from airflow.providers.google.cloud.hooks.gcs import GCSHook, _parse_gcs_url
from airflow.providers.google.cloud.links.bigquery import BigQueryDatasetLink, BigQueryTableLink
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
from airflow.providers.google.cloud.triggers.bigquery import (
BigQueryCheckTrigger,
BigQueryGetDataTrigger,
Expand Down Expand Up @@ -730,7 +731,7 @@ def execute(self, context=None):
self.log.info("All tests have passed")


class BigQueryGetDataOperator(BaseOperator):
class BigQueryGetDataOperator(GoogleCloudBaseOperator):
"""
Fetches the data from a BigQuery table (alternatively fetch data for selected columns)
and returns data in a python list. The number of elements in the returned list will
Expand Down Expand Up @@ -920,7 +921,7 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
return event["records"]


class BigQueryExecuteQueryOperator(BaseOperator):
class BigQueryExecuteQueryOperator(GoogleCloudBaseOperator):
"""
Executes BigQuery SQL queries in a specific BigQuery database.
This operator does not assert idempotency.
Expand Down Expand Up @@ -1140,7 +1141,7 @@ def on_kill(self) -> None:
self.hook.cancel_job(self.hook.running_job_id)


class BigQueryCreateEmptyTableOperator(BaseOperator):
class BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
"""
Creates a new, empty table in the specified BigQuery dataset,
optionally with schema.
Expand Down Expand Up @@ -1377,7 +1378,7 @@ def execute(self, context: Context) -> None:
self.log.info("Table %s.%s already exists.", self.dataset_id, self.table_id)


class BigQueryCreateExternalTableOperator(BaseOperator):
class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
"""
Creates a new external table in the dataset with the data from Google Cloud
Storage.
Expand Down Expand Up @@ -1667,7 +1668,7 @@ def execute(self, context: Context) -> None:
)


class BigQueryDeleteDatasetOperator(BaseOperator):
class BigQueryDeleteDatasetOperator(GoogleCloudBaseOperator):
"""
This operator deletes an existing dataset from your Project in Big query.
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/delete
Expand Down Expand Up @@ -1751,7 +1752,7 @@ def execute(self, context: Context) -> None:
)


class BigQueryCreateEmptyDatasetOperator(BaseOperator):
class BigQueryCreateEmptyDatasetOperator(GoogleCloudBaseOperator):
"""
This operator is used to create new dataset for your Project in BigQuery.
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
Expand Down Expand Up @@ -1856,7 +1857,7 @@ def execute(self, context: Context) -> None:
self.log.info("Dataset %s already exists.", dataset_id)


class BigQueryGetDatasetOperator(BaseOperator):
class BigQueryGetDatasetOperator(GoogleCloudBaseOperator):
"""
This operator is used to return the dataset specified by dataset_id.
Expand Down Expand Up @@ -1931,7 +1932,7 @@ def execute(self, context: Context):
return dataset


class BigQueryGetDatasetTablesOperator(BaseOperator):
class BigQueryGetDatasetTablesOperator(GoogleCloudBaseOperator):
"""
This operator retrieves the list of tables in the specified dataset.
Expand Down Expand Up @@ -2001,7 +2002,7 @@ def execute(self, context: Context):
)


class BigQueryPatchDatasetOperator(BaseOperator):
class BigQueryPatchDatasetOperator(GoogleCloudBaseOperator):
"""
This operator is used to patch dataset for your Project in BigQuery.
It only replaces fields that are provided in the submitted dataset resource.
Expand Down Expand Up @@ -2079,7 +2080,7 @@ def execute(self, context: Context):
)


class BigQueryUpdateTableOperator(BaseOperator):
class BigQueryUpdateTableOperator(GoogleCloudBaseOperator):
"""
This operator is used to update table for your Project in BigQuery.
Use ``fields`` to specify which fields of table to update. If a field
Expand Down Expand Up @@ -2176,7 +2177,7 @@ def execute(self, context: Context):
return table


class BigQueryUpdateDatasetOperator(BaseOperator):
class BigQueryUpdateDatasetOperator(GoogleCloudBaseOperator):
"""
This operator is used to update dataset for your Project in BigQuery.
Use ``fields`` to specify which fields of dataset to update. If a field
Expand Down Expand Up @@ -2268,7 +2269,7 @@ def execute(self, context: Context):
return dataset


class BigQueryDeleteTableOperator(BaseOperator):
class BigQueryDeleteTableOperator(GoogleCloudBaseOperator):
"""
Deletes BigQuery tables
Expand Down Expand Up @@ -2337,7 +2338,7 @@ def execute(self, context: Context) -> None:
hook.delete_table(table_id=self.deletion_dataset_table, not_found_ok=self.ignore_if_missing)


class BigQueryUpsertTableOperator(BaseOperator):
class BigQueryUpsertTableOperator(GoogleCloudBaseOperator):
"""
Upsert BigQuery table
Expand Down Expand Up @@ -2425,7 +2426,7 @@ def execute(self, context: Context) -> None:
)


class BigQueryUpdateTableSchemaOperator(BaseOperator):
class BigQueryUpdateTableSchemaOperator(GoogleCloudBaseOperator):
"""
Update BigQuery Table Schema
Updates fields on a table schema based on contents of the supplied schema_fields_updates
Expand Down Expand Up @@ -2538,7 +2539,7 @@ def execute(self, context: Context):
return table


class BigQueryInsertJobOperator(BaseOperator):
class BigQueryInsertJobOperator(GoogleCloudBaseOperator):
"""
Executes a BigQuery job. Waits for the job to complete and returns job id.
This operator work in the following way:
Expand Down
8 changes: 4 additions & 4 deletions airflow/providers/google/cloud/operators/bigquery_dts.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@

from airflow import AirflowException
from airflow.compat.functools import cached_property
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery_dts import BiqQueryDataTransferServiceHook, get_object_id
from airflow.providers.google.cloud.links.bigquery_dts import BigQueryDataTransferConfigLink
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
from airflow.providers.google.cloud.triggers.bigquery_dts import BigQueryDataTransferRunTrigger

if TYPE_CHECKING:
Expand All @@ -46,7 +46,7 @@ def _get_transfer_config_details(config_transfer_name: str):
return {"project_id": config_details[1], "region": config_details[3], "config_id": config_details[5]}


class BigQueryCreateDataTransferOperator(BaseOperator):
class BigQueryCreateDataTransferOperator(GoogleCloudBaseOperator):
"""
Creates a new data transfer configuration.
Expand Down Expand Up @@ -144,7 +144,7 @@ def execute(self, context: Context):
return result


class BigQueryDeleteDataTransferConfigOperator(BaseOperator):
class BigQueryDeleteDataTransferConfigOperator(GoogleCloudBaseOperator):
"""
Deletes transfer configuration.
Expand Down Expand Up @@ -216,7 +216,7 @@ def execute(self, context: Context) -> None:
)


class BigQueryDataTransferServiceStartTransferRunsOperator(BaseOperator):
class BigQueryDataTransferServiceStartTransferRunsOperator(GoogleCloudBaseOperator):
"""
Start manual transfer runs to be executed now with schedule_time equal
to current time. The transfer runs can be created for a time range where
Expand Down
14 changes: 7 additions & 7 deletions airflow/providers/google/cloud/operators/bigtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
from google.cloud.bigtable.column_family import GarbageCollectionRule

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigtable import BigtableHook
from airflow.providers.google.cloud.links.bigtable import (
BigtableClusterLink,
BigtableInstanceLink,
BigtableTablesLink,
)
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand All @@ -49,7 +49,7 @@ def _validate_inputs(self):
raise AirflowException(f"Empty parameter: {attr_name}")


class BigtableCreateInstanceOperator(BaseOperator, BigtableValidationMixin):
class BigtableCreateInstanceOperator(GoogleCloudBaseOperator, BigtableValidationMixin):
"""
Creates a new Cloud Bigtable instance.
If the Cloud Bigtable instance with the given ID exists, the operator does not
Expand Down Expand Up @@ -171,7 +171,7 @@ def execute(self, context: Context) -> None:
raise e


class BigtableUpdateInstanceOperator(BaseOperator, BigtableValidationMixin):
class BigtableUpdateInstanceOperator(GoogleCloudBaseOperator, BigtableValidationMixin):
"""
Updates an existing Cloud Bigtable instance.
Expand Down Expand Up @@ -258,7 +258,7 @@ def execute(self, context: Context) -> None:
raise e


class BigtableDeleteInstanceOperator(BaseOperator, BigtableValidationMixin):
class BigtableDeleteInstanceOperator(GoogleCloudBaseOperator, BigtableValidationMixin):
"""
Deletes the Cloud Bigtable instance, including its clusters and all related tables.
Expand Down Expand Up @@ -324,7 +324,7 @@ def execute(self, context: Context) -> None:
raise e


class BigtableCreateTableOperator(BaseOperator, BigtableValidationMixin):
class BigtableCreateTableOperator(GoogleCloudBaseOperator, BigtableValidationMixin):
"""
Creates the table in the Cloud Bigtable instance.
Expand Down Expand Up @@ -434,7 +434,7 @@ def execute(self, context: Context) -> None:
self.log.info("The table '%s' already exists. Consider it as created", self.table_id)


class BigtableDeleteTableOperator(BaseOperator, BigtableValidationMixin):
class BigtableDeleteTableOperator(GoogleCloudBaseOperator, BigtableValidationMixin):
"""
Deletes the Cloud Bigtable table.
Expand Down Expand Up @@ -512,7 +512,7 @@ def execute(self, context: Context) -> None:
raise e


class BigtableUpdateClusterOperator(BaseOperator, BigtableValidationMixin):
class BigtableUpdateClusterOperator(GoogleCloudBaseOperator, BigtableValidationMixin):
"""
Updates a Cloud Bigtable cluster.
Expand Down

0 comments on commit 1e7c064

Please sign in to comment.