Skip to content

Commit

Permalink
Add deferrable mode to CloudBuildCreateBuildOperator (#27783)
Browse files Browse the repository at this point in the history
  • Loading branch information
VladaZakharova committed Dec 3, 2022
1 parent 24745c7 commit c931d88
Show file tree
Hide file tree
Showing 10 changed files with 1,082 additions and 78 deletions.
98 changes: 87 additions & 11 deletions airflow/providers/google/cloud/hooks/cloud_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

from typing import Sequence

from google.api_core.exceptions import AlreadyExists
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.operation import Operation
from google.api_core.retry import Retry
from google.cloud.devtools.cloudbuild import CloudBuildClient
from google.cloud.devtools.cloudbuild_v1 import CloudBuildAsyncClient, CloudBuildClient, GetBuildRequest
from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource

from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -77,6 +78,14 @@ def _get_build_id_from_operation(self, operation: Operation) -> str:
except Exception:
raise AirflowException("Could not retrieve Build ID from Operation.")

def wait_for_operation(self, operation: Operation, timeout: float | None = None):
"""Waits for long-lasting operation to complete."""
try:
return operation.result(timeout=timeout)
except Exception:
error = operation.exception(timeout=timeout)
raise AirflowException(error)

def get_conn(self) -> CloudBuildClient:
"""
Retrieves the connection to Google Cloud Build.
Expand Down Expand Up @@ -123,6 +132,41 @@ def cancel_build(

return build

@GoogleBaseHook.fallback_to_default_project_id
def create_build_without_waiting_for_result(
self,
build: dict | Build,
project_id: str = PROVIDE_PROJECT_ID,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> tuple[Operation, str]:
"""
Starts a build with the specified configuration without waiting for it to finish.
:param build: The build resource to create. If a dict is provided, it must be of the same form
as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.Build`
:param project_id: Optional, Google Cloud Project project_id where the function belongs.
If set to None or missing, the default project_id from the GCP connection is used.
:param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional, additional metadata that is provided to the method.
"""
client = self.get_conn()

self.log.info("Start creating build...")

operation = client.create_build(
request={"project_id": project_id, "build": build},
retry=retry,
timeout=timeout,
metadata=metadata,
)
id_ = self._get_build_id_from_operation(operation)
return operation, id_

@GoogleBaseHook.fallback_to_default_project_id
def create_build(
self,
Expand Down Expand Up @@ -150,7 +194,7 @@ def create_build(
"""
client = self.get_conn()

self.log.info("Start creating build.")
self.log.info("Start creating build...")

operation = client.create_build(
request={"project_id": project_id, "build": build},
Expand Down Expand Up @@ -195,14 +239,17 @@ def create_build_trigger(
"""
client = self.get_conn()

self.log.info("Start creating build trigger.")
self.log.info("Start creating build trigger...")

trigger = client.create_build_trigger(
request={"project_id": project_id, "trigger": trigger},
retry=retry,
timeout=timeout,
metadata=metadata,
)
try:
trigger = client.create_build_trigger(
request={"project_id": project_id, "trigger": trigger},
retry=retry,
timeout=timeout,
metadata=metadata,
)
except AlreadyExists:
raise AirflowException("Cloud Build Trigger with such parameters already exists.")

self.log.info("Build trigger has been created.")

Expand Down Expand Up @@ -492,7 +539,6 @@ def run_build_trigger(
client = self.get_conn()

self.log.info("Start running build trigger: %s.", trigger_id)

operation = client.run_build_trigger(
request={"project_id": project_id, "trigger_id": trigger_id, "source": source},
retry=retry,
Expand All @@ -504,7 +550,6 @@ def run_build_trigger(

if not wait:
return self.get_build(id_=id_, project_id=project_id)

operation.result()

self.log.info("Build trigger has been run: %s.", trigger_id)
Expand Down Expand Up @@ -550,3 +595,34 @@ def update_build_trigger(
self.log.info("Build trigger has been updated: %s.", trigger_id)

return trigger


class CloudBuildAsyncHook(GoogleBaseHook):
"""Asynchronous Hook for the Google Cloud Build Service."""

@GoogleBaseHook.fallback_to_default_project_id
async def get_cloud_build(
self,
id_: str,
project_id: str = PROVIDE_PROJECT_ID,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Build:
"""Retrieves a Cloud Build with a specified id."""
if not id_:
raise AirflowException("Google Cloud Build id is required.")

client = CloudBuildAsyncClient()

request = GetBuildRequest(
project_id=project_id,
id=id_,
)
build_instance = await client.get_build(
request=request,
retry=retry,
timeout=timeout,
metadata=metadata,
)
return build_instance
90 changes: 72 additions & 18 deletions airflow/providers/google/cloud/operators/cloud_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
CloudBuildTriggerDetailsLink,
CloudBuildTriggersListLink,
)
from airflow.providers.google.cloud.triggers.cloud_build import CloudBuildCreateBuildTrigger
from airflow.providers.google.common.consts import GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME
from airflow.utils import yaml

if TYPE_CHECKING:
Expand Down Expand Up @@ -147,7 +149,13 @@ class CloudBuildCreateBuildOperator(BaseOperator):
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:param delegate_to: The account to impersonate using domain-wide delegation of authority,
if any. For this to work, the service account making the request must have
domain-wide delegation enabled.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
:param deferrable: Run operator in the deferrable mode
"""

template_fields: Sequence[str] = ("project_id", "build", "gcp_conn_id", "impersonation_chain")
Expand All @@ -164,19 +172,25 @@ def __init__(
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
delegate_to: str | None = None,
poll_interval: float = 4.0,
deferrable: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.build = build
# Not template fields to keep original value
self.build_raw = build
self.project_id = project_id
self.wait = wait
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
self.build = build
# Not template fields to keep original value
self.build_raw = build
self.delegate_to = delegate_to
self.poll_interval = poll_interval
self.deferrable = deferrable

def prepare_template(self) -> None:
# if no file is specified, skip
Expand All @@ -189,29 +203,69 @@ def prepare_template(self) -> None:
self.build = json.loads(file.read())

def execute(self, context: Context):
hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)

hook = CloudBuildHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
delegate_to=self.delegate_to,
)
build = BuildProcessor(build=self.build).process_body()

result = hook.create_build(
self.cloud_build_operation, self.id_ = hook.create_build_without_waiting_for_result(
build=build,
project_id=self.project_id,
wait=self.wait,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)

self.xcom_push(context, key="id", value=result.id)
project_id = self.project_id or hook.project_id
if project_id:
CloudBuildLink.persist(
context=context,
task_instance=self,
project_id=project_id,
build_id=result.id,
self.xcom_push(context, key="id", value=self.id_)
if not self.wait:
return Build.to_dict(hook.get_build(id_=self.id_, project_id=self.project_id))

if self.deferrable:
self.defer(
trigger=CloudBuildCreateBuildTrigger(
id_=self.id_,
project_id=self.project_id,
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
delegate_to=self.delegate_to,
poll_interval=self.poll_interval,
),
method_name=GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME,
)
return Build.to_dict(result)
else:
cloud_build_instance_result = hook.wait_for_operation(
timeout=self.timeout, operation=self.cloud_build_operation
)
project_id = self.project_id or hook.project_id
if project_id:
CloudBuildLink.persist(
context=context,
task_instance=self,
project_id=project_id,
build_id=cloud_build_instance_result.id,
)
return Build.to_dict(cloud_build_instance_result)

def execute_complete(self, context: Context, event: dict):
if event["status"] == "success":
hook = CloudBuildHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
delegate_to=self.delegate_to,
)
self.log.info("Cloud Build completed with response %s ", event["message"])
project_id = self.project_id or hook.project_id
if project_id:
CloudBuildLink.persist(
context=context,
task_instance=self,
project_id=project_id,
build_id=event["id_"],
)
return event["instance"]
else:
raise AirflowException(f"Unexpected error in the operation: {event['message']}")


class CloudBuildCreateBuildTriggerOperator(BaseOperator):
Expand Down

0 comments on commit c931d88

Please sign in to comment.