Skip to content

Commit

Permalink
Fix part of Google system tests (#18494)
Browse files Browse the repository at this point in the history
* Fix CloudBuildExampleDagsSystemTest

* Fix DataCatalog tests

* Fix Dataprep test

* Fix GCS test

* Fix Dataflow test

* Fix GC Functions test

* Fix Cloud SQL tests

* Fix Data Fusion test

* Change env var in Dataprep system test

* Fix config for dataflow system tests

* Add unit test for datafusion

* Fix Data Fusion utest, add optional parameter to data fusion pipeline state sensor

* Update code with black suggestions

* Fix flake8 issue

* Revert changes to example_cloud_sql.py according to review feedback

Co-authored-by: Lukasz Wyszomirski <[email protected]>
  • Loading branch information
mnojek and Lukasz Wyszomirski committed Sep 29, 2021
1 parent 461ec4c commit 9279c44
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@

# [START howto_operator_create_build_from_repo_body]
create_build_from_repo_body = {
"source": {"repo_source": {"repo_name": GCP_SOURCE_REPOSITORY_NAME, "branch_name": "master"}},
"source": {"repo_source": {"repo_name": GCP_SOURCE_REPOSITORY_NAME, "branch_name": "main"}},
"steps": [
{
"name": "gcr.io/cloud-builders/docker",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from airflow.utils.dates import days_ago

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_REGION = os.environ.get('GCP_REGION', 'europe-west-1b')
GCP_REGION = os.environ.get('GCP_REGION', 'europe-west1')

GCSQL_POSTGRES_INSTANCE_NAME_QUERY = os.environ.get('GCSQL_POSTGRES_INSTANCE_NAME_QUERY', 'testpostgres')
GCSQL_POSTGRES_DATABASE_NAME = os.environ.get('GCSQL_POSTGRES_DATABASE_NAME', 'postgresdb')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
"""
Example Airflow DAG that interacts with Google Data Catalog service
"""
import os

from google.cloud.datacatalog_v1beta1 import FieldType, TagField, TagTemplateField

from airflow import models
Expand Down Expand Up @@ -49,7 +51,8 @@
)
from airflow.utils.dates import days_ago

PROJECT_ID = "polidea-airflow"
PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_ID = os.getenv("GCP_TEST_DATA_BUCKET", "INVALID BUCKET NAME")
LOCATION = "us-central1"
ENTRY_GROUP_ID = "important_data_jan_2019"
ENTRY_ID = "python_files"
Expand Down Expand Up @@ -92,7 +95,7 @@
entry={
"display_name": "Wizard",
"type_": "FILESET",
"gcs_fileset_spec": {"file_patterns": ["gs://INVALID BUCKET NAME/**"]},
"gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_ID}/**"]},
},
)
# [END howto_operator_gcp_datacatalog_create_entry_gcs]
Expand Down
18 changes: 13 additions & 5 deletions airflow/providers/google/cloud/example_dags/example_dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
# [START howto_operator_start_python_job_async]
start_python_job_async = BeamRunPythonPipelineOperator(
task_id="start-python-job-async",
runner="DataflowRunner",
py_file=GCS_PYTHON,
py_options=[],
pipeline_options={
Expand All @@ -160,14 +161,18 @@
py_requirements=['apache-beam[gcp]==2.25.0'],
py_interpreter='python3',
py_system_site_packages=False,
dataflow_config={"location": 'europe-west3', "wait_until_finished": False},
dataflow_config={
"job_name": "start-python-job-async",
"location": 'europe-west3',
"wait_until_finished": False,
},
)
# [END howto_operator_start_python_job_async]

# [START howto_sensor_wait_for_job_status]
wait_for_python_job_async_done = DataflowJobStatusSensor(
task_id="wait-for-python-job-async-done",
job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
location='europe-west3',
)
Expand All @@ -191,9 +196,10 @@ def callback(metrics: List[Dict]) -> bool:

wait_for_python_job_async_metric = DataflowJobMetricsSensor(
task_id="wait-for-python-job-async-metric",
job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
location='europe-west3',
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
fail_on_terminal_state=False,
)
# [END howto_sensor_wait_for_job_metric]

Expand All @@ -207,9 +213,10 @@ def check_message(messages: List[dict]) -> bool:

wait_for_python_job_async_message = DataflowJobMessagesSensor(
task_id="wait-for-python-job-async-message",
job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
location='europe-west3',
callback=check_message,
fail_on_terminal_state=False,
)
# [END howto_sensor_wait_for_job_message]

Expand All @@ -223,9 +230,10 @@ def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:

wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
task_id="wait-for-python-job-async-autoscaling-event",
job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
location='europe-west3',
callback=check_autoscaling_event,
fail_on_terminal_state=False,
)
# [END howto_sensor_wait_for_job_autoscaling_event]

Expand Down
20 changes: 13 additions & 7 deletions airflow/providers/google/cloud/example_dags/example_datafusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,26 @@
from airflow.utils.state import State

# [START howto_data_fusion_env_variables]
SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
LOCATION = "europe-north1"
INSTANCE_NAME = "airflow-test-instance"
INSTANCE = {"type": "BASIC", "displayName": INSTANCE_NAME}
INSTANCE = {
"type": "BASIC",
"displayName": INSTANCE_NAME,
"dataprocServiceAccount": SERVICE_ACCOUNT,
}

BUCKET_1 = os.environ.get("GCP_DATAFUSION_BUCKET_1", "test-datafusion-bucket-1")
BUCKET_2 = os.environ.get("GCP_DATAFUSION_BUCKET_2", "test-datafusion-bucket-2")

BUCKET_1_URI = f"gs//{BUCKET_1}"
BUCKET_2_URI = f"gs//{BUCKET_2}"
BUCKET_1_URI = f"gs://{BUCKET_1}"
BUCKET_2_URI = f"gs://{BUCKET_2}"

PIPELINE_NAME = os.environ.get("GCP_DATAFUSION_PIPELINE_NAME", "airflow_test")
PIPELINE = {
"name": "test-pipe",
"description": "Data Pipeline Application",
"artifact": {"name": "cdap-data-pipeline", "version": "6.1.2", "scope": "SYSTEM"},
"artifact": {"name": "cdap-data-pipeline", "version": "6.4.1", "scope": "SYSTEM"},
"config": {
"resources": {"memoryMB": 2048, "virtualCores": 1},
"driverResources": {"memoryMB": 2048, "virtualCores": 1},
Expand All @@ -72,7 +77,7 @@
"label": "GCS",
"artifact": {
"name": "google-cloud",
"version": "0.14.2",
"version": "0.17.3",
"scope": "SYSTEM",
},
"properties": {
Expand Down Expand Up @@ -105,7 +110,7 @@
"label": "GCS2",
"artifact": {
"name": "google-cloud",
"version": "0.14.2",
"version": "0.17.3",
"scope": "SYSTEM",
},
"properties": {
Expand Down Expand Up @@ -176,7 +181,7 @@
location=LOCATION,
instance_name=INSTANCE_NAME,
instance=INSTANCE,
update_mask="instance.displayName",
update_mask="",
task_id="update_instance",
)
# [END howto_cloud_data_fusion_update_instance_operator]
Expand Down Expand Up @@ -223,6 +228,7 @@
pipeline_name=PIPELINE_NAME,
pipeline_id=start_pipeline_async.output,
expected_statuses=["COMPLETED"],
failure_statuses=["FAILED"],
instance_name=INSTANCE_NAME,
location=LOCATION,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
)
GCF_ZIP_PATH = os.environ.get('GCF_ZIP_PATH', '')
GCF_ENTRYPOINT = os.environ.get('GCF_ENTRYPOINT', 'helloWorld')
GCF_RUNTIME = 'nodejs6'
GCF_RUNTIME = 'nodejs14'
GCP_VALIDATE_BODY = os.environ.get('GCP_VALIDATE_BODY', "True") == "True"

# [START howto_operator_gcf_deploy_body]
Expand Down
4 changes: 3 additions & 1 deletion airflow/providers/google/cloud/hooks/datafusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,9 @@ def create_pipeline(
url = os.path.join(self._base_url(instance_url, namespace), quote(pipeline_name))
response = self._cdap_request(url=url, method="PUT", body=pipeline)
if response.status != 200:
raise AirflowException(f"Creating a pipeline failed with code {response.status}")
raise AirflowException(
f"Creating a pipeline failed with code {response.status} while calling {url}"
)

def delete_pipeline(
self,
Expand Down
10 changes: 10 additions & 0 deletions airflow/providers/google/cloud/sensors/datafusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class CloudDataFusionPipelineStateSensor(BaseSensorOperator):
:type pipeline_name: str
:param expected_statuses: State that is expected
:type expected_statuses: set[str]
:param failure_statuses: State that will terminate the sensor with an exception
:type failure_statuses: set[str]
:param instance_name: The name of the instance.
:type instance_name: str
:param location: The Cloud Data Fusion location in which to handle the request.
Expand Down Expand Up @@ -70,6 +72,7 @@ def __init__(
expected_statuses: Set[str],
instance_name: str,
location: str,
failure_statuses: Set[str] = None,
project_id: Optional[str] = None,
namespace: str = "default",
gcp_conn_id: str = 'google_cloud_default',
Expand All @@ -81,6 +84,7 @@ def __init__(
self.pipeline_name = pipeline_name
self.pipeline_id = pipeline_id
self.expected_statuses = expected_statuses
self.failure_statuses = failure_statuses
self.instance_name = instance_name
self.location = location
self.project_id = project_id
Expand Down Expand Up @@ -119,6 +123,12 @@ def poke(self, context: dict) -> bool:
except AirflowException:
pass # Because the pipeline may not be visible in system yet

if self.failure_statuses and pipeline_status in self.failure_statuses:
raise AirflowException(
f"Pipeline with id '{self.pipeline_id}' state is: {pipeline_status}. "
f"Terminating sensor..."
)

self.log.debug(
"Current status of the pipeline workflow for %s: %s.", self.pipeline_id, pipeline_status
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def create_repository_and_bucket(self):
GCP_PROJECT_ID, GCP_REPOSITORY_NAME
)
self.execute_cmd(["git", "remote", "add", "origin", repo_url], cwd=tmp_dir)
self.execute_cmd(["git", "push", "--force", "origin", "main"], cwd=tmp_dir)
self.execute_cmd(["git", "push", "--force", "origin", "master"], cwd=tmp_dir)

def delete_repo(self):
"""Delete repository in Google Cloud Source Repository service"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
from tests.test_utils.db import clear_db_connections
from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest

TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
TOKEN = environ.get("DATAPREP_TOKEN")
EXTRA = {"extra__dataprep__token": TOKEN}


@pytest.mark.skipif(environ.get("DATAPREP_TOKEN") is None, reason='Dataprep token not present')
@pytest.mark.skipif(TOKEN is None, reason='Dataprep token not present')
class DataprepExampleDagsTest(GoogleSystemTest):
"""
System tests for Dataprep operators.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ def create_test_file():

@staticmethod
def remove_test_files():
os.remove(PATH_TO_UPLOAD_FILE)
os.remove(PATH_TO_SAVED_FILE)
os.remove(PATH_TO_TRANSFORM_SCRIPT)
if os.path.exists(PATH_TO_UPLOAD_FILE):
os.remove(PATH_TO_UPLOAD_FILE)
if os.path.exists(PATH_TO_SAVED_FILE):
os.remove(PATH_TO_SAVED_FILE)
if os.path.exists(PATH_TO_TRANSFORM_SCRIPT):
os.remove(PATH_TO_TRANSFORM_SCRIPT)

def remove_bucket(self):
self.execute_cmd(["gsutil", "rm", "-r", f"gs://{BUCKET_1}"])
Expand Down
30 changes: 29 additions & 1 deletion tests/providers/google/cloud/sensors/test_datafusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import unittest
from unittest import mock

import pytest
from parameterized.parameterized import parameterized

from airflow import AirflowException
from airflow.providers.google.cloud.hooks.datafusion import PipelineStates
from airflow.providers.google.cloud.sensors.datafusion import CloudDataFusionPipelineStateSensor

Expand All @@ -33,6 +35,7 @@
GCP_CONN_ID = "test_conn_id"
DELEGATE_TO = "test_delegate_to"
IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
FAILURE_STATUSES = {"FAILED"}


class TestCloudDataFusionPipelineStateSensor(unittest.TestCase):
Expand All @@ -51,7 +54,7 @@ def test_poke(self, expected_status, current_status, sensor_return, mock_hook):
pipeline_name=PIPELINE_NAME,
pipeline_id=PIPELINE_ID,
project_id=PROJECT_ID,
expected_statuses=[expected_status],
expected_statuses={expected_status},
instance_name=INSTANCE_NAME,
location=LOCATION,
gcp_conn_id=GCP_CONN_ID,
Expand All @@ -73,3 +76,28 @@ def test_poke(self, expected_status, current_status, sensor_return, mock_hook):
mock_hook.return_value.get_instance.assert_called_once_with(
instance_name=INSTANCE_NAME, location=LOCATION, project_id=PROJECT_ID
)

@mock.patch("airflow.providers.google.cloud.sensors.datafusion.DataFusionHook")
def test_assertion(self, mock_hook):
mock_hook.return_value.get_instance.return_value = {"apiEndpoint": INSTANCE_URL}

task = CloudDataFusionPipelineStateSensor(
task_id="test_task_id",
pipeline_name=PIPELINE_NAME,
pipeline_id=PIPELINE_ID,
project_id=PROJECT_ID,
expected_statuses={PipelineStates.COMPLETED},
failure_statuses=FAILURE_STATUSES,
instance_name=INSTANCE_NAME,
location=LOCATION,
gcp_conn_id=GCP_CONN_ID,
delegate_to=DELEGATE_TO,
impersonation_chain=IMPERSONATION_CHAIN,
)

with pytest.raises(
AirflowException,
match=f"Pipeline with id '{PIPELINE_ID}' state is: FAILED. Terminating sensor...",
):
mock_hook.return_value.get_pipeline_workflow.return_value = {"status": 'FAILED'}
task.poke(mock.MagicMock())

0 comments on commit 9279c44

Please sign in to comment.