Skip to content

Commit

Permalink
Fix JSONDecodeError in Datafusion operators (#26202)
Browse files Browse the repository at this point in the history
  • Loading branch information
VladaZakharova committed Sep 18, 2022
1 parent 23e6cf2 commit 97b144f
Show file tree
Hide file tree
Showing 2 changed files with 221 additions and 15 deletions.
43 changes: 28 additions & 15 deletions airflow/providers/google/cloud/hooks/datafusion.py
Expand Up @@ -152,6 +152,16 @@ def _cdap_request(
response = request(method=method, url=url, headers=headers, body=payload)
return response

@staticmethod
def _check_response_status_and_data(response, message: str) -> None:
if response.status != 200:
raise AirflowException(message)
if response.data is None:
raise AirflowException(
"Empty response received. Please, check for possible root "
"causes of this behavior either in DAG code or on Cloud Datafusion side"
)

def get_conn(self) -> Resource:
"""Retrieves connection to DataFusion."""
if not self._conn:
Expand Down Expand Up @@ -311,10 +321,9 @@ def create_pipeline(
"""
url = os.path.join(self._base_url(instance_url, namespace), quote(pipeline_name))
response = self._cdap_request(url=url, method="PUT", body=pipeline)
if response.status != 200:
raise AirflowException(
f"Creating a pipeline failed with code {response.status} while calling {url}"
)
self._check_response_status_and_data(
response, f"Creating a pipeline failed with code {response.status} while calling {url}"
)

def delete_pipeline(
self,
Expand All @@ -338,8 +347,9 @@ def delete_pipeline(
url = os.path.join(url, "versions", version_id)

response = self._cdap_request(url=url, method="DELETE", body=None)
if response.status != 200:
raise AirflowException(f"Deleting a pipeline failed with code {response.status}")
self._check_response_status_and_data(
response, f"Deleting a pipeline failed with code {response.status}"
)

def list_pipelines(
self,
Expand Down Expand Up @@ -368,8 +378,9 @@ def list_pipelines(
url = os.path.join(url, urlencode(query))

response = self._cdap_request(url=url, method="GET", body=None)
if response.status != 200:
raise AirflowException(f"Listing pipelines failed with code {response.status}")
self._check_response_status_and_data(
response, f"Listing pipelines failed with code {response.status}"
)
return json.loads(response.data)

def get_pipeline_workflow(
Expand All @@ -388,8 +399,9 @@ def get_pipeline_workflow(
quote(pipeline_id),
)
response = self._cdap_request(url=url, method="GET")
if response.status != 200:
raise AirflowException(f"Retrieving a pipeline state failed with code {response.status}")
self._check_response_status_and_data(
response, f"Retrieving a pipeline state failed with code {response.status}"
)
workflow = json.loads(response.data)
return workflow

Expand Down Expand Up @@ -430,9 +442,9 @@ def start_pipeline(
}
]
response = self._cdap_request(url=url, method="POST", body=body)
if response.status != 200:
raise AirflowException(f"Starting a pipeline failed with code {response.status}")

self._check_response_status_and_data(
response, f"Starting a pipeline failed with code {response.status}"
)
response_json = json.loads(response.data)
return response_json[0]["runId"]

Expand All @@ -454,5 +466,6 @@ def stop_pipeline(self, pipeline_name: str, instance_url: str, namespace: str =
"stop",
)
response = self._cdap_request(url=url, method="POST")
if response.status != 200:
raise AirflowException(f"Stopping a pipeline failed with code {response.status}")
self._check_response_status_and_data(
response, f"Stopping a pipeline failed with code {response.status}"
)
193 changes: 193 additions & 0 deletions tests/providers/google/cloud/hooks/test_datafusion.py
Expand Up @@ -21,6 +21,7 @@

import pytest

from airflow import AirflowException
from airflow.providers.google.cloud.hooks.datafusion import DataFusionHook
from tests.providers.google.cloud.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id

Expand All @@ -32,6 +33,7 @@
INSTANCE = {"type": "BASIC", "displayName": INSTANCE_NAME}
PROJECT_ID = "test_project_id"
PIPELINE_NAME = "shrubberyPipeline"
PIPELINE_ID = "123"
PIPELINE = {"test": "pipeline"}
INSTANCE_URL = "http://datafusion.instance.com"
RUNTIME_ARGS = {"arg1": "a", "arg2": "b"}
Expand Down Expand Up @@ -159,6 +161,33 @@ def test_create_pipeline(self, mock_request, hook):
body=PIPELINE,
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_create_pipeline_should_fail_if_empty_data_response(self, mock_request, hook):
mock_request.return_value.status = 200
mock_request.return_value.data = None
with pytest.raises(
AirflowException,
match=r"Empty response received. Please, check for possible root causes "
r"of this behavior either in DAG code or on Cloud Datafusion side",
):
hook.create_pipeline(pipeline_name=PIPELINE_NAME, pipeline=PIPELINE, instance_url=INSTANCE_URL)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}",
method="PUT",
body=PIPELINE,
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_create_pipeline_should_fail_if_status_not_200(self, mock_request, hook):
mock_request.return_value.status = 404
with pytest.raises(AirflowException, match=r"Creating a pipeline failed with code 404"):
hook.create_pipeline(pipeline_name=PIPELINE_NAME, pipeline=PIPELINE, instance_url=INSTANCE_URL)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}",
method="PUT",
body=PIPELINE,
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_delete_pipeline(self, mock_request, hook):
mock_request.return_value.status = 200
Expand All @@ -169,6 +198,33 @@ def test_delete_pipeline(self, mock_request, hook):
body=None,
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_delete_pipeline_should_fail_if_empty_data_response(self, mock_request, hook):
mock_request.return_value.status = 200
mock_request.return_value.data = None
with pytest.raises(
AirflowException,
match=r"Empty response received. Please, check for possible root causes "
r"of this behavior either in DAG code or on Cloud Datafusion side",
):
hook.delete_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}",
method="DELETE",
body=None,
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_delete_pipeline_should_fail_if_status_not_200(self, mock_request, hook):
mock_request.return_value.status = 404
with pytest.raises(AirflowException, match=r"Deleting a pipeline failed with code 404"):
hook.delete_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}",
method="DELETE",
body=None,
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_list_pipelines(self, mock_request, hook):
data = {"data": "test"}
Expand All @@ -180,6 +236,29 @@ def test_list_pipelines(self, mock_request, hook):
)
assert result == data

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_list_pipelines_should_fail_if_empty_data_response(self, mock_request, hook):
mock_request.return_value.status = 200
mock_request.return_value.data = None
with pytest.raises(
AirflowException,
match=r"Empty response received. Please, check for possible root causes "
r"of this behavior either in DAG code or on Cloud Datafusion side",
):
hook.list_pipelines(instance_url=INSTANCE_URL)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps", method="GET", body=None
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_list_pipelines_should_fail_if_status_not_200(self, mock_request, hook):
mock_request.return_value.status = 404
with pytest.raises(AirflowException, match=r"Listing pipelines failed with code 404"):
hook.list_pipelines(instance_url=INSTANCE_URL)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps", method="GET", body=None
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_start_pipeline(self, mock_request, hook):
run_id = 1234
Expand All @@ -198,6 +277,49 @@ def test_start_pipeline(self, mock_request, hook):
url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST", body=body
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_start_pipeline_should_fail_if_empty_data_response(self, mock_request, hook):
mock_request.return_value.status = 200
mock_request.return_value.data = None
with pytest.raises(
AirflowException,
match=r"Empty response received. Please, check for possible root causes "
r"of this behavior either in DAG code or on Cloud Datafusion side",
):
hook.start_pipeline(
pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, runtime_args=RUNTIME_ARGS
)
body = [
{
"appId": PIPELINE_NAME,
"programType": "workflow",
"programId": "DataPipelineWorkflow",
"runtimeargs": RUNTIME_ARGS,
}
]
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST", body=body
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_start_pipeline_should_fail_if_status_not_200(self, mock_request, hook):
mock_request.return_value.status = 404
with pytest.raises(AirflowException, match=r"Starting a pipeline failed with code 404"):
hook.start_pipeline(
pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, runtime_args=RUNTIME_ARGS
)
body = [
{
"appId": PIPELINE_NAME,
"programType": "workflow",
"programId": "DataPipelineWorkflow",
"runtimeargs": RUNTIME_ARGS,
}
]
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST", body=body
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_stop_pipeline(self, mock_request, hook):
mock_request.return_value.status = 200
Expand All @@ -207,3 +329,74 @@ def test_stop_pipeline(self, mock_request, hook):
f"workflows/DataPipelineWorkflow/stop",
method="POST",
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_stop_pipeline_should_fail_if_empty_data_response(self, mock_request, hook):
mock_request.return_value.status = 200
mock_request.return_value.data = None
with pytest.raises(
AirflowException,
match=r"Empty response received. Please, check for possible root causes "
r"of this behavior either in DAG code or on Cloud Datafusion side",
):
hook.stop_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/"
f"workflows/DataPipelineWorkflow/stop",
method="POST",
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_stop_pipeline_should_fail_if_status_not_200(self, mock_request, hook):
mock_request.return_value.status = 404
with pytest.raises(AirflowException, match=r"Stopping a pipeline failed with code 404"):
hook.stop_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/"
f"workflows/DataPipelineWorkflow/stop",
method="POST",
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_get_pipeline_workflow(self, mock_request, hook):
run_id = 1234
mock_request.return_value = mock.MagicMock(status=200, data=f'[{{"runId":{run_id}}}]')
hook.get_pipeline_workflow(
pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, pipeline_id=PIPELINE_ID
)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/"
f"workflows/DataPipelineWorkflow/runs/{PIPELINE_ID}",
method="GET",
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_get_pipeline_workflow_should_fail_if_empty_data_response(self, mock_request, hook):
mock_request.return_value.status = 200
mock_request.return_value.data = None
with pytest.raises(
AirflowException,
match=r"Empty response received. Please, check for possible root causes "
r"of this behavior either in DAG code or on Cloud Datafusion side",
):
hook.get_pipeline_workflow(
pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, pipeline_id=PIPELINE_ID
)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/"
f"workflows/DataPipelineWorkflow/runs/{PIPELINE_ID}",
method="GET",
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_get_pipeline_workflow_should_fail_if_status_not_200(self, mock_request, hook):
mock_request.return_value.status = 404
with pytest.raises(AirflowException, match=r"Retrieving a pipeline state failed with code 404"):
hook.get_pipeline_workflow(
pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, pipeline_id=PIPELINE_ID
)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/"
f"workflows/DataPipelineWorkflow/runs/{PIPELINE_ID}",
method="GET",
)

0 comments on commit 97b144f

Please sign in to comment.