Wait for pipeline state in Data Fusion operators (#8954)
* Wait for pipeline state in Data Fusion operators

fixup! Wait for pipeline state in Data Fusion operators

fixup! fixup! Wait for pipeline state in Data Fusion operators

fixup! fixup! fixup! Wait for pipeline state in Data Fusion operators

* Use quote to encode url parts

* fixup! Use quote to encode url parts
turbaszek committed Jun 15, 2020
1 parent a8cd23c commit aee6ab9
Showing 7 changed files with 261 additions and 68 deletions.
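
For context, the second commit item ("Use quote to encode url parts") is about percent-encoding path segments before they are joined into CDAP REST URLs. A minimal illustration of urllib.parse.quote with made-up values, not taken from the diff:

from urllib.parse import quote

namespace = "my namespace"        # hypothetical value containing a space
pipeline_name = "test pipe/v1"    # hypothetical value containing a space and a slash

print(quote(namespace))               # my%20namespace
print(quote(pipeline_name))           # test%20pipe/v1 -- '/' stays unescaped by default
print(quote(pipeline_name, safe=""))  # test%20pipe%2Fv1
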
51 changes: 29 additions & 22 deletions airflow/providers/google/cloud/example_dags/example_datafusion.py
@@ -34,16 +34,14 @@
LOCATION = "europe-north1"
INSTANCE_NAME = "airflow-test-instance"
INSTANCE = {"type": "BASIC", "displayName": INSTANCE_NAME}

BUCKET1 = "gs://test-bucket--2h83r23r"
BUCKET2 = "gs://test-bucket--2d23h83r23r"
PIPELINE_NAME = "airflow_test"
PIPELINE = {
"artifact": {
"name": "cdap-data-pipeline",
"version": "6.1.1",
"scope": "SYSTEM",
"label": "Data Pipeline - Batch",
},
"description": "",
"name": "esdfsd",
"name": "test-pipe",
"description": "Data Pipeline Application",
"artifact": {"name": "cdap-data-pipeline", "version": "6.1.2", "scope": "SYSTEM"},
"config": {
"resources": {"memoryMB": 2048, "virtualCores": 1},
"driverResources": {"memoryMB": 2048, "virtualCores": 1},
@@ -62,23 +60,30 @@
"label": "GCS",
"artifact": {
"name": "google-cloud",
"version": "0.13.2",
"version": "0.14.2",
"scope": "SYSTEM",
},
"properties": {
"project": "auto-detect",
"format": "text",
"skipHeader": "false",
"serviceFilePath": "auto-detect",
"filenameOnly": "false",
"recursive": "false",
"encrypted": "false",
"schema": '{"type":"record","name":"etlSchemaBody","fields":'
'[{"name":"offset","type":"long"},{"name":"body","type":"string"}]}',
"referenceName": "dfgdf",
"path": "gs://testawoo",
"path": BUCKET1,
"referenceName": "foo_bucket",
},
},
"outputSchema": '{"type":"record","name":"etlSchemaBody","fields":'
'[{"name":"offset","type":"long"},{"name":"body","type":"string"}]}',
"outputSchema": [
{
"name": "etlSchemaBody",
"schema": '{"type":"record","name":"etlSchemaBody","fields":'
'[{"name":"offset","type":"long"},{"name":"body","type":"string"}]}',
}
],
},
{
"name": "GCS2",
@@ -88,7 +93,7 @@
"label": "GCS2",
"artifact": {
"name": "google-cloud",
"version": "0.13.2",
"version": "0.14.2",
"scope": "SYSTEM",
},
"properties": {
@@ -99,12 +104,17 @@
"location": "us",
"schema": '{"type":"record","name":"etlSchemaBody","fields":'
'[{"name":"offset","type":"long"},{"name":"body","type":"string"}]}',
"referenceName": "sdgfdgd",
"path": "gs://testawoo2",
"referenceName": "bar",
"path": BUCKET2,
},
},
"outputSchema": '{"type":"record","name":"etlSchemaBody","fields":'
'[{"name":"offset","type":"long"},{"name":"body","type":"string"}]}',
"outputSchema": [
{
"name": "etlSchemaBody",
"schema": '{"type":"record","name":"etlSchemaBody","fields":'
'[{"name":"offset","type":"long"},{"name":"body","type":"string"}]}',
}
],
"inputSchema": [
{
"name": "GCS",
@@ -210,10 +220,7 @@
# [END howto_cloud_data_fusion_delete_instance_operator]

# Add sleep before creating pipeline
sleep = BashOperator(
task_id="sleep",
bash_command="sleep 60"
)
sleep = BashOperator(task_id="sleep", bash_command="sleep 60")

create_instance >> get_instance >> restart_instance >> update_instance >> sleep
sleep >> create_pipeline >> list_pipelines >> start_pipeline >> stop_pipeline >> delete_pipeline
162 changes: 131 additions & 31 deletions airflow/providers/google/cloud/hooks/datafusion.py
@@ -20,11 +20,12 @@
"""
import json
import os
from time import sleep
from typing import Any, Dict, Optional
from urllib.parse import urlencode
from time import monotonic, sleep
from typing import Any, Dict, List, Optional, Union
from urllib.parse import quote, urlencode

import google.auth
from google.api_core.retry import exponential_sleep_generator
from googleapiclient.discovery import Resource, build

from airflow.exceptions import AirflowException
@@ -33,6 +34,24 @@
Operation = Dict[str, Any]


class PipelineStates:
"""Data Fusion pipeline states"""

PENDING = "PENDING"
STARTING = "STARTING"
RUNNING = "RUNNING"
SUSPENDED = "SUSPENDED"
RESUMING = "RESUMING"
COMPLETED = "COMPLETED"
FAILED = "FAILED"
KILLED = "KILLED"
REJECTED = "REJECTED"


FAILURE_STATES = [PipelineStates.FAILED, PipelineStates.KILLED, PipelineStates.REJECTED]
SUCCESS_STATES = [PipelineStates.COMPLETED]


class DataFusionHook(GoogleBaseHook):
"""
Hook for Google DataFusion.
@@ -53,8 +72,8 @@ def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
"""
Waits for a long-running operation to complete.
"""
while not operation.get("done"):
sleep(30)
for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
sleep(time_to_wait)
operation = (
self.get_conn() # pylint: disable=no-member
.projects()
@@ -63,10 +82,53 @@ def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
.get(name=operation.get("name"))
.execute()
)
if operation.get("done"):
break
if "error" in operation:
raise AirflowException(operation["error"])
return operation["response"]
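
For orientation, a short sketch of how exponential_sleep_generator paces the polling loop above; the exact delays depend on the installed google-api-core version (newer releases may add jitter), so the values are indicative only.

from google.api_core.retry import exponential_sleep_generator

# With initial=10 and maximum=120 the delays grow roughly as 10, 20, 40, 80, 120, 120, ...
delays = exponential_sleep_generator(initial=10, maximum=120)
for attempt in range(6):
    print(f"poll #{attempt}: sleeping ~{next(delays)}s before re-checking the operation")
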

def wait_for_pipeline_state(
self,
pipeline_name: str,
pipeline_id: str,
instance_url: str,
namespace: str = "default",
success_states: Optional[List[str]] = None,
failure_states: Optional[List[str]] = None,
timeout: int = 5 * 60,
):
"""
Polls pipeline state and raises an exception if the state is one of
`failure_states` or the operation times out.
"""
failure_states = failure_states or FAILURE_STATES
success_states = success_states or SUCCESS_STATES
start_time = monotonic()
current_state = None
while monotonic() - start_time < timeout:
current_state = self._get_workflow_state(
pipeline_name=pipeline_name,
pipeline_id=pipeline_id,
instance_url=instance_url,
namespace=namespace,
)

if current_state in success_states:
return
if current_state in failure_states:
raise AirflowException(
f"Pipeline {pipeline_name} state {current_state} is not "
f"one of {success_states}"
)
sleep(30)

# Time is up!
raise AirflowException(
f"Pipeline {pipeline_name} state {current_state} is not "
f"one of {success_states} after {timeout}s"
)
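
A hypothetical call to the new wait_for_pipeline_state helper; the connection id, instance URL, and run id below are placeholders rather than values from the diff.

hook = DataFusionHook(gcp_conn_id="google_cloud_default")
hook.wait_for_pipeline_state(
    pipeline_name="test-pipe",
    pipeline_id="<run-id-returned-by-start_pipeline>",
    instance_url="https://example-instance.datafusion.googleusercontent.com/api",
    namespace="default",
    success_states=[PipelineStates.COMPLETED],
    timeout=10 * 60,
)  # raises AirflowException on FAILED/KILLED/REJECTED or once the timeout elapses
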

@staticmethod
def _name(project_id: str, location: str, instance_name: str) -> str:
return f"projects/{project_id}/locations/{location}/instances/{instance_name}"
@@ -75,8 +137,14 @@ def _name(project_id: str, location: str, instance_name: str) -> str:
def _parent(project_id: str, location: str) -> str:
return f"projects/{project_id}/locations/{location}"

@staticmethod
def _base_url(instance_url: str, namespace: str) -> str:
return os.path.join(
instance_url, "v3", "namespaces", quote(namespace), "apps"
)
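
For a concrete sense of what the helper above produces, an illustrative result (the instance URL is a placeholder):

base = DataFusionHook._base_url(
    "https://example-instance.datafusion.googleusercontent.com/api", "default"
)
# base == "https://example-instance.datafusion.googleusercontent.com/api/v3/namespaces/default/apps"
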

def _cdap_request(
self, url: str, method: str, body: Optional[Dict[str, Any]] = None
self, url: str, method: str, body: Optional[Union[List, Dict]] = None
) -> google.auth.transport.Response:
headers: Dict[str, str] = {"Content-Type": "application/json"}
request = google.auth.transport.requests.Request()
@@ -273,14 +341,12 @@ def create_pipeline(
:type pipeline: Dict[str, Any]
:param instance_url: Endpoint on which the REST APIs is accessible for the instance.
:type instance_url: str
:param namespace: f your pipeline belongs to a Basic edition instance, the namespace ID
:param namespace: if your pipeline belongs to a Basic edition instance, the namespace ID
is always default. If your pipeline belongs to an Enterprise edition instance, you
can create a namespace.
:type namespace: str
"""
url = os.path.join(
instance_url, "v3", "namespaces", namespace, "apps", pipeline_name
)
url = os.path.join(self._base_url(instance_url, namespace), quote(pipeline_name))
response = self._cdap_request(url=url, method="PUT", body=pipeline)
if response.status != 200:
raise AirflowException(
@@ -308,9 +374,7 @@ def delete_pipeline(
can create a namespace.
:type namespace: str
"""
url = os.path.join(
instance_url, "v3", "namespaces", namespace, "apps", pipeline_name
)
url = os.path.join(self._base_url(instance_url, namespace), quote(pipeline_name))
if version_id:
url = os.path.join(url, "versions", version_id)

@@ -341,7 +405,7 @@ def list_pipelines(
can create a namespace.
:type namespace: str
"""
url = os.path.join(instance_url, "v3", "namespaces", namespace, "apps")
url = self._base_url(instance_url, namespace)
query: Dict[str, str] = {}
if artifact_name:
query = {"artifactName": artifact_name}
@@ -357,12 +421,36 @@
)
return json.loads(response.data)

def _get_workflow_state(
self,
pipeline_name: str,
instance_url: str,
pipeline_id: str,
namespace: str = "default",
) -> str:
url = os.path.join(
self._base_url(instance_url, namespace),
quote(pipeline_name),
"workflows",
"DataPipelineWorkflow",
"runs",
quote(pipeline_id),
)
response = self._cdap_request(url=url, method="GET")
if response.status != 200:
raise AirflowException(
f"Retrieving a pipeline state failed with code {response.status}"
)
workflow = json.loads(response.data)
return workflow["status"]
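
In effect, _get_workflow_state issues a GET against the single-run endpoint and returns the "status" field. The sketch below uses placeholder values, and any response fields other than "status" are assumptions about the CDAP API rather than something this diff guarantees.

# GET https://example-instance.datafusion.googleusercontent.com/api/v3/namespaces/default/apps/test-pipe/workflows/DataPipelineWorkflow/runs/<run-id>
# Possible response body; only "status" is read by the hook:
# {"runid": "<run-id>", "status": "RUNNING", ...}
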

def start_pipeline(
self, pipeline_name: str,
self,
pipeline_name: str,
instance_url: str,
namespace: str = "default",
runtime_args: Optional[Dict[str, Any]] = None
) -> None:
runtime_args: Optional[Dict[str, Any]] = None,
) -> str:
"""
Starts a Cloud Data Fusion pipeline. Works for both batch and stream pipelines.
@@ -377,24 +465,40 @@ def start_pipeline(
can create a namespace.
:type namespace: str
"""
# TODO: This API endpoint starts multiple pipelines. There will eventually be a fix to
# return the run Id as part of the API request to run a single pipeline.
# https://github.com/apache/airflow/pull/8954#discussion_r438223116
url = os.path.join(
instance_url,
"v3",
"namespaces",
namespace,
"apps",
pipeline_name,
"workflows",
"DataPipelineWorkflow",
"start"
quote(namespace),
"start",
)

response = self._cdap_request(url=url, method="POST", body=runtime_args)
runtime_args = runtime_args or {}
body = [{
"appId": pipeline_name,
"programType": "workflow",
"programId": "DataPipelineWorkflow",
"runtimeargs": runtime_args
}]
response = self._cdap_request(url=url, method="POST", body=body)
if response.status != 200:
raise AirflowException(
f"Starting a pipeline failed with code {response.status}"
)

response_json = json.loads(response.data)
pipeline_id = response_json[0]["runId"]
self.wait_for_pipeline_state(
success_states=SUCCESS_STATES + [PipelineStates.RUNNING],
pipeline_name=pipeline_name,
pipeline_id=pipeline_id,
namespace=namespace,
instance_url=instance_url,
)
return pipeline_id
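
A sketch of how a caller might now drive a run end to end with these methods; the values are placeholders and the runtime argument is purely illustrative.

run_id = hook.start_pipeline(
    pipeline_name="test-pipe",
    instance_url="https://example-instance.datafusion.googleusercontent.com/api",
    namespace="default",
    runtime_args={"system.profile.name": "USER:dataproc"},  # hypothetical runtime argument
)
# start_pipeline now blocks until the run reaches RUNNING (or a success state) and returns the run id.
hook.stop_pipeline(
    pipeline_name="test-pipe",
    instance_url="https://example-instance.datafusion.googleusercontent.com/api",
)
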

def stop_pipeline(
self, pipeline_name: str, instance_url: str, namespace: str = "default"
) -> None:
@@ -411,12 +515,8 @@
:type namespace: str
"""
url = os.path.join(
instance_url,
"v3",
"namespaces",
namespace,
"apps",
pipeline_name,
self._base_url(instance_url, namespace),
quote(pipeline_name),
"workflows",
"DataPipelineWorkflow",
"stop",
