Commit d2efb33

Support google-cloud-monitoring>=2.0.0 (#13769)
mik-laj committed Feb 2, 2021
1 parent 594069e commit d2efb33
Showing 7 changed files with 221 additions and 230 deletions.
1 change: 1 addition & 0 deletions airflow/providers/google/ADDITIONAL_INFO.md
@@ -34,6 +34,7 @@ Details are covered in the UPDATING.md files for each library, but there are som
| [``google-cloud-datacatalog``](https://pypi.org/project/google-cloud-datacatalog/) | ``>=0.5.0,<0.8`` | ``>=3.0.0,<4.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-datacatalog/blob/master/UPGRADING.md) |
| [``google-cloud-dataproc``](https://pypi.org/project/google-cloud-dataproc/) | ``>=1.0.1,<2.0.0`` | ``>=2.2.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-dataproc/blob/master/UPGRADING.md) |
| [``google-cloud-kms``](https://pypi.org/project/google-cloud-kms/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-kms/blob/master/UPGRADING.md) |
+| [``google-cloud-monitoring``](https://pypi.org/project/google-cloud-monitoring/) | ``>=0.34.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-monitoring/blob/master/UPGRADING.md) |
| [``google-cloud-os-login``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-oslogin/blob/master/UPGRADING.md) |
| [``google-cloud-pubsub``](https://pypi.org/project/google-cloud-pubsub/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-pubsub/blob/master/UPGRADING.md) |
| [``google-cloud-tasks``](https://pypi.org/project/google-cloud-tasks/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-tasks/blob/master/UPGRADING.md) |
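Most of the major-version bumps in the table above, including `google-cloud-monitoring`, move to the regenerated client surface where flat keyword arguments are replaced by a single `request` mapping. A minimal sketch of the pattern (the project ID is a placeholder):

```python
from google.cloud import monitoring_v3

client = monitoring_v3.AlertPolicyServiceClient()

# <2.0.0 style (no longer works): client.list_alert_policies(name=..., filter_=...)
# >=2.0.0 style: a single `request` mapping (or a typed request object).
policies = client.list_alert_policies(
    request={"name": "projects/my-project", "page_size": 10}
)
for policy in policies:  # the call returns an iterable pager
    print(policy.display_name)
```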
46 changes: 22 additions & 24 deletions airflow/providers/google/cloud/example_dags/example_stackdriver.py
@@ -42,78 +42,76 @@

 TEST_ALERT_POLICY_1 = {
     "combiner": "OR",
-    "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
     "enabled": True,
-    "displayName": "test alert 1",
+    "display_name": "test alert 1",
     "conditions": [
         {
-            "conditionThreshold": {
+            "condition_threshold": {
                 "filter": (
                     'metric.label.state="blocked" AND '
                     'metric.type="agent.googleapis.com/processes/count_by_state" '
                     'AND resource.type="gce_instance"'
                 ),
                 "comparison": "COMPARISON_GT",
-                "thresholdValue": 100,
-                "duration": "900s",
+                "threshold_value": 100,
+                "duration": {'seconds': 900},
                 "trigger": {"percent": 0},
                 "aggregations": [
                     {
-                        "alignmentPeriod": "60s",
-                        "perSeriesAligner": "ALIGN_MEAN",
-                        "crossSeriesReducer": "REDUCE_MEAN",
-                        "groupByFields": ["project", "resource.label.instance_id", "resource.label.zone"],
+                        "alignment_period": {'seconds': 60},
+                        "per_series_aligner": "ALIGN_MEAN",
+                        "cross_series_reducer": "REDUCE_MEAN",
+                        "group_by_fields": ["project", "resource.label.instance_id", "resource.label.zone"],
                     }
                 ],
             },
-            "displayName": "test_alert_policy_1",
+            "display_name": "test_alert_policy_1",
         }
     ],
 }

 TEST_ALERT_POLICY_2 = {
     "combiner": "OR",
-    "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
     "enabled": False,
-    "displayName": "test alert 2",
+    "display_name": "test alert 2",
     "conditions": [
         {
-            "conditionThreshold": {
+            "condition_threshold": {
                 "filter": (
                     'metric.label.state="blocked" AND '
                     'metric.type="agent.googleapis.com/processes/count_by_state" AND '
                     'resource.type="gce_instance"'
                 ),
                 "comparison": "COMPARISON_GT",
-                "thresholdValue": 100,
-                "duration": "900s",
+                "threshold_value": 100,
+                "duration": {'seconds': 900},
                 "trigger": {"percent": 0},
                 "aggregations": [
                     {
-                        "alignmentPeriod": "60s",
-                        "perSeriesAligner": "ALIGN_MEAN",
-                        "crossSeriesReducer": "REDUCE_MEAN",
-                        "groupByFields": ["project", "resource.label.instance_id", "resource.label.zone"],
+                        "alignment_period": {'seconds': 60},
+                        "per_series_aligner": "ALIGN_MEAN",
+                        "cross_series_reducer": "REDUCE_MEAN",
+                        "group_by_fields": ["project", "resource.label.instance_id", "resource.label.zone"],
                     }
                 ],
             },
-            "displayName": "test_alert_policy_2",
+            "display_name": "test_alert_policy_2",
         }
     ],
 }

 TEST_NOTIFICATION_CHANNEL_1 = {
-    "displayName": "channel1",
+    "display_name": "channel1",
     "enabled": True,
     "labels": {"auth_token": "top-secret", "channel_name": "#channel"},
-    "type": "slack",
+    "type_": "slack",
 }

 TEST_NOTIFICATION_CHANNEL_2 = {
-    "displayName": "channel2",
+    "display_name": "channel2",
     "enabled": False,
     "labels": {"auth_token": "top-secret", "channel_name": "#channel"},
-    "type": "slack",
+    "type_": "slack",
 }
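The new fixtures line up one-to-one with the proto-plus message fields: snake_case keys (`display_name`, `condition_threshold`), durations as `{'seconds': 900}` mappings instead of `"900s"` strings, and `type_` for the reserved `type` field. A rough sketch of why that matters, assuming the definitions above:

```python
from google.cloud.monitoring_v3 import AlertPolicy, NotificationChannel

# proto-plus messages accept nested plain dicts with snake_case keys,
# so the hook can expand these fixtures straight into constructors.
policy = AlertPolicy(**TEST_ALERT_POLICY_1)
channel = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_1)

assert policy.conditions[0].condition_threshold.duration.seconds == 900
assert channel.type_ == "slack"
```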

with models.DAG(
128 changes: 69 additions & 59 deletions airflow/providers/google/cloud/hooks/stackdriver.py
@@ -24,7 +24,8 @@
 from google.api_core.exceptions import InvalidArgument
 from google.api_core.gapic_v1.method import DEFAULT
 from google.cloud import monitoring_v3
-from google.protobuf.json_format import MessageToDict, MessageToJson, Parse
+from google.cloud.monitoring_v3 import AlertPolicy, NotificationChannel
+from google.protobuf.field_mask_pb2 import FieldMask
 from googleapiclient.errors import HttpError

 from airflow.exceptions import AirflowException
@@ -110,18 +111,20 @@ def list_alert_policies(
         """
         client = self._get_policy_client()
         policies_ = client.list_alert_policies(
-            name=f'projects/{project_id}',
-            filter_=filter_,
-            order_by=order_by,
-            page_size=page_size,
+            request={
+                'name': f'projects/{project_id}',
+                'filter': filter_,
+                'order_by': order_by,
+                'page_size': page_size,
+            },
             retry=retry,
             timeout=timeout,
-            metadata=metadata,
+            metadata=metadata or (),
         )
         if format_ == "dict":
-            return [MessageToDict(policy) for policy in policies_]
+            return [AlertPolicy.to_dict(policy) for policy in policies_]
         elif format_ == "json":
-            return [MessageToJson(policy) for policy in policies_]
+            return [AlertPolicy.to_json(policy) for policy in policies_]
         else:
             return policies_
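`MessageToDict`/`MessageToJson` from `google.protobuf.json_format` only accept raw protobuf messages, not the proto-plus wrappers the 2.x client returns, so serialization moves to classmethods on the message class. A short sketch of the idiom, with the hook's `format_` handling reduced to a helper:

```python
from google.cloud.monitoring_v3 import AlertPolicy

def render(policy: AlertPolicy, format_: str = "dict"):
    # proto-plus serialization helpers are classmethods that take the
    # message instance as their argument, not instance methods.
    if format_ == "dict":
        return AlertPolicy.to_dict(policy)
    if format_ == "json":
        return AlertPolicy.to_json(policy)
    return policy

print(render(AlertPolicy(display_name="demo"), format_="json"))
```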

@@ -138,12 +141,14 @@ def _toggle_policy_status(
         client = self._get_policy_client()
         policies_ = self.list_alert_policies(project_id=project_id, filter_=filter_)
         for policy in policies_:
-            if policy.enabled.value != bool(new_state):
-                policy.enabled.value = bool(new_state)
-                mask = monitoring_v3.types.field_mask_pb2.FieldMask()
-                mask.paths.append('enabled')  # pylint: disable=no-member
+            if policy.enabled != bool(new_state):
+                policy.enabled = bool(new_state)
+                mask = FieldMask(paths=['enabled'])
                 client.update_alert_policy(
-                    alert_policy=policy, update_mask=mask, retry=retry, timeout=timeout, metadata=metadata
+                    request={'alert_policy': policy, 'update_mask': mask},
+                    retry=retry,
+                    timeout=timeout,
+                    metadata=metadata or (),
                 )
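Two things change in this toggle: `enabled` used to surface as a `BoolValue` wrapper (hence the old `.value` access) and is now a plain `bool`, and `FieldMask` is imported from `google.protobuf.field_mask_pb2` directly instead of being reached through `monitoring_v3.types`. A sketch of the new shape, with a placeholder project:

```python
from google.cloud import monitoring_v3
from google.protobuf.field_mask_pb2 import FieldMask

client = monitoring_v3.AlertPolicyServiceClient()

for policy in client.list_alert_policies(request={"name": "projects/my-project"}):
    if not policy.enabled:  # plain bool in >=2.0.0; was policy.enabled.value
        policy.enabled = True
        # Send a partial update: only the `enabled` field is touched server-side.
        client.update_alert_policy(
            request={"alert_policy": policy, "update_mask": FieldMask(paths=["enabled"])}
        )
```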

@GoogleBaseHook.fallback_to_default_project_id
@@ -266,38 +271,38 @@ def upsert_alert(
         policies_ = []
         channels = []
         for channel in record.get("channels", []):
-            channel_json = json.dumps(channel)
-            channels.append(Parse(channel_json, monitoring_v3.types.notification_pb2.NotificationChannel()))
+            channels.append(NotificationChannel(**channel))
         for policy in record.get("policies", []):
-            policy_json = json.dumps(policy)
-            policies_.append(Parse(policy_json, monitoring_v3.types.alert_pb2.AlertPolicy()))
+            policies_.append(AlertPolicy(**policy))

         channel_name_map = {}

         for channel in channels:
             channel.verification_status = (
-                monitoring_v3.enums.NotificationChannel.VerificationStatus.VERIFICATION_STATUS_UNSPECIFIED
+                monitoring_v3.NotificationChannel.VerificationStatus.VERIFICATION_STATUS_UNSPECIFIED
             )

             if channel.name in existing_channels:
                 channel_client.update_notification_channel(
-                    notification_channel=channel, retry=retry, timeout=timeout, metadata=metadata
+                    request={'notification_channel': channel},
+                    retry=retry,
+                    timeout=timeout,
+                    metadata=metadata or (),
                 )
             else:
                 old_name = channel.name
-                channel.ClearField('name')
+                channel.name = None
                 new_channel = channel_client.create_notification_channel(
-                    name=f'projects/{project_id}',
-                    notification_channel=channel,
+                    request={'name': f'projects/{project_id}', 'notification_channel': channel},
                     retry=retry,
                     timeout=timeout,
-                    metadata=metadata,
+                    metadata=metadata or (),
                 )
                 channel_name_map[old_name] = new_channel.name

         for policy in policies_:
-            policy.ClearField('creation_record')
-            policy.ClearField('mutation_record')
+            policy.creation_record = None
+            policy.mutation_record = None

             for i, channel in enumerate(policy.notification_channels):
                 new_channel = channel_name_map.get(channel)
@@ -307,20 +312,22 @@
             if policy.name in existing_policies:
                 try:
                     policy_client.update_alert_policy(
-                        alert_policy=policy, retry=retry, timeout=timeout, metadata=metadata
+                        request={'alert_policy': policy},
+                        retry=retry,
+                        timeout=timeout,
+                        metadata=metadata or (),
                     )
                 except InvalidArgument:
                     pass
             else:
-                policy.ClearField('name')
+                policy.name = None
                 for condition in policy.conditions:
-                    condition.ClearField('name')
+                    condition.name = None
                 policy_client.create_alert_policy(
-                    name=f'projects/{project_id}',
-                    alert_policy=policy,
+                    request={'name': f'projects/{project_id}', 'alert_policy': policy},
                     retry=retry,
                     timeout=timeout,
-                    metadata=None,
+                    metadata=metadata or (),
                 )
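`ClearField` is raw-protobuf API and is not available on the proto-plus wrappers; instead, assigning `None` clears a field, which is how the output-only fields (resource names, creation/mutation records) are stripped before create calls. In isolation, roughly:

```python
from google.cloud.monitoring_v3 import AlertPolicy

policy = AlertPolicy(display_name="example", combiner="OR")

# Server-assigned, output-only fields must not be sent on create; with
# proto-plus they are cleared by plain assignment instead of ClearField().
policy.name = None
policy.creation_record = None
for condition in policy.conditions:
    condition.name = None
```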

def delete_alert_policy(
@@ -348,7 +355,9 @@ def delete_alert_policy(
         """
         policy_client = self._get_policy_client()
         try:
-            policy_client.delete_alert_policy(name=name, retry=retry, timeout=timeout, metadata=metadata)
+            policy_client.delete_alert_policy(
+                request={'name': name}, retry=retry, timeout=timeout, metadata=metadata or ()
+            )
         except HttpError as err:
             raise AirflowException(f'Delete alerting policy failed. Error was {err.content}')

@@ -404,18 +413,20 @@ def list_notification_channels(
         """
         client = self._get_channel_client()
         channels = client.list_notification_channels(
-            name=f'projects/{project_id}',
-            filter_=filter_,
-            order_by=order_by,
-            page_size=page_size,
+            request={
+                'name': f'projects/{project_id}',
+                'filter': filter_,
+                'order_by': order_by,
+                'page_size': page_size,
+            },
             retry=retry,
             timeout=timeout,
-            metadata=metadata,
+            metadata=metadata or (),
         )
         if format_ == "dict":
-            return [MessageToDict(channel) for channel in channels]
+            return [NotificationChannel.to_dict(channel) for channel in channels]
         elif format_ == "json":
-            return [MessageToJson(channel) for channel in channels]
+            return [NotificationChannel.to_json(channel) for channel in channels]
         else:
             return channels
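From the caller's side the hook API is unchanged: `format_` still selects between dicts, JSON strings, and raw messages. A hypothetical usage sketch (connection setup omitted, filter string illustrative):

```python
from airflow.providers.google.cloud.hooks.stackdriver import StackdriverHook

hook = StackdriverHook()

# "dict" -> list of plain dicts, "json" -> list of JSON strings,
# anything else -> the iterable pager of NotificationChannel messages.
channels = hook.list_notification_channels(
    format_="dict",
    project_id="my-project",
    filter_='type="slack"',
)
```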

@@ -430,18 +441,18 @@ def _toggle_channel_status(
         metadata: Optional[str] = None,
     ) -> None:
         client = self._get_channel_client()
-        channels = client.list_notification_channels(name=f'projects/{project_id}', filter_=filter_)
+        channels = client.list_notification_channels(
+            request={'name': f'projects/{project_id}', 'filter': filter_}
+        )
         for channel in channels:
-            if channel.enabled.value != bool(new_state):
-                channel.enabled.value = bool(new_state)
-                mask = monitoring_v3.types.field_mask_pb2.FieldMask()
-                mask.paths.append('enabled')  # pylint: disable=no-member
+            if channel.enabled != bool(new_state):
+                channel.enabled = bool(new_state)
+                mask = FieldMask(paths=['enabled'])
                 client.update_notification_channel(
-                    notification_channel=channel,
-                    update_mask=mask,
+                    request={'notification_channel': channel, 'update_mask': mask},
                     retry=retry,
                     timeout=timeout,
-                    metadata=metadata,
+                    metadata=metadata or (),
                 )

@GoogleBaseHook.fallback_to_default_project_id
@@ -517,7 +528,7 @@ def disable_notification_channels(
             new_state=False,
             retry=retry,
             timeout=timeout,
-            metadata=metadata,
+            metadata=metadata or (),
         )

@GoogleBaseHook.fallback_to_default_project_id
@@ -561,29 +572,28 @@ def upsert_channel(
         channel_name_map = {}

         for channel in record["channels"]:
-            channel_json = json.dumps(channel)
-            channels_list.append(
-                Parse(channel_json, monitoring_v3.types.notification_pb2.NotificationChannel())
-            )
+            channels_list.append(NotificationChannel(**channel))

         for channel in channels_list:
             channel.verification_status = (
-                monitoring_v3.enums.NotificationChannel.VerificationStatus.VERIFICATION_STATUS_UNSPECIFIED
+                monitoring_v3.NotificationChannel.VerificationStatus.VERIFICATION_STATUS_UNSPECIFIED
             )

             if channel.name in existing_channels:
                 channel_client.update_notification_channel(
-                    notification_channel=channel, retry=retry, timeout=timeout, metadata=metadata
+                    request={'notification_channel': channel},
+                    retry=retry,
+                    timeout=timeout,
+                    metadata=metadata or (),
                 )
             else:
                 old_name = channel.name
-                channel.ClearField('name')
+                channel.name = None
                 new_channel = channel_client.create_notification_channel(
-                    name=f'projects/{project_id}',
-                    notification_channel=channel,
+                    request={'name': f'projects/{project_id}', 'notification_channel': channel},
                     retry=retry,
                     timeout=timeout,
-                    metadata=metadata,
+                    metadata=metadata or (),
                 )
                 channel_name_map[old_name] = new_channel.name
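Enum access also moves: 1.x exposed enums under `monitoring_v3.enums`, while in 2.x each enum hangs off its message class. The upsert resets verification status because that field is managed server-side; in isolation:

```python
from google.cloud import monitoring_v3

channel = monitoring_v3.NotificationChannel(display_name="channel1", type_="slack")

# 1.x: monitoring_v3.enums.NotificationChannel.VerificationStatus...
# 2.x: the enum is an attribute of the message class itself.
channel.verification_status = (
    monitoring_v3.NotificationChannel.VerificationStatus.VERIFICATION_STATUS_UNSPECIFIED
)
```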

@@ -615,7 +625,7 @@ def delete_notification_channel(
         channel_client = self._get_channel_client()
         try:
             channel_client.delete_notification_channel(
-                name=name, retry=retry, timeout=timeout, metadata=metadata
+                request={'name': name}, retry=retry, timeout=timeout, metadata=metadata or ()
             )
         except HttpError as err:
             raise AirflowException(f'Delete notification channel failed. Error was {err.content}')
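One recurring micro-change throughout this file is `metadata=metadata` becoming `metadata=metadata or ()`. The hook methods default `metadata` to `None`, but the generated 2.x clients appear to treat it strictly as a sequence of key/value pairs when building request headers, so the `None` default is coerced to an empty tuple; for example (the channel name is a placeholder):

```python
from google.cloud import monitoring_v3

client = monitoring_v3.NotificationChannelServiceClient()
metadata = None  # the hook methods default metadata to None

# The 2.x clients expect `metadata` to be a sequence of (key, value)
# pairs; `or ()` turns the None default into an empty tuple.
client.delete_notification_channel(
    request={"name": "projects/my-project/notificationChannels/12345"},
    metadata=metadata or (),
)
```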
6 changes: 3 additions & 3 deletions airflow/providers/google/cloud/operators/stackdriver.py
@@ -19,7 +19,7 @@
 from typing import Optional, Sequence, Union

 from google.api_core.gapic_v1.method import DEFAULT
-from google.protobuf.json_format import MessageToDict
+from google.cloud.monitoring_v3 import AlertPolicy, NotificationChannel

 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.stackdriver import StackdriverHook
@@ -150,7 +150,7 @@ def execute(self, context):
             timeout=self.timeout,
             metadata=self.metadata,
         )
-        return [MessageToDict(policy) for policy in result]
+        return [AlertPolicy.to_dict(policy) for policy in result]


class StackdriverEnableAlertPoliciesOperator(BaseOperator):
@@ -639,7 +639,7 @@ def execute(self, context):
             timeout=self.timeout,
             metadata=self.metadata,
         )
-        result = [MessageToDict(channel) for channel in channels]
+        result = [NotificationChannel.to_dict(channel) for channel in channels]
         return result
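The operators return plain dicts rather than message objects, presumably because `execute` results can be pushed to XCom and proto-plus messages are not JSON-serializable on their own. The distinction in miniature:

```python
import json

from google.cloud.monitoring_v3 import NotificationChannel

channel = NotificationChannel(display_name="channel1", type_="slack")

# json.dumps(channel) raises TypeError; the dict form serializes cleanly,
# which is what the operators now hand back (and on to XCom).
print(json.dumps(NotificationChannel.to_dict(channel)))
```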

