Skip to content

Commit

Permalink
Add Google Stackdriver link (#9765)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdediana committed Jul 12, 2020
1 parent 7d20059 commit 1de78e8
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@
Cloudwatch log groups should start with "cloudwatch://"
GCS buckets should start with "gs://"
WASB buckets should start with "wasb" just to help Airflow select correct handler
Stackdriver logs should start with "stackdriver://"
version_added: ~
type: string
example: ~
Expand Down
1 change: 1 addition & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ stackdriver_key_path =
# Cloudwatch log groups should start with "cloudwatch://"
# GCS buckets should start with "gs://"
# WASB buckets should start with "wasb" just to help Airflow select correct handler
# Stackdriver logs should start with "stackdriver://"
remote_base_log_folder =

# Use server-side encryption for logs stored in S3
Expand Down
45 changes: 45 additions & 0 deletions airflow/providers/google/cloud/log/stackdriver_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"""
import logging
from typing import Collection, Dict, List, Optional, Tuple, Type
from urllib.parse import urlencode

from cached_property import cached_property
from google.api_core.gapic_v1.client_info import ClientInfo
Expand Down Expand Up @@ -79,6 +80,8 @@ class StackdriverTaskHandler(logging.Handler):
LABEL_DAG_ID = "dag_id"
LABEL_EXECUTION_DATE = "execution_date"
LABEL_TRY_NUMBER = "try_number"
LOG_VIEWER_BASE_URL = "https://console.cloud.google.com/logs/viewer"
LOG_NAME = 'Google Stackdriver'

def __init__(
self,
Expand Down Expand Up @@ -294,3 +297,45 @@ def _task_instance_to_labels(cls, ti: TaskInstance) -> Dict[str, str]:
cls.LABEL_EXECUTION_DATE: str(ti.execution_date.isoformat()),
cls.LABEL_TRY_NUMBER: str(ti.try_number),
}

@property
def log_name(self):
"""Return log name."""
return self.LOG_NAME

@cached_property
def _resource_path(self):
segments = [self.resource.type]

for key, value in self.resource.labels:
segments += [key]
segments += [value]

return "/".join(segments)

def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> str:
"""
Creates an address for an external log collecting service.
:param task_instance: task instance object
:type: task_instance: TaskInstance
:param try_number: task instance try_number to read logs from.
:type try_number: Optional[int]
:return: URL to the external log collection service
:rtype: str
"""
project_id = self._client.project

ti_labels = self._task_instance_to_labels(task_instance)
ti_labels[self.LABEL_TRY_NUMBER] = str(try_number)

log_filter = self._prepare_log_filter(ti_labels)

url_query_string = {
'project': project_id,
'interval': 'NO_LIMIT',
'resource': self._resource_path,
'advancedFilter': log_filter,
}

url = f"{self.LOG_VIEWER_BASE_URL}?{urlencode(url_query_string)}"
return url
7 changes: 7 additions & 0 deletions docs/howto/write-logs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,10 @@ To enable it, ``airflow.cfg`` must be configured as in the example below. Note t
# Code will construct log_id using the log_id template from the argument above.
# NOTE: The code will prefix the https:// automatically, don't include that here.
frontend = <host_port>/{log_id}
.. _log-link-stackdriver:

Google Stackdriver External Link
---------------------------------

Airflow automatically shows a link to Google Stackdriver when configured to use it as the remote logging system.
30 changes: 30 additions & 0 deletions tests/providers/google/cloud/log/test_stackdriver_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import unittest
from datetime import datetime
from unittest import mock
from urllib.parse import parse_qs, urlparse

from google.cloud.logging.resource import Resource

Expand Down Expand Up @@ -295,3 +296,32 @@ def test_should_use_credentials(self, mock_client, mock_get_creds_and_project_id
project="project_id"
)
self.assertEqual(mock_client.return_value, client)

@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
def test_should_return_valid_external_url(self, mock_client, mock_get_creds_and_project_id):
mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
mock_client.return_value.project = 'project_id'

stackdriver_task_handler = StackdriverTaskHandler(
gcp_key_path="KEY_PATH",
)

url = stackdriver_task_handler.get_external_log_url(self.ti, self.ti.try_number)

parsed_url = urlparse(url)
parsed_qs = parse_qs(parsed_url.query)
self.assertEqual('https', parsed_url.scheme)
self.assertEqual('console.cloud.google.com', parsed_url.netloc)
self.assertEqual('/logs/viewer', parsed_url.path)
self.assertCountEqual(['project', 'interval', 'resource', 'advancedFilter'], parsed_qs.keys())
self.assertIn('global', parsed_qs['resource'])

filter_params = parsed_qs['advancedFilter'][0].split('\n')
expected_filter = ['resource.type="global"',
'logName="projects/project_id/logs/airflow"',
f'labels.task_id="{self.ti.task_id}"',
f'labels.dag_id="{self.dag.dag_id}"',
f'labels.execution_date="{self.ti.execution_date.isoformat()}"',
f'labels.try_number="{self.ti.try_number}"']
self.assertCountEqual(expected_filter, filter_params)

0 comments on commit 1de78e8

Please sign in to comment.