Skip to content

Commit

Permalink
Add defer mode to GKECreateClusterOperator and GKEDeleteClusterOperat…
Browse files Browse the repository at this point in the history
…or (#28406)

* Add defer mode to GKECreateClusterOperator and GKEDeleteClusterOperator
  • Loading branch information
MrGeorgeOwl committed Feb 14, 2023
1 parent 47edfe9 commit 28126c1
Show file tree
Hide file tree
Showing 8 changed files with 767 additions and 51 deletions.
111 changes: 83 additions & 28 deletions airflow/providers/google/cloud/hooks/kubernetes_engine.py
Expand Up @@ -30,19 +30,23 @@
import warnings
from typing import Sequence

from google.api_core.exceptions import AlreadyExists, NotFound
from google.api_core.exceptions import NotFound
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.retry import Retry

# not sure why but mypy complains on missing `container_v1` but it is clearly there and is importable
from google.cloud import container_v1, exceptions # type: ignore[attr-defined]
from google.cloud.container_v1 import ClusterManagerClient
from google.cloud.container_v1 import ClusterManagerAsyncClient, ClusterManagerClient
from google.cloud.container_v1.types import Cluster, Operation

from airflow import version
from airflow.exceptions import AirflowException
from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID, GoogleBaseHook
from airflow.providers.google.common.hooks.base_google import (
PROVIDE_PROJECT_ID,
GoogleBaseAsyncHook,
GoogleBaseHook,
)

OPERATIONAL_POLL_INTERVAL = 15

Expand Down Expand Up @@ -156,9 +160,10 @@ def delete_cluster(
self,
name: str,
project_id: str = PROVIDE_PROJECT_ID,
wait_to_complete: bool = True,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
) -> str | None:
) -> Operation | None:
"""
Deletes the cluster, including the Kubernetes endpoint and all
worker nodes. Firewalls and routes that were configured during
Expand All @@ -169,6 +174,8 @@ def delete_cluster(
:param name: The name of the cluster to delete
:param project_id: Google Cloud project ID
:param wait_to_complete: A boolean value which makes method to sleep while
operation of deletion is not finished.
:param retry: Retry object used to determine when/if to retry requests.
If None is specified, requests will not be retried.
:param timeout: The amount of time, in seconds, to wait for the request to
Expand All @@ -179,26 +186,28 @@ def delete_cluster(
self.log.info("Deleting (project_id=%s, location=%s, cluster_id=%s)", project_id, self.location, name)

try:
resource = self.get_cluster_manager_client().delete_cluster(
operation = self.get_cluster_manager_client().delete_cluster(
name=f"projects/{project_id}/locations/{self.location}/clusters/{name}",
retry=retry,
timeout=timeout,
)
resource = self.wait_for_operation(resource, project_id)
if wait_to_complete:
operation = self.wait_for_operation(operation, project_id)
# Returns server-defined url for the resource
return resource.self_link
return operation
except NotFound as error:
self.log.info("Assuming Success: %s", error.message)
return None

@GoogleBaseHook.fallback_to_default_project_id
def create_cluster(
self,
cluster: dict | Cluster | None,
cluster: dict | Cluster,
project_id: str = PROVIDE_PROJECT_ID,
wait_to_complete: bool = True,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
) -> str:
) -> Operation | Cluster:
"""
Creates a cluster, consisting of the specified number and type of Google Compute
Engine instances.
Expand All @@ -207,6 +216,8 @@ def create_cluster(
be of the same form as the protobuf message
:class:`google.cloud.container_v1.types.Cluster`
:param project_id: Google Cloud project ID
:param wait_to_complete: A boolean value which makes method to sleep while
operation of creation is not finished.
:param retry: A retry object (``google.api_core.retry.Retry``) used to
retry requests.
If None is specified, requests will not be retried.
Expand All @@ -231,19 +242,17 @@ def create_cluster(
self.location,
cluster.name, # type: ignore
)
try:
resource = self.get_cluster_manager_client().create_cluster(
parent=f"projects/{project_id}/locations/{self.location}",
cluster=cluster, # type: ignore
retry=retry,
timeout=timeout,
)
resource = self.wait_for_operation(resource, project_id)
operation = self.get_cluster_manager_client().create_cluster(
parent=f"projects/{project_id}/locations/{self.location}",
cluster=cluster, # type: ignore
retry=retry,
timeout=timeout,
)

return resource.target_link
except AlreadyExists as error:
self.log.info("Assuming Success: %s", error.message)
return self.get_cluster(name=cluster.name, project_id=project_id) # type: ignore
if wait_to_complete:
operation = self.wait_for_operation(operation, project_id)

return operation

@GoogleBaseHook.fallback_to_default_project_id
def get_cluster(
Expand Down Expand Up @@ -272,12 +281,58 @@ def get_cluster(
name,
)

return (
self.get_cluster_manager_client()
.get_cluster(
name=f"projects/{project_id}/locations/{self.location}/clusters/{name}",
retry=retry,
timeout=timeout,
return self.get_cluster_manager_client().get_cluster(
name=f"projects/{project_id}/locations/{self.location}/clusters/{name}",
retry=retry,
timeout=timeout,
)


class AsyncGKEHook(GoogleBaseAsyncHook):
"""Hook implemented with usage of asynchronous client of GKE."""

sync_hook_class = GKEHook

def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
delegate_to: str | None = None,
location: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
) -> None:
super().__init__(
gcp_conn_id=gcp_conn_id,
delegate_to=delegate_to,
impersonation_chain=impersonation_chain,
)
self._client: ClusterManagerAsyncClient | None = None
self.location = location

async def _get_client(self) -> ClusterManagerAsyncClient:
if self._client is None:
self._client = ClusterManagerAsyncClient(
credentials=(await self.get_sync_hook()).get_credentials(),
client_info=CLIENT_INFO,
)
.self_link
return self._client

@GoogleBaseHook.fallback_to_default_project_id
async def get_operation(
self,
operation_name: str,
project_id: str = PROVIDE_PROJECT_ID,
) -> Operation:
"""
Fetches the operation from Google Cloud.
:param operation_name: Name of operation to fetch.
:param project_id: Google Cloud project ID.
:return: The new, updated operation from Google Cloud.
"""
project_id = project_id or (await self.get_sync_hook()).project_id

operation_path = f"projects/{project_id}/locations/{self.location}/operations/{operation_name}"
client = await self._get_client()
return await client.get_operation(
name=operation_path,
)

0 comments on commit 28126c1

Please sign in to comment.