Skip to content

Commit 189af54

Browse files
authored
Add system tests for Stackdriver operators (#13644)
1 parent b007fc3 commit 189af54

File tree

8 files changed

+154
-32
lines changed

8 files changed

+154
-32
lines changed

airflow/providers/google/cloud/example_dags/example_stackdriver.py

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
"""
2222

2323
import json
24+
import os
2425

2526
from airflow import models
2627
from airflow.providers.google.cloud.operators.stackdriver import (
@@ -37,38 +38,66 @@
3738
)
3839
from airflow.utils.dates import days_ago
3940

41+
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
42+
4043
TEST_ALERT_POLICY_1 = {
4144
"combiner": "OR",
42-
"name": "projects/sd-project/alertPolicies/12345",
4345
"creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
4446
"enabled": True,
4547
"displayName": "test alert 1",
4648
"conditions": [
4749
{
4850
"conditionThreshold": {
51+
"filter": (
52+
'metric.label.state="blocked" AND '
53+
'metric.type="agent.googleapis.com/processes/count_by_state" '
54+
'AND resource.type="gce_instance"'
55+
),
4956
"comparison": "COMPARISON_GT",
50-
"aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
57+
"thresholdValue": 100,
58+
"duration": "900s",
59+
"trigger": {"percent": 0},
60+
"aggregations": [
61+
{
62+
"alignmentPeriod": "60s",
63+
"perSeriesAligner": "ALIGN_MEAN",
64+
"crossSeriesReducer": "REDUCE_MEAN",
65+
"groupByFields": ["project", "resource.label.instance_id", "resource.label.zone"],
66+
}
67+
],
5168
},
52-
"displayName": "Condition display",
53-
"name": "projects/sd-project/alertPolicies/123/conditions/456",
69+
"displayName": "test_alert_policy_1",
5470
}
5571
],
5672
}
5773

5874
TEST_ALERT_POLICY_2 = {
5975
"combiner": "OR",
60-
"name": "projects/sd-project/alertPolicies/6789",
6176
"creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
6277
"enabled": False,
6378
"displayName": "test alert 2",
6479
"conditions": [
6580
{
6681
"conditionThreshold": {
82+
"filter": (
83+
'metric.label.state="blocked" AND '
84+
'metric.type="agent.googleapis.com/processes/count_by_state" AND '
85+
'resource.type="gce_instance"'
86+
),
6787
"comparison": "COMPARISON_GT",
68-
"aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
88+
"thresholdValue": 100,
89+
"duration": "900s",
90+
"trigger": {"percent": 0},
91+
"aggregations": [
92+
{
93+
"alignmentPeriod": "60s",
94+
"perSeriesAligner": "ALIGN_MEAN",
95+
"crossSeriesReducer": "REDUCE_MEAN",
96+
"groupByFields": ["project", "resource.label.instance_id", "resource.label.zone"],
97+
}
98+
],
6999
},
70-
"displayName": "Condition display",
71-
"name": "projects/sd-project/alertPolicies/456/conditions/789",
100+
"displayName": "test_alert_policy_2",
72101
}
73102
],
74103
}
@@ -77,15 +106,13 @@
77106
"displayName": "channel1",
78107
"enabled": True,
79108
"labels": {"auth_token": "top-secret", "channel_name": "#channel"},
80-
"name": "projects/sd-project/notificationChannels/12345",
81109
"type": "slack",
82110
}
83111

84112
TEST_NOTIFICATION_CHANNEL_2 = {
85113
"displayName": "channel2",
86114
"enabled": False,
87115
"labels": {"auth_token": "top-secret", "channel_name": "#channel"},
88-
"name": "projects/sd-project/notificationChannels/6789",
89116
"type": "slack",
90117
}
91118

@@ -150,18 +177,29 @@
150177
# [START howto_operator_gcp_stackdriver_delete_notification_channel]
151178
delete_notification_channel = StackdriverDeleteNotificationChannelOperator(
152179
task_id='delete-notification-channel',
153-
name='test-channel',
180+
name="{{ task_instance.xcom_pull('list-notification-channel')[0]['name'] }}",
154181
)
155182
# [END howto_operator_gcp_stackdriver_delete_notification_channel]
156183

184+
delete_notification_channel_2 = StackdriverDeleteNotificationChannelOperator(
185+
task_id='delete-notification-channel-2',
186+
name="{{ task_instance.xcom_pull('list-notification-channel')[1]['name'] }}",
187+
)
188+
157189
# [START howto_operator_gcp_stackdriver_delete_alert_policy]
158190
delete_alert_policy = StackdriverDeleteAlertOperator(
159191
task_id='delete-alert-policy',
160-
name='test-alert',
192+
name="{{ task_instance.xcom_pull('list-alert-policies')[0]['name'] }}",
161193
)
162194
# [END howto_operator_gcp_stackdriver_delete_alert_policy]
163195

196+
delete_alert_policy_2 = StackdriverDeleteAlertOperator(
197+
task_id='delete-alert-policy-2',
198+
name="{{ task_instance.xcom_pull('list-alert-policies')[1]['name'] }}",
199+
)
200+
164201
create_notification_channel >> enable_notification_channel >> disable_notification_channel
165202
disable_notification_channel >> list_notification_channel >> create_alert_policy
166203
create_alert_policy >> enable_alert_policy >> disable_alert_policy >> list_alert_policies
167-
list_alert_policies >> delete_notification_channel >> delete_alert_policy
204+
list_alert_policies >> delete_notification_channel >> delete_notification_channel_2
205+
delete_notification_channel_2 >> delete_alert_policy >> delete_alert_policy_2

airflow/providers/google/cloud/hooks/stackdriver.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,11 +265,10 @@ def upsert_alert(
265265
]
266266
policies_ = []
267267
channels = []
268-
269-
for channel in record["channels"]:
268+
for channel in record.get("channels", []):
270269
channel_json = json.dumps(channel)
271270
channels.append(Parse(channel_json, monitoring_v3.types.notification_pb2.NotificationChannel()))
272-
for policy in record["policies"]:
271+
for policy in record.get("policies", []):
273272
policy_json = json.dumps(policy)
274273
policies_.append(Parse(policy_json, monitoring_v3.types.alert_pb2.AlertPolicy()))
275274

airflow/providers/google/cloud/operators/stackdriver.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from typing import Optional, Sequence, Union
2020

2121
from google.api_core.gapic_v1.method import DEFAULT
22+
from google.protobuf.json_format import MessageToDict
2223

2324
from airflow.models import BaseOperator
2425
from airflow.providers.google.cloud.hooks.stackdriver import StackdriverHook
@@ -125,7 +126,7 @@ def __init__(
125126

126127
def execute(self, context):
127128
self.log.info(
128-
'List Alert Policies: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %d',
129+
'List Alert Policies: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %s',
129130
self.project_id,
130131
self.format_,
131132
self.filter_,
@@ -139,7 +140,7 @@ def execute(self, context):
139140
impersonation_chain=self.impersonation_chain,
140141
)
141142

142-
return self.hook.list_alert_policies(
143+
result = self.hook.list_alert_policies(
143144
project_id=self.project_id,
144145
format_=self.format_,
145146
filter_=self.filter_,
@@ -149,6 +150,7 @@ def execute(self, context):
149150
timeout=self.timeout,
150151
metadata=self.metadata,
151152
)
153+
return [MessageToDict(policy) for policy in result]
152154

153155

154156
class StackdriverEnableAlertPoliciesOperator(BaseOperator):
@@ -614,7 +616,7 @@ def __init__(
614616

615617
def execute(self, context):
616618
self.log.info(
617-
'List Notification Channels: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %d',
619+
'List Notification Channels: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %s',
618620
self.project_id,
619621
self.format_,
620622
self.filter_,
@@ -627,7 +629,7 @@ def execute(self, context):
627629
delegate_to=self.delegate_to,
628630
impersonation_chain=self.impersonation_chain,
629631
)
630-
return self.hook.list_notification_channels(
632+
channels = self.hook.list_notification_channels(
631633
format_=self.format_,
632634
project_id=self.project_id,
633635
filter_=self.filter_,
@@ -637,6 +639,8 @@ def execute(self, context):
637639
timeout=self.timeout,
638640
metadata=self.metadata,
639641
)
642+
result = [MessageToDict(channel) for channel in channels]
643+
return result
640644

641645

642646
class StackdriverEnableNotificationChannelsOperator(BaseOperator):

tests/providers/google/cloud/hooks/test_stackdriver.py

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,8 @@ def test_stackdriver_enable_alert_policy(self, mock_policy_client, mock_get_cred
132132
page_size=None,
133133
metadata=None,
134134
)
135-
mask = monitoring_v3.types.field_mask_pb2.FieldMask()
135+
mask = monitoring_v3.types.field_mask_pb2.FieldMask(paths=["enabled"])
136136
alert_policy_disabled.enabled.value = True # pylint: disable=no-member
137-
mask.paths.append('enabled') # pylint: disable=no-member
138137
mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
139138
alert_policy=alert_policy_disabled,
140139
update_mask=mask,
@@ -170,9 +169,8 @@ def test_stackdriver_disable_alert_policy(self, mock_policy_client, mock_get_cre
170169
page_size=None,
171170
metadata=None,
172171
)
173-
mask = monitoring_v3.types.field_mask_pb2.FieldMask()
172+
mask = monitoring_v3.types.field_mask_pb2.FieldMask(paths=["enabled"])
174173
alert_policy_enabled.enabled.value = False # pylint: disable=no-member
175-
mask.paths.append('enabled') # pylint: disable=no-member
176174
mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
177175
alert_policy=alert_policy_enabled,
178176
update_mask=mask,
@@ -236,6 +234,50 @@ def test_stackdriver_upsert_alert_policy(
236234
alert_policy=existing_alert_policy, retry=DEFAULT, timeout=DEFAULT, metadata=None
237235
)
238236

237+
@mock.patch(
238+
'airflow.providers.google.common.hooks.base_google.GoogleBaseHook._get_credentials_and_project_id',
239+
return_value=(CREDENTIALS, PROJECT_ID),
240+
)
241+
@mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_policy_client')
242+
@mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_channel_client')
243+
def test_stackdriver_upsert_alert_policy_without_channel(
244+
self, mock_channel_client, mock_policy_client, mock_get_creds_and_project_id
245+
):
246+
hook = stackdriver.StackdriverHook()
247+
existing_alert_policy = ParseDict(TEST_ALERT_POLICY_1, monitoring_v3.types.alert_pb2.AlertPolicy())
248+
249+
mock_policy_client.return_value.list_alert_policies.return_value = [existing_alert_policy]
250+
mock_channel_client.return_value.list_notification_channels.return_value = []
251+
252+
hook.upsert_alert(
253+
alerts=json.dumps({"policies": [TEST_ALERT_POLICY_1, TEST_ALERT_POLICY_2]}),
254+
project_id=PROJECT_ID,
255+
)
256+
mock_channel_client.return_value.list_notification_channels.assert_called_once_with(
257+
name=f'projects/{PROJECT_ID}',
258+
filter_=None,
259+
retry=DEFAULT,
260+
timeout=DEFAULT,
261+
order_by=None,
262+
page_size=None,
263+
metadata=None,
264+
)
265+
mock_policy_client.return_value.list_alert_policies.assert_called_once_with(
266+
name=f'projects/{PROJECT_ID}',
267+
filter_=None,
268+
retry=DEFAULT,
269+
timeout=DEFAULT,
270+
order_by=None,
271+
page_size=None,
272+
metadata=None,
273+
)
274+
275+
existing_alert_policy.ClearField('creation_record')
276+
existing_alert_policy.ClearField('mutation_record')
277+
mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
278+
alert_policy=existing_alert_policy, retry=DEFAULT, timeout=DEFAULT, metadata=None
279+
)
280+
239281
@mock.patch(
240282
'airflow.providers.google.common.hooks.base_google.GoogleBaseHook._get_credentials_and_project_id',
241283
return_value=(CREDENTIALS, PROJECT_ID),

tests/providers/google/cloud/log/test_stackdriver_task_handler_system.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@
2828
from airflow.models import TaskInstance
2929
from airflow.utils.log.log_reader import TaskLogReader
3030
from airflow.utils.session import provide_session
31-
from tests.providers.google.cloud.utils.gcp_authenticator import GCP_STACKDDRIVER
31+
from tests.providers.google.cloud.utils.gcp_authenticator import GCP_STACKDRIVER
3232
from tests.test_utils.config import conf_vars
3333
from tests.test_utils.db import clear_db_runs
3434
from tests.test_utils.gcp_system_helpers import provide_gcp_context, resolve_full_gcp_key_path
3535

3636

3737
@pytest.mark.system("google")
38-
@pytest.mark.credential_file(GCP_STACKDDRIVER)
38+
@pytest.mark.credential_file(GCP_STACKDRIVER)
3939
class TestStackdriverLoggingHandlerSystemTest(unittest.TestCase):
4040
def setUp(self) -> None:
4141
clear_db_runs()
@@ -54,7 +54,7 @@ def test_should_support_key_auth(self, session):
5454
'os.environ',
5555
AIRFLOW__LOGGING__REMOTE_LOGGING="true",
5656
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=f"stackdriver://{self.log_name}",
57-
AIRFLOW__LOGGING__GOOGLE_KEY_PATH=resolve_full_gcp_key_path(GCP_STACKDDRIVER),
57+
AIRFLOW__LOGGING__GOOGLE_KEY_PATH=resolve_full_gcp_key_path(GCP_STACKDRIVER),
5858
AIRFLOW__CORE__LOAD_EXAMPLES="false",
5959
AIRFLOW__CORE__DAGS_FOLDER=example_complex.__file__,
6060
):
@@ -72,7 +72,7 @@ def test_should_support_adc(self, session):
7272
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=f"stackdriver://{self.log_name}",
7373
AIRFLOW__CORE__LOAD_EXAMPLES="false",
7474
AIRFLOW__CORE__DAGS_FOLDER=example_complex.__file__,
75-
GOOGLE_APPLICATION_CREDENTIALS=resolve_full_gcp_key_path(GCP_STACKDDRIVER),
75+
GOOGLE_APPLICATION_CREDENTIALS=resolve_full_gcp_key_path(GCP_STACKDRIVER),
7676
):
7777
self.assertEqual(0, subprocess.Popen(["airflow", "dags", "trigger", "example_complex"]).wait())
7878
self.assertEqual(0, subprocess.Popen(["airflow", "scheduler", "--num-runs", "1"]).wait())
@@ -81,7 +81,7 @@ def test_should_support_adc(self, session):
8181
self.assert_remote_logs("INFO - Task exited with return code 0", ti)
8282

8383
def assert_remote_logs(self, expected_message, ti):
84-
with provide_gcp_context(GCP_STACKDDRIVER), conf_vars(
84+
with provide_gcp_context(GCP_STACKDRIVER), conf_vars(
8585
{
8686
('logging', 'remote_logging'): 'True',
8787
('logging', 'remote_base_log_folder'): f"stackdriver://{self.log_name}",

tests/providers/google/cloud/operators/test_stackdriver.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
from unittest import mock
2222

2323
from google.api_core.gapic_v1.method import DEFAULT
24+
from google.cloud.monitoring_v3.proto.alert_pb2 import AlertPolicy
25+
from google.cloud.monitoring_v3.proto.notification_pb2 import NotificationChannel
2426

2527
from airflow.providers.google.cloud.operators.stackdriver import (
2628
StackdriverDeleteAlertOperator,
@@ -94,7 +96,8 @@ class TestStackdriverListAlertPoliciesOperator(unittest.TestCase):
9496
@mock.patch('airflow.providers.google.cloud.operators.stackdriver.StackdriverHook')
9597
def test_execute(self, mock_hook):
9698
operator = StackdriverListAlertPoliciesOperator(task_id=TEST_TASK_ID, filter_=TEST_FILTER)
97-
operator.execute(None)
99+
mock_hook.return_value.list_alert_policies.return_value = [AlertPolicy(name="test-name")]
100+
result = operator.execute(None)
98101
mock_hook.return_value.list_alert_policies.assert_called_once_with(
99102
project_id=None,
100103
filter_=TEST_FILTER,
@@ -105,6 +108,7 @@ def test_execute(self, mock_hook):
105108
timeout=DEFAULT,
106109
metadata=None,
107110
)
111+
self.assertEqual([{'name': 'test-name'}], result)
108112

109113

110114
class TestStackdriverEnableAlertPoliciesOperator(unittest.TestCase):
@@ -160,7 +164,11 @@ class TestStackdriverListNotificationChannelsOperator(unittest.TestCase):
160164
@mock.patch('airflow.providers.google.cloud.operators.stackdriver.StackdriverHook')
161165
def test_execute(self, mock_hook):
162166
operator = StackdriverListNotificationChannelsOperator(task_id=TEST_TASK_ID, filter_=TEST_FILTER)
163-
operator.execute(None)
167+
mock_hook.return_value.list_notification_channels.return_value = [
168+
NotificationChannel(name="test-123")
169+
]
170+
171+
result = operator.execute(None)
164172
mock_hook.return_value.list_notification_channels.assert_called_once_with(
165173
project_id=None,
166174
filter_=TEST_FILTER,
@@ -171,6 +179,7 @@ def test_execute(self, mock_hook):
171179
timeout=DEFAULT,
172180
metadata=None,
173181
)
182+
self.assertEqual([{'name': 'test-123'}], result)
174183

175184

176185
class TestStackdriverEnableNotificationChannelsOperator(unittest.TestCase):

0 commit comments

Comments
 (0)