D205 Support - Provider: Google (#32356)

* D205 Support - Provider: Google

* PR Fixes
ferruzzi committed Jul 8, 2023
1 parent 92497fa commit 0f73647
Showing 98 changed files with 567 additions and 590 deletions.
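
For reference, pydocstyle's D205 rule is "1 blank line required between summary line and description": a docstring must open with a one-line summary, followed by a blank line, before any further description. A minimal sketch of the before/after pattern applied throughout this diff, using a hypothetical function rather than code from the commit:

# Before: the summary wraps over two lines with no blank separator, so D205 fires.
def describe_job_before(job_id: int) -> str:
    """
    Returns a human-readable description of the job identified by
    job_id, including the job's current state.
    """
    return f"job {job_id}"

# After: a condensed one-line summary, one blank line, then the description.
def describe_job_after(job_id: int) -> str:
    """
    Returns a human-readable description of the job identified by job_id.

    The description includes the job's current state.
    """
    return f"job {job_id}"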
22 changes: 12 additions & 10 deletions airflow/providers/google/cloud/hooks/automl.py
@@ -122,10 +122,10 @@ def create_model(
     retry: Retry | _MethodDefault = DEFAULT,
 ) -> Operation:
     """
-    Creates a model_id. Returns a Model in the `response` field when it
-    completes. When you create a model, several model evaluations are
-    created for it: a global evaluation, and one evaluation for each
-    annotation spec.
+    Creates a model_id and returns a Model in the `response` field when it completes.
+
+    When you create a model, several model evaluations are created for it:
+    a global evaluation, and one evaluation for each annotation spec.

     :param model: The model_id to create. If a dict is provided, it must be of the same form
         as the protobuf message `google.cloud.automl_v1beta1.types.Model`
@@ -163,9 +163,10 @@ def batch_predict(
     metadata: Sequence[tuple[str, str]] = (),
 ) -> Operation:
     """
-    Perform a batch prediction. Unlike the online `Predict`, batch
-    prediction result won't be immediately available in the response.
-    Instead, a long running operation object is returned.
+    Perform a batch prediction and returns a long-running operation object.
+
+    Unlike the online `Predict`, batch prediction result won't be immediately
+    available in the response. Instead, a long-running operation object is returned.

     :param model_id: Name of the model_id requested to serve the batch prediction.
     :param input_config: Required. The input configuration for batch prediction.
@@ -215,8 +216,7 @@ def predict(
     metadata: Sequence[tuple[str, str]] = (),
 ) -> PredictResponse:
     """
-    Perform an online prediction. The prediction result will be directly
-    returned in the response.
+    Perform an online prediction and returns the prediction result in the response.

     :param model_id: Name of the model_id requested to serve the prediction.
     :param payload: Required. Payload to perform a prediction on. The payload must match the problem type
@@ -485,7 +485,9 @@ def deploy_model(
     metadata: Sequence[tuple[str, str]] = (),
 ) -> Operation:
     """
-    Deploys a model. If a model is already deployed, deploying it with the same parameters
+    Deploys a model.
+
+    If a model is already deployed, deploying it with the same parameters
     has no effect. Deploying with different parameters (as e.g. changing node_number) will
     reset the deployment state without pausing the model_id's availability.
10 changes: 6 additions & 4 deletions airflow/providers/google/cloud/hooks/bigquery_dts.py
@@ -75,6 +75,8 @@ def __init__(
     @staticmethod
     def _disable_auto_scheduling(config: dict | TransferConfig) -> TransferConfig:
         """
+        Create a transfer config with the automatic scheduling disabled.
+
         In the case of Airflow, the customer needs to create a transfer config
         with the automatic scheduling disabled (UI, CLI or an Airflow operator) and
         then trigger a transfer run using a specialized Airflow operator that will
@@ -195,10 +197,10 @@ def start_manual_transfer_runs(
     metadata: Sequence[tuple[str, str]] = (),
 ) -> StartManualTransferRunsResponse:
     """
-    Start manual transfer runs to be executed now with schedule_time equal
-    to current time. The transfer runs can be created for a time range where
-    the run_time is between start_time (inclusive) and end_time
-    (exclusive), or for a specific run_time.
+    Start manual transfer runs to be executed now with schedule_time equal to current time.
+
+    The transfer runs can be created for a time range where the run_time is between
+    start_time (inclusive) and end_time (exclusive), or for a specific run_time.

     :param transfer_config_id: Id of transfer config to be used.
     :param requested_time_range: Time range for the transfer runs that should be started.
8 changes: 6 additions & 2 deletions airflow/providers/google/cloud/hooks/bigtable.py
@@ -69,8 +69,7 @@ def _get_client(self, project_id: str) -> Client:
 @GoogleBaseHook.fallback_to_default_project_id
 def get_instance(self, instance_id: str, project_id: str) -> Instance | None:
     """
-    Retrieves and returns the specified Cloud Bigtable instance if it exists.
-    Otherwise, returns None.
+    Retrieves and returns the specified Cloud Bigtable instance if it exists, otherwise returns None.

     :param instance_id: The ID of the Cloud Bigtable instance.
     :param project_id: Optional, Google Cloud project ID where the
@@ -86,6 +85,7 @@ def get_instance(self, instance_id: str, project_id: str) -> Instance | None:
 def delete_instance(self, instance_id: str, project_id: str) -> None:
     """
     Deletes the specified Cloud Bigtable instance.
+
     Raises google.api_core.exceptions.NotFound if the Cloud Bigtable instance does
     not exist.
@@ -217,6 +217,7 @@ def create_table(
 ) -> None:
     """
     Creates the specified Cloud Bigtable table.
+
     Raises ``google.api_core.exceptions.AlreadyExists`` if the table exists.

     :param instance: The Cloud Bigtable instance that owns the table.
@@ -238,6 +239,7 @@ def create_table(
 def delete_table(self, instance_id: str, table_id: str, project_id: str) -> None:
     """
     Deletes the specified table in Cloud Bigtable.
+
     Raises google.api_core.exceptions.NotFound if the table does not exist.

     :param instance_id: The ID of the Cloud Bigtable instance.
@@ -256,6 +258,7 @@ def delete_table(self, instance_id: str, table_id: str, project_id: str) -> None
 def update_cluster(instance: Instance, cluster_id: str, nodes: int) -> None:
     """
     Updates number of nodes in the specified Cloud Bigtable cluster.
+
     Raises google.api_core.exceptions.NotFound if the cluster does not exist.

     :param instance: The Cloud Bigtable instance that owns the cluster.
@@ -284,6 +287,7 @@ def get_column_families_for_table(instance: Instance, table_id: str) -> dict[str
 def get_cluster_states_for_table(instance: Instance, table_id: str) -> dict[str, ClusterState]:
     """
     Fetches Cluster States for the specified table in Cloud Bigtable.
+
     Raises google.api_core.exceptions.NotFound if the table does not exist.

     :param instance: The Cloud Bigtable instance that owns the table.
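
The get_instance docstring above makes the hook's contract explicit: it returns None for a missing instance rather than raising, while the delete/update methods raise NotFound. A hedged usage sketch (the connection ID and resource names are illustrative assumptions, not taken from the commit):

from airflow.providers.google.cloud.hooks.bigtable import BigtableHook

hook = BigtableHook(gcp_conn_id="google_cloud_default")
instance = hook.get_instance(instance_id="my-instance", project_id="my-project")
if instance is None:
    # delete_instance/delete_table would raise google.api_core.exceptions.NotFound
    # at this point, per the docstrings above.
    print("Cloud Bigtable instance 'my-instance' does not exist")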
3 changes: 1 addition & 2 deletions airflow/providers/google/cloud/hooks/cloud_build.py
@@ -502,8 +502,7 @@ def retry_build(
     location: str = "global",
 ) -> Build:
     """
-    Creates a new build based on the specified build. This method creates a new build
-    using the original build request, which may or may not result in an identical build.
+    Create a new build using the original build request; may or may not result in an identical build.

     :param id_: Build ID of the original build.
     :param project_id: Optional, Google Cloud Project project_id where the function belongs.
1 change: 1 addition & 0 deletions airflow/providers/google/cloud/hooks/cloud_composer.py
@@ -156,6 +156,7 @@ def get_environment(
 ) -> Environment:
     """
     Get an existing environment.
+
     :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
     :param region: Required. The ID of the Google Cloud region that the service belongs to.
     :param environment_id: Required. The ID of the Google Cloud environment that the service belongs to.
14 changes: 6 additions & 8 deletions airflow/providers/google/cloud/hooks/cloud_sql.py
@@ -304,8 +304,7 @@ def delete_database(self, instance: str, database: str, project_id: str) -> None
 @GoogleBaseHook.fallback_to_default_project_id
 def export_instance(self, instance: str, body: dict, project_id: str):
     """
-    Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump
-    or CSV file.
+    Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump or CSV file.

     :param instance: Database instance ID of the Cloud SQL instance. This does not include the
         project ID.
@@ -327,8 +326,7 @@ def export_instance(self, instance: str, body: dict, project_id: str):
 @GoogleBaseHook.fallback_to_default_project_id
 def import_instance(self, instance: str, body: dict, project_id: str) -> None:
     """
-    Imports data into a Cloud SQL instance from a SQL dump or CSV file in
-    Cloud Storage.
+    Imports data into a Cloud SQL instance from a SQL dump or CSV file in Cloud Storage.

     :param instance: Database instance ID. This does not include the
         project ID.
@@ -382,8 +380,7 @@ def _wait_for_operation_to_complete(
     self, project_id: str, operation_name: str, time_to_sleep: int = TIME_TO_SLEEP_IN_SECONDS
 ) -> None:
     """
-    Waits for the named operation to complete - checks status of the
-    asynchronous call.
+    Waits for the named operation to complete - checks status of the asynchronous call.

     :param project_id: Project ID of the project that contains the instance.
     :param operation_name: Name of the operation.
@@ -721,10 +718,11 @@ def get_socket_path(self) -> str:


 class CloudSQLDatabaseHook(BaseHook):
-    """Serves DB connection configuration for Google Cloud SQL (Connections
-    of *gcpcloudsqldb://* type).
+    """
+    Serves DB connection configuration for Google Cloud SQL (Connections of *gcpcloudsqldb://* type).
+
     The hook is a "meta" one. It does not perform an actual connection.
     It is there to retrieve all the parameters configured in gcpcloudsql:// connection,
     start/stop Cloud SQL Proxy if needed, dynamically generate Postgres or MySQL
     connection in the database and return an actual Postgres or MySQL hook.
27 changes: 19 additions & 8 deletions airflow/providers/google/cloud/hooks/compute.py
@@ -107,6 +107,7 @@ def insert_instance_template(
 ) -> None:
     """
     Creates Instance Template using body specified.
+
     Must be called with keyword arguments rather than positional.

     :param body: Instance Template representation as an object.
@@ -158,9 +159,10 @@ def delete_instance_template(
 ) -> None:
     """
     Deletes Instance Template.
-    Deleting an Instance Template is permanent and cannot be undone. It
-    is not possible to delete templates that are already in use by a managed instance group.
-    Must be called with keyword arguments rather than positional.
+
+    Deleting an Instance Template is permanent and cannot be undone. It is not
+    possible to delete templates that are already in use by a managed instance
+    group. Must be called with keyword arguments rather than positional.

     :param resource_id: Name of the Compute Engine Instance Template resource.
     :param request_id: Unique request_id that you might add to achieve
@@ -210,6 +212,7 @@ def get_instance_template(
 ) -> InstanceTemplate:
     """
     Retrieves Instance Template by project_id and resource_id.
+
     Must be called with keyword arguments rather than positional.

     :param resource_id: Name of the Instance Template.
@@ -259,6 +262,7 @@ def insert_instance(
 ) -> None:
     """
     Creates Instance using body specified.
+
     Must be called with keyword arguments rather than positional.

     :param body: Instance representation as an object. Should at least include 'name', 'machine_type',
@@ -332,6 +336,7 @@ def get_instance(
 ) -> Instance:
     """
     Retrieves Instance by project_id and resource_id.
+
     Must be called with keyword arguments rather than positional.

     :param resource_id: Name of the Instance
@@ -383,8 +388,8 @@ def delete_instance(
     metadata: Sequence[tuple[str, str]] = (),
 ) -> None:
     """
-    Deletes Instance.
-    Deleting an Instance is permanent and cannot be undone.
+    Permanently and irrevocably deletes an Instance.
+
     It is not possible to delete Instances that are already in use by a managed instance group.
     Must be called with keyword arguments rather than positional.
@@ -433,6 +438,7 @@ def delete_instance(
 def start_instance(self, zone: str, resource_id: str, project_id: str) -> None:
     """
     Starts an existing instance defined by project_id, zone and resource_id.
+
     Must be called with keyword arguments rather than positional.

     :param zone: Google Cloud zone where the instance exists
@@ -457,7 +463,8 @@ def start_instance(self, zone: str, resource_id: str, project_id: str) -> None:
 @GoogleBaseHook.fallback_to_default_project_id
 def stop_instance(self, zone: str, resource_id: str, project_id: str) -> None:
     """
-    Stops an instance defined by project_id, zone and resource_id
+    Stops an instance defined by project_id, zone and resource_id.
+
     Must be called with keyword arguments rather than positional.

     :param zone: Google Cloud zone where the instance exists
@@ -483,6 +490,7 @@ def stop_instance(self, zone: str, resource_id: str, project_id: str) -> None:
 def set_machine_type(self, zone: str, resource_id: str, body: dict, project_id: str) -> None:
     """
     Sets machine type of an instance defined by project_id, zone and resource_id.
+
     Must be called with keyword arguments rather than positional.

     :param zone: Google Cloud zone where the instance exists.
@@ -524,6 +532,7 @@ def insert_instance_group_manager(
 ) -> None:
     """
     Creates an Instance Group Managers using the body specified.
+
     After the group is created, instances in the group are created using the specified Instance Template.
     Must be called with keyword arguments rather than positional.
@@ -580,6 +589,7 @@ def get_instance_group_manager(
 ) -> InstanceGroupManager:
     """
     Retrieves Instance Group Manager by project_id, zone and resource_id.
+
     Must be called with keyword arguments rather than positional.

     :param resource_id: The name of the Managed Instance Group
@@ -631,8 +641,8 @@ def delete_instance_group_manager(
     metadata: Sequence[tuple[str, str]] = (),
 ) -> None:
     """
-    Deletes Instance Group Managers.
-    Deleting an Instance Group Manager is permanent and cannot be undone.
+    Permanently and irrevocably deletes Instance Group Managers.
+
     Must be called with keyword arguments rather than positional.

     :param resource_id: Name of the Compute Engine Instance Group Managers resource.
@@ -687,6 +697,7 @@ def patch_instance_group_manager(
 ) -> None:
     """
     Patches Instance Group Manager with the specified body.
+
     Must be called with keyword arguments rather than positional.

     :param zone: Google Cloud zone where the Instance Group Manager exists
3 changes: 1 addition & 2 deletions airflow/providers/google/cloud/hooks/dataflow.py
@@ -389,8 +389,7 @@ def _refresh_jobs(self) -> None:

 def _check_dataflow_job_state(self, job) -> bool:
     """
-    Helper method to check the state of one job in dataflow for this task
-    if job failed raise exception.
+    Helper method to check the state of one job in dataflow for this task if job failed raise exception.

     :return: True if job is done.
     :raise: Exception
6 changes: 2 additions & 4 deletions airflow/providers/google/cloud/hooks/datafusion.py
@@ -103,10 +103,7 @@ def wait_for_pipeline_state(
     failure_states: list[str] | None = None,
     timeout: int = 5 * 60,
 ) -> None:
-    """
-    Polls pipeline state and raises an exception if the state is one of
-    `failure_states` or the operation timed_out.
-    """
+    """Polls pipeline state and raises an exception if the state fails or times out."""
     failure_states = failure_states or FAILURE_STATES
     success_states = success_states or SUCCESS_STATES
     start_time = monotonic()
@@ -190,6 +187,7 @@ def get_conn(self) -> Resource:
 def restart_instance(self, instance_name: str, location: str, project_id: str) -> Operation:
     """
     Restart a single Data Fusion instance.
+
     At the end of an operation instance is fully restarted.

     :param instance_name: The name of the instance to restart.
1 change: 1 addition & 0 deletions airflow/providers/google/cloud/hooks/dataprep.py
@@ -105,6 +105,7 @@ def get_jobs_for_job_group(self, job_id: int) -> dict[str, Any]:
 def get_job_group(self, job_group_id: int, embed: str, include_deleted: bool) -> dict[str, Any]:
     """
     Get the specified job group.
+
     A job group is a job that is executed from a specific node in a flow.

     :param job_group_id: The ID of the job that will be fetched
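
Every change in this commit follows the same mechanical rule, so compliance can be checked programmatically. A minimal standard-library sketch of the check D205 performs — an illustration of the rule, not pydocstyle's actual implementation:

import ast
import sys
from pathlib import Path


def d205_like_violations(path: str) -> list[str]:
    """Report docstrings whose summary line is not followed by a blank line."""
    tree = ast.parse(Path(path).read_text(encoding="utf-8"))
    problems = []
    for node in ast.walk(tree):
        if not isinstance(node, (ast.Module, ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        docstring = ast.get_docstring(node)
        if not docstring:
            continue
        lines = docstring.splitlines()
        # Multi-line docstrings need a blank line right after the one-line summary;
        # a summary that spills onto the second line is flagged the same way.
        if len(lines) > 1 and lines[1].strip():
            name = getattr(node, "name", "<module>")
            problems.append(f"{path}:{getattr(node, 'lineno', 1)}: no blank line after summary in {name!r}")
    return problems


if __name__ == "__main__":
    for violation in d205_like_violations(sys.argv[1]):
        print(violation)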
