
Commit

[AIRFLOW-7106] Cloud data fusion integration - Allow to pass args to start pipeline (#7849)

* Add the possibility of passing args to data fusion start pipeline

* [AIRFLOW-7106] Cloud data fusion integration - Allow to pass args to start pipeline

* [AIRFLOW-7106] modified arguments type from string to dict, fixed tests

* [AIRFLOW-7106] reverted wrong comment on tests

* Update airflow/providers/google/cloud/hooks/datafusion.py

Co-Authored-By: Tomek Urbaszek <[email protected]>

* [AIRFLOW-7106] fixed static checks

* reverting conftest.py

* removed trailing whitespaces

* added hook test for start pipeline with parameters

* fixed test on google data fusion hook

* fixed all pre-commit tests

Co-authored-by: Davide Malagoli <[email protected]>
Co-authored-by: Tomek Urbaszek <[email protected]>
3 people committed Apr 6, 2020
1 parent 9fda018 commit 3fc89f2
Showing 4 changed files with 30 additions and 6 deletions.
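
An illustrative (not part of the commit) sketch of how a DAG task could use the new runtime_args parameter; the DAG id, dates, location, and argument values are placeholders, and only the operator signature shown in the diff below is assumed:

from airflow import DAG
from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator,
)
from airflow.utils.dates import days_ago

with DAG(
    "example_datafusion_runtime_args", schedule_interval=None, start_date=days_ago(1)
) as dag:
    start_pipeline = CloudDataFusionStartPipelineOperator(
        task_id="start_pipeline",
        pipeline_name="example_pipeline",
        instance_name="example-instance",
        location="europe-west1",
        # New in this commit: runtime arguments forwarded to the pipeline run.
        runtime_args={"input.path": "gs://example-bucket/input"},
    )
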
13 changes: 10 additions & 3 deletions airflow/providers/google/cloud/hooks/datafusion.py
@@ -87,6 +87,7 @@ def _cdap_request(
)

payload = json.dumps(body) if body else None

response = request(method=method, url=url, headers=headers, body=payload)
return response

@@ -372,7 +373,10 @@ def list_pipelines(
return json.loads(response.data)

def start_pipeline(
self, pipeline_name: str, instance_url: str, namespace: str = "default"
self, pipeline_name: str,
instance_url: str,
namespace: str = "default",
runtime_args: Optional[Dict[str, Any]] = None
) -> None:
"""
Starts a Cloud Data Fusion pipeline. Works for both batch and stream pipelines.
@@ -381,6 +385,8 @@ def start_pipeline(
:type pipeline_name: str
:param instance_url: Endpoint on which the REST API is accessible for the instance.
:type instance_url: str
:param runtime_args: Optional runtime JSON args to be passed to the pipeline
:type runtime_args: Optional[Dict[str, Any]]
:param namespace: If your pipeline belongs to a Basic edition instance, the namespace ID
is always default. If your pipeline belongs to an Enterprise edition instance, you
can create a namespace.
@@ -395,9 +401,10 @@
pipeline_name,
"workflows",
"DataPipelineWorkflow",
"start",
"start"
)
response = self._cdap_request(url=url, method="POST")

response = self._cdap_request(url=url, method="POST", body=runtime_args)
if response.status != 200:
raise AirflowException(
f"Starting a pipeline failed with code {response.status}"
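
With the hook change above, runtime_args defaults to None and is forwarded as the body of the CDAP start request, where _cdap_request serializes it with json.dumps. A hedged sketch of calling the hook directly; the connection id keyword, instance URL, and argument values are assumptions for illustration:

from airflow.providers.google.cloud.hooks.datafusion import DataFusionHook

hook = DataFusionHook(gcp_conn_id="google_cloud_default")  # connection id assumed
hook.start_pipeline(
    pipeline_name="example_pipeline",
    instance_url="https://example-instance.datafusion.googleusercontent.com/api",
    namespace="default",
    # Sent as the JSON request body of the DataPipelineWorkflow start call.
    runtime_args={"arg1": "a", "arg2": "b"},
)
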
8 changes: 7 additions & 1 deletion airflow/providers/google/cloud/operators/datafusion.py
@@ -618,6 +618,8 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
:type instance_name: str
:param location: The Cloud Data Fusion location in which to handle the request.
:type location: str
:param runtime_args: Optional runtime args to be passed to the pipeline
:type runtime_args: dict
:param namespace: If your pipeline belongs to a Basic edition instance, the namespace ID
is always default. If your pipeline belongs to an Enterprise edition instance, you
can create a namespace.
@@ -631,14 +633,15 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
:type delegate_to: str
"""

template_fields = ("instance_name", "pipeline_name")
template_fields = ("instance_name", "pipeline_name", "runtime_args")

@apply_defaults
def __init__(
self,
pipeline_name: str,
instance_name: str,
location: str,
runtime_args: Optional[Dict[str, Any]] = None,
namespace: str = "default",
project_id: Optional[str] = None,
api_version: str = "v1beta1",
@@ -649,6 +652,7 @@ def __init__(
) -> None:
super().__init__(*args, **kwargs)
self.pipeline_name = pipeline_name
self.runtime_args = runtime_args
self.namespace = namespace
self.instance_name = instance_name
self.location = location
@@ -674,6 +678,8 @@ def execute(self, context: Dict):
pipeline_name=self.pipeline_name,
instance_url=api_url,
namespace=self.namespace,
runtime_args=self.runtime_args

)
self.log.info("Pipeline started")

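
Because runtime_args is now listed in template_fields above, its values are rendered with Jinja at task runtime. A hypothetical example (the argument name and the macro used are illustrative only):

start_templated = CloudDataFusionStartPipelineOperator(
    task_id="start_pipeline_templated",
    pipeline_name="example_pipeline",
    instance_name="example-instance",
    location="europe-west1",
    # "{{ ds }}" is resolved when the task runs, since runtime_args is templated.
    runtime_args={"processing.date": "{{ ds }}"},
)
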
8 changes: 7 additions & 1 deletion tests/providers/google/cloud/hooks/test_datafusion.py
@@ -33,6 +33,7 @@
PIPELINE_NAME = "shrubberyPipeline"
PIPELINE = {"test": "pipeline"}
INSTANCE_URL = "http://datafusion.instance.com"
RUNTIME_ARGS = {"arg1": "a", "arg2": "b"}

# pylint: disable=redefined-outer-name

@@ -204,11 +205,16 @@ def test_list_pipelines(self, mock_request, hook):
@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_start_pipeline(self, mock_request, hook):
mock_request.return_value.status = 200
hook.start_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL)
hook.start_pipeline(
pipeline_name=PIPELINE_NAME,
instance_url=INSTANCE_URL,
runtime_args=RUNTIME_ARGS
)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/"
f"workflows/DataPipelineWorkflow/start",
method="POST",
body=RUNTIME_ARGS
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
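
A hedged sketch of a companion test, not part of this commit, that would pin down the default path: start_pipeline without runtime_args should still forward body=None. It reuses the constants and fixtures of the existing test module:

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_start_pipeline_no_runtime_args(self, mock_request, hook):
    mock_request.return_value.status = 200
    hook.start_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL)
    mock_request.assert_called_once_with(
        url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/"
        f"workflows/DataPipelineWorkflow/start",
        method="POST",
        # runtime_args defaults to None, so the hook sends no request body.
        body=None,
    )
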
7 changes: 6 additions & 1 deletion tests/providers/google/cloud/operators/test_datafusion.py
@@ -35,6 +35,7 @@
PIPELINE = {"test": "pipeline"}
INSTANCE_URL = "http://datafusion.instance.com"
NAMESPACE = "TEST_NAMESPACE"
RUNTIME_ARGS = {"arg1": "a", "arg2": "b"}


class TestCloudDataFusionUpdateInstanceOperator:
@@ -195,14 +196,18 @@ def test_execute(self, mock_hook):
namespace=NAMESPACE,
location=LOCATION,
project_id=PROJECT_ID,
runtime_args=RUNTIME_ARGS
)
op.execute({})
mock_hook.return_value.get_instance.assert_called_once_with(
instance_name=INSTANCE_NAME, location=LOCATION, project_id=PROJECT_ID
)

mock_hook.return_value.start_pipeline.assert_called_once_with(
instance_url=INSTANCE_URL, pipeline_name=PIPELINE_NAME, namespace=NAMESPACE
instance_url=INSTANCE_URL,
pipeline_name=PIPELINE_NAME,
namespace=NAMESPACE,
runtime_args=RUNTIME_ARGS
)


