Skip to content

Commit

Permalink
Fix location on cloud build operators (#29937)
Browse files Browse the repository at this point in the history
  • Loading branch information
tnk-ysk committed Mar 10, 2023
1 parent e60be9e commit def1f89
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 70 deletions.
89 changes: 58 additions & 31 deletions airflow/providers/google/cloud/hooks/cloud_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import warnings
from typing import Sequence

from google.api_core.client_options import ClientOptions
from google.api_core.exceptions import AlreadyExists
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.operation import Operation
Expand Down Expand Up @@ -67,7 +68,7 @@ def __init__(
super().__init__(
gcp_conn_id=gcp_conn_id, delegate_to=delegate_to, impersonation_chain=impersonation_chain
)
self._client: CloudBuildClient | None = None
self._client: dict[str, CloudBuildClient] = {}

def _get_build_id_from_operation(self, operation: Operation) -> str:
"""
Expand All @@ -91,15 +92,24 @@ def wait_for_operation(self, operation: Operation, timeout: float | None = None)
error = operation.exception(timeout=timeout)
raise AirflowException(error)

def get_conn(self) -> CloudBuildClient:
def get_conn(self, location: str = "global") -> CloudBuildClient:
"""
Retrieves the connection to Google Cloud Build.
:param location: The location of the project.
:return: Google Cloud Build client object.
"""
if not self._client:
self._client = CloudBuildClient(credentials=self.get_credentials(), client_info=CLIENT_INFO)
return self._client
if location not in self._client:
client_options = None
if location != "global":
client_options = ClientOptions(api_endpoint=f"{location}-cloudbuild.googleapis.com:443")
self._client[location] = CloudBuildClient(
credentials=self.get_credentials(),
client_info=CLIENT_INFO,
client_options=client_options,
)
return self._client[location]

@GoogleBaseHook.fallback_to_default_project_id
def cancel_build(
Expand All @@ -109,6 +119,7 @@ def cancel_build(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
location: str = "global",
) -> Build:
"""
Cancels a build in progress.
Expand All @@ -121,9 +132,9 @@ def cancel_build(
:param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional, additional metadata that is provided to the method.
:param location: The location of the project.
"""
client = self.get_conn()
client = self.get_conn(location=location)

self.log.info("Start cancelling build: %s.", id_)

Expand All @@ -145,6 +156,7 @@ def create_build_without_waiting_for_result(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
location: str = "global",
) -> tuple[Operation, str]:
"""
Starts a build with the specified configuration without waiting for it to finish.
Expand All @@ -158,13 +170,16 @@ def create_build_without_waiting_for_result(
:param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional, additional metadata that is provided to the method.
:param location: The location of the project.
"""
client = self.get_conn()
client = self.get_conn(location=location)

parent = f"projects/{project_id}/locations/{location}"

self.log.info("Start creating build...")

operation = client.create_build(
request={"project_id": project_id, "build": build},
request={"parent": parent, "project_id": project_id, "build": build},
retry=retry,
timeout=timeout,
metadata=metadata,
Expand Down Expand Up @@ -227,6 +242,7 @@ def create_build_trigger(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
location: str = "global",
) -> BuildTrigger:
"""
Creates a new BuildTrigger.
Expand All @@ -240,9 +256,9 @@ def create_build_trigger(
:param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional, additional metadata that is provided to the method.
:param location: The location of the project.
"""
client = self.get_conn()
client = self.get_conn(location=location)

self.log.info("Start creating build trigger...")

Expand All @@ -268,6 +284,7 @@ def delete_build_trigger(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
location: str = "global",
) -> None:
"""
Deletes a BuildTrigger by its project ID and trigger ID.
Expand All @@ -280,8 +297,9 @@ def delete_build_trigger(
:param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional, additional metadata that is provided to the method.
:param location: The location of the project.
"""
client = self.get_conn()
client = self.get_conn(location=location)

self.log.info("Start deleting build trigger: %s.", trigger_id)

Expand All @@ -302,6 +320,7 @@ def get_build(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
location: str = "global",
) -> Build:
"""
Returns information about a previously requested build.
Expand All @@ -314,9 +333,9 @@ def get_build(
:param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional, additional metadata that is provided to the method.
:param location: The location of the project.
"""
client = self.get_conn()
client = self.get_conn(location=location)

self.log.info("Start retrieving build: %s.", id_)

Expand All @@ -339,6 +358,7 @@ def get_build_trigger(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
location: str = "global",
) -> BuildTrigger:
"""
Returns information about a BuildTrigger.
Expand All @@ -351,9 +371,9 @@ def get_build_trigger(
:param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional, additional metadata that is provided to the method.
:param location: The location of the project.
"""
client = self.get_conn()
client = self.get_conn(location=location)

self.log.info("Start retrieving build trigger: %s.", trigger_id)

Expand All @@ -371,7 +391,7 @@ def get_build_trigger(
@GoogleBaseHook.fallback_to_default_project_id
def list_build_triggers(
self,
location: str,
location: str = "global",
project_id: str = PROVIDE_PROJECT_ID,
page_size: int | None = None,
page_token: str | None = None,
Expand All @@ -394,7 +414,7 @@ def list_build_triggers(
:param metadata: Optional, additional metadata that is provided to the method.
"""
client = self.get_conn()
client = self.get_conn(location=location)

parent = f"projects/{project_id}/locations/{location}"

Expand All @@ -419,7 +439,7 @@ def list_build_triggers(
@GoogleBaseHook.fallback_to_default_project_id
def list_builds(
self,
location: str,
location: str = "global",
project_id: str = PROVIDE_PROJECT_ID,
page_size: int | None = None,
page_token: int | None = None,
Expand All @@ -444,7 +464,7 @@ def list_builds(
:param metadata: Optional, additional metadata that is provided to the method.
"""
client = self.get_conn()
client = self.get_conn(location=location)

parent = f"projects/{project_id}/locations/{location}"

Expand Down Expand Up @@ -476,6 +496,7 @@ def retry_build(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
location: str = "global",
) -> Build:
"""
Creates a new build based on the specified build. This method creates a new build
Expand All @@ -490,9 +511,9 @@ def retry_build(
:param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional, additional metadata that is provided to the method.
:param location: The location of the project.
"""
client = self.get_conn()
client = self.get_conn(location=location)

self.log.info("Start retrying build: %s.", id_)

Expand All @@ -506,13 +527,13 @@ def retry_build(
id_ = self._get_build_id_from_operation(operation)

if not wait:
return self.get_build(id_=id_, project_id=project_id)
return self.get_build(id_=id_, project_id=project_id, location=location)

operation.result()

self.log.info("Build has been retried: %s.", id_)

return self.get_build(id_=id_, project_id=project_id)
return self.get_build(id_=id_, project_id=project_id, location=location)

@GoogleBaseHook.fallback_to_default_project_id
def run_build_trigger(
Expand All @@ -524,6 +545,7 @@ def run_build_trigger(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
location: str = "global",
) -> Build:
"""
Runs a BuildTrigger at a particular source revision.
Expand All @@ -539,9 +561,9 @@ def run_build_trigger(
:param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional, additional metadata that is provided to the method.
:param location: The location of the project.
"""
client = self.get_conn()
client = self.get_conn(location=location)

self.log.info("Start running build trigger: %s.", trigger_id)
operation = client.run_build_trigger(
Expand All @@ -554,12 +576,12 @@ def run_build_trigger(
id_ = self._get_build_id_from_operation(operation)

if not wait:
return self.get_build(id_=id_, project_id=project_id)
return self.get_build(id_=id_, project_id=project_id, location=location)
operation.result()

self.log.info("Build trigger has been run: %s.", trigger_id)

return self.get_build(id_=id_, project_id=project_id)
return self.get_build(id_=id_, project_id=project_id, location=location)

@GoogleBaseHook.fallback_to_default_project_id
def update_build_trigger(
Expand All @@ -570,6 +592,7 @@ def update_build_trigger(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
location: str = "global",
) -> BuildTrigger:
"""
Updates a BuildTrigger by its project ID and trigger ID.
Expand All @@ -584,9 +607,9 @@ def update_build_trigger(
:param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional, additional metadata that is provided to the method.
:param location: The location of the project.
"""
client = self.get_conn()
client = self.get_conn(location=location)

self.log.info("Start updating build trigger: %s.", trigger_id)

Expand All @@ -613,12 +636,16 @@ async def get_cloud_build(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
location: str = "global",
) -> Build:
"""Retrieves a Cloud Build with a specified id."""
if not id_:
raise AirflowException("Google Cloud Build id is required.")

client = CloudBuildAsyncClient()
client_options = None
if location != "global":
client_options = ClientOptions(api_endpoint=f"{location}-cloudbuild.googleapis.com:443")
client = CloudBuildAsyncClient(client_options=client_options)

request = GetBuildRequest(
project_id=project_id,
Expand Down
18 changes: 14 additions & 4 deletions airflow/providers/google/cloud/links/cloud_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@

BUILD_BASE_LINK = "/cloud-build"

BUILD_LINK = BUILD_BASE_LINK + "/builds/{build_id}?project={project_id}"
BUILD_LINK = BUILD_BASE_LINK + "/builds;region={region}/{build_id}?project={project_id}"

BUILD_LIST_LINK = BUILD_BASE_LINK + "/builds?project={project_id}"
BUILD_LIST_LINK = BUILD_BASE_LINK + "/builds;region={region}?project={project_id}"

BUILD_TRIGGERS_LIST_LINK = BUILD_BASE_LINK + "/triggers?project={project_id}"
BUILD_TRIGGERS_LIST_LINK = BUILD_BASE_LINK + "/triggers;region={region}?project={project_id}"

BUILD_TRIGGER_DETAILS_LINK = BUILD_BASE_LINK + "/triggers/edit/{trigger_id}?project={project_id}"
BUILD_TRIGGER_DETAILS_LINK = (
BUILD_BASE_LINK + "/triggers;region={region}/edit/{trigger_id}?project={project_id}"
)


class CloudBuildLink(BaseGoogleLink):
Expand All @@ -47,12 +49,14 @@ def persist(
task_instance,
build_id: str,
project_id: str,
region: str,
):
task_instance.xcom_push(
context=context,
key=CloudBuildLink.key,
value={
"project_id": project_id,
"region": region,
"build_id": build_id,
},
)
Expand All @@ -70,12 +74,14 @@ def persist(
context: Context,
task_instance,
project_id: str,
region: str,
):
task_instance.xcom_push(
context=context,
key=CloudBuildListLink.key,
value={
"project_id": project_id,
"region": region,
},
)

Expand All @@ -92,12 +98,14 @@ def persist(
context: Context,
task_instance,
project_id: str,
region: str,
):
task_instance.xcom_push(
context=context,
key=CloudBuildTriggersListLink.key,
value={
"project_id": project_id,
"region": region,
},
)

Expand All @@ -114,13 +122,15 @@ def persist(
context: Context,
task_instance,
project_id: str,
region: str,
trigger_id: str,
):
task_instance.xcom_push(
context=context,
key=CloudBuildTriggerDetailsLink.key,
value={
"project_id": project_id,
"region": region,
"trigger_id": trigger_id,
},
)

0 comments on commit def1f89

Please sign in to comment.