Skip to content

Commit

Permalink
Add system tests for Stackdriver operators (#13644)
Browse files Browse the repository at this point in the history
  • Loading branch information
mik-laj committed Jan 13, 2021
1 parent b007fc3 commit 189af54
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 32 deletions.
64 changes: 51 additions & 13 deletions airflow/providers/google/cloud/example_dags/example_stackdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"""

import json
import os

from airflow import models
from airflow.providers.google.cloud.operators.stackdriver import (
Expand All @@ -37,38 +38,66 @@
)
from airflow.utils.dates import days_ago

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")

TEST_ALERT_POLICY_1 = {
"combiner": "OR",
"name": "projects/sd-project/alertPolicies/12345",
"creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
"enabled": True,
"displayName": "test alert 1",
"conditions": [
{
"conditionThreshold": {
"filter": (
'metric.label.state="blocked" AND '
'metric.type="agent.googleapis.com/processes/count_by_state" '
'AND resource.type="gce_instance"'
),
"comparison": "COMPARISON_GT",
"aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
"thresholdValue": 100,
"duration": "900s",
"trigger": {"percent": 0},
"aggregations": [
{
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_MEAN",
"crossSeriesReducer": "REDUCE_MEAN",
"groupByFields": ["project", "resource.label.instance_id", "resource.label.zone"],
}
],
},
"displayName": "Condition display",
"name": "projects/sd-project/alertPolicies/123/conditions/456",
"displayName": "test_alert_policy_1",
}
],
}

TEST_ALERT_POLICY_2 = {
"combiner": "OR",
"name": "projects/sd-project/alertPolicies/6789",
"creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
"enabled": False,
"displayName": "test alert 2",
"conditions": [
{
"conditionThreshold": {
"filter": (
'metric.label.state="blocked" AND '
'metric.type="agent.googleapis.com/processes/count_by_state" AND '
'resource.type="gce_instance"'
),
"comparison": "COMPARISON_GT",
"aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
"thresholdValue": 100,
"duration": "900s",
"trigger": {"percent": 0},
"aggregations": [
{
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_MEAN",
"crossSeriesReducer": "REDUCE_MEAN",
"groupByFields": ["project", "resource.label.instance_id", "resource.label.zone"],
}
],
},
"displayName": "Condition display",
"name": "projects/sd-project/alertPolicies/456/conditions/789",
"displayName": "test_alert_policy_2",
}
],
}
Expand All @@ -77,15 +106,13 @@
"displayName": "channel1",
"enabled": True,
"labels": {"auth_token": "top-secret", "channel_name": "#channel"},
"name": "projects/sd-project/notificationChannels/12345",
"type": "slack",
}

TEST_NOTIFICATION_CHANNEL_2 = {
"displayName": "channel2",
"enabled": False,
"labels": {"auth_token": "top-secret", "channel_name": "#channel"},
"name": "projects/sd-project/notificationChannels/6789",
"type": "slack",
}

Expand Down Expand Up @@ -150,18 +177,29 @@
# [START howto_operator_gcp_stackdriver_delete_notification_channel]
delete_notification_channel = StackdriverDeleteNotificationChannelOperator(
task_id='delete-notification-channel',
name='test-channel',
name="{{ task_instance.xcom_pull('list-notification-channel')[0]['name'] }}",
)
# [END howto_operator_gcp_stackdriver_delete_notification_channel]

delete_notification_channel_2 = StackdriverDeleteNotificationChannelOperator(
task_id='delete-notification-channel-2',
name="{{ task_instance.xcom_pull('list-notification-channel')[1]['name'] }}",
)

# [START howto_operator_gcp_stackdriver_delete_alert_policy]
delete_alert_policy = StackdriverDeleteAlertOperator(
task_id='delete-alert-policy',
name='test-alert',
name="{{ task_instance.xcom_pull('list-alert-policies')[0]['name'] }}",
)
# [END howto_operator_gcp_stackdriver_delete_alert_policy]

delete_alert_policy_2 = StackdriverDeleteAlertOperator(
task_id='delete-alert-policy-2',
name="{{ task_instance.xcom_pull('list-alert-policies')[1]['name'] }}",
)

create_notification_channel >> enable_notification_channel >> disable_notification_channel
disable_notification_channel >> list_notification_channel >> create_alert_policy
create_alert_policy >> enable_alert_policy >> disable_alert_policy >> list_alert_policies
list_alert_policies >> delete_notification_channel >> delete_alert_policy
list_alert_policies >> delete_notification_channel >> delete_notification_channel_2
delete_notification_channel_2 >> delete_alert_policy >> delete_alert_policy_2
5 changes: 2 additions & 3 deletions airflow/providers/google/cloud/hooks/stackdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,10 @@ def upsert_alert(
]
policies_ = []
channels = []

for channel in record["channels"]:
for channel in record.get("channels", []):
channel_json = json.dumps(channel)
channels.append(Parse(channel_json, monitoring_v3.types.notification_pb2.NotificationChannel()))
for policy in record["policies"]:
for policy in record.get("policies", []):
policy_json = json.dumps(policy)
policies_.append(Parse(policy_json, monitoring_v3.types.alert_pb2.AlertPolicy()))

Expand Down
12 changes: 8 additions & 4 deletions airflow/providers/google/cloud/operators/stackdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from typing import Optional, Sequence, Union

from google.api_core.gapic_v1.method import DEFAULT
from google.protobuf.json_format import MessageToDict

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.stackdriver import StackdriverHook
Expand Down Expand Up @@ -125,7 +126,7 @@ def __init__(

def execute(self, context):
self.log.info(
'List Alert Policies: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %d',
'List Alert Policies: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %s',
self.project_id,
self.format_,
self.filter_,
Expand All @@ -139,7 +140,7 @@ def execute(self, context):
impersonation_chain=self.impersonation_chain,
)

return self.hook.list_alert_policies(
result = self.hook.list_alert_policies(
project_id=self.project_id,
format_=self.format_,
filter_=self.filter_,
Expand All @@ -149,6 +150,7 @@ def execute(self, context):
timeout=self.timeout,
metadata=self.metadata,
)
return [MessageToDict(policy) for policy in result]


class StackdriverEnableAlertPoliciesOperator(BaseOperator):
Expand Down Expand Up @@ -614,7 +616,7 @@ def __init__(

def execute(self, context):
self.log.info(
'List Notification Channels: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %d',
'List Notification Channels: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %s',
self.project_id,
self.format_,
self.filter_,
Expand All @@ -627,7 +629,7 @@ def execute(self, context):
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
return self.hook.list_notification_channels(
channels = self.hook.list_notification_channels(
format_=self.format_,
project_id=self.project_id,
filter_=self.filter_,
Expand All @@ -637,6 +639,8 @@ def execute(self, context):
timeout=self.timeout,
metadata=self.metadata,
)
result = [MessageToDict(channel) for channel in channels]
return result


class StackdriverEnableNotificationChannelsOperator(BaseOperator):
Expand Down
50 changes: 46 additions & 4 deletions tests/providers/google/cloud/hooks/test_stackdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,8 @@ def test_stackdriver_enable_alert_policy(self, mock_policy_client, mock_get_cred
page_size=None,
metadata=None,
)
mask = monitoring_v3.types.field_mask_pb2.FieldMask()
mask = monitoring_v3.types.field_mask_pb2.FieldMask(paths=["enabled"])
alert_policy_disabled.enabled.value = True # pylint: disable=no-member
mask.paths.append('enabled') # pylint: disable=no-member
mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
alert_policy=alert_policy_disabled,
update_mask=mask,
Expand Down Expand Up @@ -170,9 +169,8 @@ def test_stackdriver_disable_alert_policy(self, mock_policy_client, mock_get_cre
page_size=None,
metadata=None,
)
mask = monitoring_v3.types.field_mask_pb2.FieldMask()
mask = monitoring_v3.types.field_mask_pb2.FieldMask(paths=["enabled"])
alert_policy_enabled.enabled.value = False # pylint: disable=no-member
mask.paths.append('enabled') # pylint: disable=no-member
mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
alert_policy=alert_policy_enabled,
update_mask=mask,
Expand Down Expand Up @@ -236,6 +234,50 @@ def test_stackdriver_upsert_alert_policy(
alert_policy=existing_alert_policy, retry=DEFAULT, timeout=DEFAULT, metadata=None
)

@mock.patch(
'airflow.providers.google.common.hooks.base_google.GoogleBaseHook._get_credentials_and_project_id',
return_value=(CREDENTIALS, PROJECT_ID),
)
@mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_policy_client')
@mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_channel_client')
def test_stackdriver_upsert_alert_policy_without_channel(
self, mock_channel_client, mock_policy_client, mock_get_creds_and_project_id
):
hook = stackdriver.StackdriverHook()
existing_alert_policy = ParseDict(TEST_ALERT_POLICY_1, monitoring_v3.types.alert_pb2.AlertPolicy())

mock_policy_client.return_value.list_alert_policies.return_value = [existing_alert_policy]
mock_channel_client.return_value.list_notification_channels.return_value = []

hook.upsert_alert(
alerts=json.dumps({"policies": [TEST_ALERT_POLICY_1, TEST_ALERT_POLICY_2]}),
project_id=PROJECT_ID,
)
mock_channel_client.return_value.list_notification_channels.assert_called_once_with(
name=f'projects/{PROJECT_ID}',
filter_=None,
retry=DEFAULT,
timeout=DEFAULT,
order_by=None,
page_size=None,
metadata=None,
)
mock_policy_client.return_value.list_alert_policies.assert_called_once_with(
name=f'projects/{PROJECT_ID}',
filter_=None,
retry=DEFAULT,
timeout=DEFAULT,
order_by=None,
page_size=None,
metadata=None,
)

existing_alert_policy.ClearField('creation_record')
existing_alert_policy.ClearField('mutation_record')
mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
alert_policy=existing_alert_policy, retry=DEFAULT, timeout=DEFAULT, metadata=None
)

@mock.patch(
'airflow.providers.google.common.hooks.base_google.GoogleBaseHook._get_credentials_and_project_id',
return_value=(CREDENTIALS, PROJECT_ID),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
from airflow.models import TaskInstance
from airflow.utils.log.log_reader import TaskLogReader
from airflow.utils.session import provide_session
from tests.providers.google.cloud.utils.gcp_authenticator import GCP_STACKDDRIVER
from tests.providers.google.cloud.utils.gcp_authenticator import GCP_STACKDRIVER
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_runs
from tests.test_utils.gcp_system_helpers import provide_gcp_context, resolve_full_gcp_key_path


@pytest.mark.system("google")
@pytest.mark.credential_file(GCP_STACKDDRIVER)
@pytest.mark.credential_file(GCP_STACKDRIVER)
class TestStackdriverLoggingHandlerSystemTest(unittest.TestCase):
def setUp(self) -> None:
clear_db_runs()
Expand All @@ -54,7 +54,7 @@ def test_should_support_key_auth(self, session):
'os.environ',
AIRFLOW__LOGGING__REMOTE_LOGGING="true",
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=f"stackdriver://{self.log_name}",
AIRFLOW__LOGGING__GOOGLE_KEY_PATH=resolve_full_gcp_key_path(GCP_STACKDDRIVER),
AIRFLOW__LOGGING__GOOGLE_KEY_PATH=resolve_full_gcp_key_path(GCP_STACKDRIVER),
AIRFLOW__CORE__LOAD_EXAMPLES="false",
AIRFLOW__CORE__DAGS_FOLDER=example_complex.__file__,
):
Expand All @@ -72,7 +72,7 @@ def test_should_support_adc(self, session):
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=f"stackdriver://{self.log_name}",
AIRFLOW__CORE__LOAD_EXAMPLES="false",
AIRFLOW__CORE__DAGS_FOLDER=example_complex.__file__,
GOOGLE_APPLICATION_CREDENTIALS=resolve_full_gcp_key_path(GCP_STACKDDRIVER),
GOOGLE_APPLICATION_CREDENTIALS=resolve_full_gcp_key_path(GCP_STACKDRIVER),
):
self.assertEqual(0, subprocess.Popen(["airflow", "dags", "trigger", "example_complex"]).wait())
self.assertEqual(0, subprocess.Popen(["airflow", "scheduler", "--num-runs", "1"]).wait())
Expand All @@ -81,7 +81,7 @@ def test_should_support_adc(self, session):
self.assert_remote_logs("INFO - Task exited with return code 0", ti)

def assert_remote_logs(self, expected_message, ti):
with provide_gcp_context(GCP_STACKDDRIVER), conf_vars(
with provide_gcp_context(GCP_STACKDRIVER), conf_vars(
{
('logging', 'remote_logging'): 'True',
('logging', 'remote_base_log_folder'): f"stackdriver://{self.log_name}",
Expand Down
13 changes: 11 additions & 2 deletions tests/providers/google/cloud/operators/test_stackdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from unittest import mock

from google.api_core.gapic_v1.method import DEFAULT
from google.cloud.monitoring_v3.proto.alert_pb2 import AlertPolicy
from google.cloud.monitoring_v3.proto.notification_pb2 import NotificationChannel

from airflow.providers.google.cloud.operators.stackdriver import (
StackdriverDeleteAlertOperator,
Expand Down Expand Up @@ -94,7 +96,8 @@ class TestStackdriverListAlertPoliciesOperator(unittest.TestCase):
@mock.patch('airflow.providers.google.cloud.operators.stackdriver.StackdriverHook')
def test_execute(self, mock_hook):
operator = StackdriverListAlertPoliciesOperator(task_id=TEST_TASK_ID, filter_=TEST_FILTER)
operator.execute(None)
mock_hook.return_value.list_alert_policies.return_value = [AlertPolicy(name="test-name")]
result = operator.execute(None)
mock_hook.return_value.list_alert_policies.assert_called_once_with(
project_id=None,
filter_=TEST_FILTER,
Expand All @@ -105,6 +108,7 @@ def test_execute(self, mock_hook):
timeout=DEFAULT,
metadata=None,
)
self.assertEqual([{'name': 'test-name'}], result)


class TestStackdriverEnableAlertPoliciesOperator(unittest.TestCase):
Expand Down Expand Up @@ -160,7 +164,11 @@ class TestStackdriverListNotificationChannelsOperator(unittest.TestCase):
@mock.patch('airflow.providers.google.cloud.operators.stackdriver.StackdriverHook')
def test_execute(self, mock_hook):
operator = StackdriverListNotificationChannelsOperator(task_id=TEST_TASK_ID, filter_=TEST_FILTER)
operator.execute(None)
mock_hook.return_value.list_notification_channels.return_value = [
NotificationChannel(name="test-123")
]

result = operator.execute(None)
mock_hook.return_value.list_notification_channels.assert_called_once_with(
project_id=None,
filter_=TEST_FILTER,
Expand All @@ -171,6 +179,7 @@ def test_execute(self, mock_hook):
timeout=DEFAULT,
metadata=None,
)
self.assertEqual([{'name': 'test-123'}], result)


class TestStackdriverEnableNotificationChannelsOperator(unittest.TestCase):
Expand Down

0 comments on commit 189af54

Please sign in to comment.