Skip to content

Commit

Permalink
Add DataflowJobMessagesSensor and DataflowJobAutoScalingEventsSensor (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobiasz Kędzierski committed Nov 27, 2020
1 parent c084393 commit e1ebfa6
Show file tree
Hide file tree
Showing 5 changed files with 626 additions and 4 deletions.
37 changes: 36 additions & 1 deletion airflow/providers/google/cloud/example_dags/example_dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@
DataflowCreatePythonJobOperator,
DataflowTemplatedJobStartOperator,
)
from airflow.providers.google.cloud.sensors.dataflow import DataflowJobMetricsSensor, DataflowJobStatusSensor
from airflow.providers.google.cloud.sensors.dataflow import (
DataflowJobAutoScalingEventsSensor,
DataflowJobMessagesSensor,
DataflowJobMetricsSensor,
DataflowJobStatusSensor,
)
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from airflow.utils.dates import days_ago

Expand Down Expand Up @@ -183,8 +188,38 @@ def callback(metrics: List[Dict]) -> bool:
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
)

def check_message(messages: List[dict]) -> bool:
    """Return True if any job message reports the workflow start/stop step."""
    expected_text = "Adding workflow start and stop steps."
    return any(expected_text in message.get("messageText", "") for message in messages)

# Sensor that polls the async Dataflow job's messages (job_id pulled from the
# start task's XCom) until check_message finds the expected start/stop message.
wait_for_python_job_async_message = DataflowJobMessagesSensor(
    task_id="wait-for-python-job-async-message",
    job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
    location='europe-west3',
    callback=check_message,
)

def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:
    """Return True if any autoscaling event says the worker pool has started."""
    return any(
        "Worker pool started." in event.get("description", {}).get("messageText", "")
        for event in autoscaling_events
    )

# Sensor that polls the async Dataflow job's autoscaling events (job_id pulled
# from the start task's XCom) until check_autoscaling_event matches one.
wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
    task_id="wait-for-python-job-async-autoscaling-event",
    job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
    location='europe-west3',
    callback=check_autoscaling_event,
)

# All four sensors run in parallel, each downstream of the job-start task.
start_python_job_async >> wait_for_python_job_async_done
start_python_job_async >> wait_for_python_job_async_metric
start_python_job_async >> wait_for_python_job_async_message
start_python_job_async >> wait_for_python_job_async_autoscaling_event


with models.DAG(
Expand Down
118 changes: 117 additions & 1 deletion airflow/providers/google/cloud/hooks/dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import warnings
from copy import deepcopy
from tempfile import TemporaryDirectory
from typing import Any, Callable, Dict, List, Optional, Sequence, Set, TypeVar, Union, cast
from typing import Any, Callable, Dict, Generator, List, Optional, Sequence, Set, TypeVar, Union, cast

from googleapiclient.discovery import build

Expand Down Expand Up @@ -264,6 +264,66 @@ def fetch_job_metrics_by_id(self, job_id: str) -> dict:
self.log.debug("fetch_job_metrics_by_id %s:\n%s", job_id, result)
return result

def _fetch_list_job_messages_responses(self, job_id: str) -> Generator[dict, None, None]:
    """
    Helper method to fetch ListJobMessagesResponse with the specified Job ID.

    :param job_id: Job ID to get.
    :type job_id: str
    :return: yields the ListJobMessagesResponse. See:
        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse
    :rtype: Generator[dict, None, None]
    """
    request = (
        self._dataflow.projects()
        .locations()
        .jobs()
        .messages()
        .list(projectId=self._project_number, location=self._job_location, jobId=job_id)
    )

    # Paginate through the API: list_next() returns None once there are no
    # further pages, which terminates the loop.
    while request is not None:
        response = request.execute(num_retries=self._num_retries)
        yield response

        request = (
            self._dataflow.projects()
            .locations()
            .jobs()
            .messages()
            .list_next(previous_request=request, previous_response=response)
        )

def fetch_job_messages_by_id(self, job_id: str) -> List[dict]:
    """
    Helper method to fetch the job messages with the specified Job ID.

    :param job_id: Job ID to get.
    :type job_id: str
    :return: the list of JobMessages. See:
        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#JobMessage
    :rtype: List[dict]
    """
    messages: List[dict] = []
    # Flatten the "jobMessages" field from every paginated response; pages
    # with no messages simply contribute nothing.
    for response in self._fetch_list_job_messages_responses(job_id=job_id):
        messages.extend(response.get("jobMessages", []))
    return messages

def fetch_job_autoscaling_events_by_id(self, job_id: str) -> List[dict]:
    """
    Helper method to fetch the job autoscaling events with the specified Job ID.

    :param job_id: Job ID to get.
    :type job_id: str
    :return: the list of AutoscalingEvents. See:
        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#autoscalingevent
    :rtype: List[dict]
    """
    autoscaling_events: List[dict] = []
    # Flatten the "autoscalingEvents" field from every paginated response;
    # pages without events contribute nothing.
    for response in self._fetch_list_job_messages_responses(job_id=job_id):
        autoscaling_events.extend(response.get("autoscalingEvents", []))
    return autoscaling_events

def _fetch_all_jobs(self) -> List[dict]:
request = (
self._dataflow.projects()
Expand Down Expand Up @@ -1150,3 +1210,59 @@ def fetch_job_metrics_by_id(
location=location,
)
return jobs_controller.fetch_job_metrics_by_id(job_id)

@GoogleBaseHook.fallback_to_default_project_id
def fetch_job_messages_by_id(
    self,
    job_id: str,
    project_id: str,
    location: str = DEFAULT_DATAFLOW_LOCATION,
) -> List[dict]:
    """
    Gets the job messages with the specified Job ID.

    :param job_id: Job ID to get.
    :type job_id: str
    :param project_id: Optional, the Google Cloud project ID in which to start a job.
        If set to None or missing, the default project_id from the Google Cloud connection is used.
    :type project_id: str
    :param location: Job location.
    :type location: str
    :return: the list of JobMessages. See:
        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#JobMessage
    :rtype: List[dict]
    """
    # Delegate to the jobs controller, which handles API pagination.
    jobs_controller = _DataflowJobsController(
        dataflow=self.get_conn(),
        project_number=project_id,
        location=location,
    )
    return jobs_controller.fetch_job_messages_by_id(job_id)

@GoogleBaseHook.fallback_to_default_project_id
def fetch_job_autoscaling_events_by_id(
    self,
    job_id: str,
    project_id: str,
    location: str = DEFAULT_DATAFLOW_LOCATION,
) -> List[dict]:
    """
    Gets the job autoscaling events with the specified Job ID.

    :param job_id: Job ID to get.
    :type job_id: str
    :param project_id: Optional, the Google Cloud project ID in which to start a job.
        If set to None or missing, the default project_id from the Google Cloud connection is used.
    :type project_id: str
    :param location: Job location.
    :type location: str
    :return: the list of AutoscalingEvents. See:
        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#autoscalingevent
    :rtype: List[dict]
    """
    # Delegate to the jobs controller, which handles API pagination.
    jobs_controller = _DataflowJobsController(
        dataflow=self.get_conn(),
        project_number=project_id,
        location=location,
    )
    return jobs_controller.fetch_job_autoscaling_events_by_id(job_id)

0 comments on commit e1ebfa6

Please sign in to comment.