Commit

Add extra links for google dataproc (#10343)
yesemsanthoshkumar committed May 7, 2021
1 parent b9f6e9c commit b8c0fde
Showing 6 changed files with 582 additions and 55 deletions.
104 changes: 103 additions & 1 deletion airflow/providers/google/cloud/operators/dataproc.py
@@ -35,12 +35,57 @@
from google.protobuf.field_mask_pb2 import FieldMask

from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
+from airflow.models import BaseOperator, BaseOperatorLink
+from airflow.models.taskinstance import TaskInstance
from airflow.providers.google.cloud.hooks.dataproc import DataprocHook, DataProcJobBuilder
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults

+DATAPROC_BASE_LINK = "https://console.cloud.google.com/dataproc"
+DATAPROC_JOB_LOG_LINK = DATAPROC_BASE_LINK + "/jobs/{job_id}?region={region}&project={project_id}"
+DATAPROC_CLUSTER_LINK = (
+    DATAPROC_BASE_LINK + "/clusters/{cluster_name}/monitoring?region={region}&project={project_id}"
+)


+class DataprocJobLink(BaseOperatorLink):
+    """Helper class for constructing Dataproc Job link"""
+
+    name = "Dataproc Job"
+
+    def get_link(self, operator, dttm):
+        ti = TaskInstance(task=operator, execution_date=dttm)
+        job_conf = ti.xcom_pull(task_ids=operator.task_id, key="job_conf")
+        return (
+            DATAPROC_JOB_LOG_LINK.format(
+                job_id=job_conf["job_id"],
+                region=job_conf["region"],
+                project_id=job_conf["project_id"],
+            )
+            if job_conf
+            else ""
+        )


+class DataprocClusterLink(BaseOperatorLink):
+    """Helper class for constructing Dataproc Cluster link"""
+
+    name = "Dataproc Cluster"
+
+    def get_link(self, operator, dttm):
+        ti = TaskInstance(task=operator, execution_date=dttm)
+        cluster_conf = ti.xcom_pull(task_ids=operator.task_id, key="cluster_conf")
+        return (
+            DATAPROC_CLUSTER_LINK.format(
+                cluster_name=cluster_conf["cluster_name"],
+                region=cluster_conf["region"],
+                project_id=cluster_conf["project_id"],
+            )
+            if cluster_conf
+            else ""
+        )


# pylint: disable=too-many-instance-attributes
class ClusterGenerator:
@@ -478,6 +523,8 @@ class DataprocCreateClusterOperator(BaseOperator):
    )
    template_fields_renderers = {'cluster_config': 'json'}

+    operator_extra_links = (DataprocClusterLink(),)

    @apply_defaults
    def __init__(  # pylint: disable=too-many-arguments
        self,
@@ -620,6 +667,16 @@ def _wait_for_cluster_in_creating_state(self, hook: DataprocHook) -> Cluster:
    def execute(self, context) -> dict:
        self.log.info('Creating cluster: %s', self.cluster_name)
        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        # Save data required to display extra link no matter what the cluster status will be
+        self.xcom_push(
+            context,
+            key="cluster_conf",
+            value={
+                "cluster_name": self.cluster_name,
+                "region": self.region,
+                "project_id": self.project_id,
+            },
+        )
        try:
            # First try to create a new cluster
            cluster = self._create_cluster(hook)
@@ -694,6 +751,8 @@ class DataprocScaleClusterOperator(BaseOperator):

    template_fields = ['cluster_name', 'project_id', 'region', 'impersonation_chain']

+    operator_extra_links = (DataprocClusterLink(),)

    @apply_defaults
    def __init__(
        self,
@@ -773,6 +832,16 @@ def execute(self, context) -> None:
        update_mask = ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]

        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        # Save data required to display extra link no matter what the cluster status will be
+        self.xcom_push(
+            context,
+            key="cluster_conf",
+            value={
+                "cluster_name": self.cluster_name,
+                "region": self.region,
+                "project_id": self.project_id,
+            },
+        )
        operation = hook.update_cluster(
            project_id=self.project_id,
            location=self.region,
@@ -931,6 +1000,8 @@ class DataprocJobBaseOperator(BaseOperator):

job_type = ""

operator_extra_links = (DataprocJobLink(),)

    @apply_defaults
    def __init__(
        self,
@@ -1005,6 +1076,12 @@ def execute(self, context):
        )
        job_id = job_object.reference.job_id
        self.log.info('Job %s submitted successfully.', job_id)
+        # Save data required for extra links no matter what the job status will be
+        self.xcom_push(
+            context,
+            key='job_conf',
+            value={'job_id': job_id, 'region': self.region, 'project_id': self.project_id},
+        )

        if not self.asynchronous:
            self.log.info('Waiting for job %s to complete', job_id)
@@ -1082,6 +1159,8 @@ class DataprocSubmitPigJobOperator(DataprocJobBaseOperator):
    ui_color = '#0273d4'
    job_type = 'pig_job'

+    operator_extra_links = (DataprocJobLink(),)

    @apply_defaults
    def __init__(
        self,
@@ -1871,6 +1950,8 @@ class DataprocSubmitJobOperator(BaseOperator):
    template_fields = ('project_id', 'location', 'job', 'impersonation_chain', 'request_id')
    template_fields_renderers = {"job": "json"}

+    operator_extra_links = (DataprocJobLink(),)

    @apply_defaults
    def __init__(
        self,
@@ -1919,6 +2000,16 @@ def execute(self, context: Dict):
        )
        job_id = job_object.reference.job_id
        self.log.info('Job %s submitted successfully.', job_id)
+        # Save data required by extra links no matter what the job status will be
+        self.xcom_push(
+            context,
+            key="job_conf",
+            value={
+                "job_id": job_id,
+                "region": self.location,
+                "project_id": self.project_id,
+            },
+        )

        if not self.asynchronous:
            self.log.info('Waiting for job %s to complete', job_id)
@@ -1988,6 +2079,7 @@ class DataprocUpdateClusterOperator(BaseOperator):
"""

template_fields = ('impersonation_chain', 'cluster_name')
operator_extra_links = (DataprocClusterLink(),)

    @apply_defaults
    def __init__(  # pylint: disable=too-many-arguments
@@ -2023,6 +2115,16 @@ def __init__(  # pylint: disable=too-many-arguments

    def execute(self, context: Dict):
        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        # Save data required by extra links no matter what the cluster status will be
+        self.xcom_push(
+            context,
+            key="cluster_conf",
+            value={
+                "cluster_name": self.cluster_name,
+                "region": self.location,
+                "project_id": self.project_id,
+            },
+        )
        self.log.info("Updating %s cluster.", self.cluster_name)
        operation = hook.update_cluster(
            project_id=self.project_id,
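For context, here is a minimal usage sketch (not part of this commit) of how the new link surfaces. The DAG id, project, region, cluster, and job below are hypothetical placeholders; the mechanism is the one added above: execute() pushes job_conf to XCom, and the webserver's "Dataproc Job" button calls DataprocJobLink.get_link, which pulls that XCom and formats DATAPROC_JOB_LOG_LINK.

# Hypothetical example; all names and values are placeholders, not from this commit.
from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
from airflow.utils.dates import days_ago

PIG_JOB = {
    "reference": {"project_id": "example-project"},
    "placement": {"cluster_name": "example-cluster"},
    "pig_job": {"query_list": {"queries": ["sh echo Hello"]}},
}

with DAG("dataproc_extra_link_demo", start_date=days_ago(1), schedule_interval=None) as dag:
    submit_job = DataprocSubmitJobOperator(
        task_id="submit_job",
        job=PIG_JOB,
        location="us-central1",  # pushed to XCom as job_conf["region"]
        project_id="example-project",
    )

# After the task runs, its "Dataproc Job" button resolves to
#   https://console.cloud.google.com/dataproc/jobs/<job_id>?region=us-central1&project=example-project
# and get_link returns "" if job_conf was never pushed (e.g. the task has not run yet).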
2 changes: 2 additions & 0 deletions airflow/providers/google/provider.yaml
@@ -743,6 +743,8 @@ extra-links:
  - airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink
  - airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink
  - airflow.providers.google.cloud.operators.mlengine.AIPlatformConsoleLink
+  - airflow.providers.google.cloud.operators.dataproc.DataprocJobLink
+  - airflow.providers.google.cloud.operators.dataproc.DataprocClusterLink

additional-extras:
  apache.beam: apache-beam[gcp]
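To confirm the registration, both classes should now be discoverable through the provider mechanism. A quick sketch, assuming the ProvidersManager.extra_links_class_names property that backs the `airflow providers links` command:

# Sketch: list every registered extra link, roughly what `airflow providers links`
# prints; extra_links_class_names is an assumption about the ProvidersManager API.
from airflow.providers_manager import ProvidersManager

for class_name in ProvidersManager().extra_links_class_names:
    print(class_name)

# The output should now include:
#   airflow.providers.google.cloud.operators.dataproc.DataprocClusterLink
#   airflow.providers.google.cloud.operators.dataproc.DataprocJobLink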
9 changes: 9 additions & 0 deletions airflow/serialization/serialized_objects.py
@@ -72,6 +72,15 @@
"airflow.sensors.external_task_sensor.ExternalTaskSensorLink",
}

+BUILTIN_OPERATOR_EXTRA_LINKS: List[str] = [
+    "airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink",
+    "airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink",
+    "airflow.providers.google.cloud.operators.dataproc.DataprocJobLink",
+    "airflow.providers.google.cloud.operators.dataproc.DataprocClusterLink",
+    "airflow.providers.google.cloud.operators.mlengine.AIPlatformConsoleLink",
+    "airflow.providers.qubole.operators.qubole.QDSLink",
+]


@cache
def get_operator_extra_links():
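The whitelist matters because a serialized DAG stores only the import path of each operator extra link, and the webserver re-imports the class from that path when it renders the task view; paths outside the allowed set are ignored. A simplified sketch of that resolution step (the real logic lives in serialized_objects.py and needs the google provider installed):

# Simplified sketch: resolve a whitelisted extra-link class from its import path.
import importlib

def resolve_extra_link(path: str):
    module_name, class_name = path.rsplit(".", 1)
    return getattr(importlib.import_module(module_name), class_name)

link_cls = resolve_extra_link("airflow.providers.google.cloud.operators.dataproc.DataprocJobLink")
print(link_cls().name)  # "Dataproc Job"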
@@ -137,7 +137,7 @@ function discover_all_extra_links() {
    group_start "Listing available extra links via 'airflow providers links'"
    COLUMNS=180 airflow providers links

-    local expected_number_of_extra_links=4
+    local expected_number_of_extra_links=6
    local actual_number_of_extra_links
    actual_number_of_extra_links=$(airflow providers links --output table | grep -c ^airflow.providers | xargs)
    if [[ ${actual_number_of_extra_links} != "${expected_number_of_extra_links}" ]]; then
2 changes: 2 additions & 0 deletions tests/core/test_providers_manager.py
@@ -227,6 +227,8 @@
EXTRA_LINKS = [
    'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink',
    'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink',
+    'airflow.providers.google.cloud.operators.dataproc.DataprocClusterLink',
+    'airflow.providers.google.cloud.operators.dataproc.DataprocJobLink',
    'airflow.providers.google.cloud.operators.mlengine.AIPlatformConsoleLink',
    'airflow.providers.qubole.operators.qubole.QDSLink',
]
