Move provider's log task handlers to the provider package (#9604)
ephraimbuddy committed Jul 6, 2020
1 parent 263ff26 commit a79e2d4
Showing 20 changed files with 858 additions and 674 deletions.
14 changes: 13 additions & 1 deletion UPDATING.md
@@ -61,9 +61,21 @@ More tips can be found in the guide:
https://developers.google.com/style/inclusive-documentation
-->
### StackdriverTaskHandler has been moved
The `StackdriverTaskHandler` class from `airflow.utils.log.stackdriver_task_handler` has been moved to
`airflow.providers.google.cloud.log.stackdriver_task_handler`. This is because it has items specific to `google cloud`.

### S3TaskHandler has been moved
The `S3TaskHandler` class from `airflow.utils.log.s3_task_handler` has been moved to
-`airflow.providers.amazon.aws.log.s3_task_handler`. This is because it has items specific to `aws`
+`airflow.providers.amazon.aws.log.s3_task_handler`. This is because it has items specific to `aws`.

### ElasticsearchTaskHandler has been moved
The `ElasticsearchTaskHandler` class from `airflow.utils.log.es_task_handler` has been moved to
`airflow.providers.elasticsearch.log.es_task_handler`. This is because it has items specific to `elasticsearch`.

### CloudwatchTaskHandler has been moved
The `CloudwatchTaskHandler` class from `airflow.utils.log.cloudwatch_task_handler` has been moved to
`airflow.providers.amazon.aws.log.cloudwatch_task_handler`. This is because it has items specific to `aws`.
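
For deployments that import these handlers directly (for example from a custom `logging_config_class`), the migration is a one-line import change per handler. A minimal sketch covering all four moves described above:

    # Before this commit:
    # from airflow.utils.log.stackdriver_task_handler import StackdriverTaskHandler
    # from airflow.utils.log.s3_task_handler import S3TaskHandler
    # from airflow.utils.log.es_task_handler import ElasticsearchTaskHandler
    # from airflow.utils.log.cloudwatch_task_handler import CloudwatchTaskHandler

    # After this commit:
    from airflow.providers.google.cloud.log.stackdriver_task_handler import StackdriverTaskHandler
    from airflow.providers.amazon.aws.log.s3_task_handler import S3TaskHandler
    from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler
    from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudwatchTaskHandler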

### SendGrid emailer has been moved
Formerly the core code was maintained by the original creators - Airbnb. The code that was in the contrib
4 changes: 2 additions & 2 deletions airflow/config_templates/airflow_local_settings.py
@@ -223,7 +223,7 @@
log_name = urlparse(REMOTE_BASE_LOG_FOLDER).path[1:]
STACKDRIVER_REMOTE_HANDLERS = {
    'task': {
-        'class': 'airflow.utils.log.stackdriver_task_handler.StackdriverTaskHandler',
+        'class': 'airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler',
        'formatter': 'airflow',
        'name': log_name,
        'gcp_key_path': key_path
@@ -241,7 +241,7 @@

ELASTIC_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
    'task': {
-        'class': 'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler',
+        'class': 'airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler',
        'formatter': 'airflow',
        'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
        'log_id_template': ELASTICSEARCH_LOG_ID_TEMPLATE,
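
For context, `ELASTICSEARCH_LOG_ID_TEMPLATE` above determines the id under which a task instance's logs are looked up in Elasticsearch. A minimal sketch of how such an id renders, assuming Airflow's default template (deployments can override it via the `[elasticsearch] log_id_template` option):

    # The template value below is an assumption (Airflow's default at the time).
    ELASTICSEARCH_LOG_ID_TEMPLATE = '{dag_id}-{task_id}-{execution_date}-{try_number}'

    log_id = ELASTICSEARCH_LOG_ID_TEMPLATE.format(
        dag_id='example_dag',
        task_id='example_task',
        execution_date='2020-07-06T00:00:00+00:00',
        try_number=1,
    )
    print(log_id)  # example_dag-example_task-2020-07-06T00:00:00+00:00-1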
115 changes: 115 additions & 0 deletions airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -0,0 +1,115 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import watchtower
from cached_property import cached_property

from airflow.configuration import conf
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin


class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
    """
    CloudwatchTaskHandler is a Python log handler that handles and reads task instance logs.

    It extends airflow FileTaskHandler and uploads to and reads from Cloudwatch.

    :param base_log_folder: base folder to store logs locally
    :type base_log_folder: str
    :param log_group_arn: ARN of the Cloudwatch log group for remote log storage
        with format ``arn:aws:logs:{region name}:{account id}:log-group:{group name}``
    :type log_group_arn: str
    :param filename_template: template for file name (local storage) or log stream name (remote)
    :type filename_template: str
    """

    def __init__(self, base_log_folder, log_group_arn, filename_template):
        super().__init__(base_log_folder, filename_template)
        split_arn = log_group_arn.split(':')

        self.handler = None
        # arn:aws:logs:{region name}:{account id}:log-group:{group name}
        # -> index 3 is the region, index 6 is the log group name
        self.log_group = split_arn[6]
        self.region_name = split_arn[3]
        self.closed = False

    @cached_property
    def hook(self):
        """
        Returns AwsLogsHook.
        """
        remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID')
        try:
            from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
            return AwsLogsHook(aws_conn_id=remote_conn_id, region_name=self.region_name)
        except Exception:  # pylint: disable=broad-except
            self.log.error(
                'Could not create an AwsLogsHook with connection id "%s". '
                'Please make sure that airflow[aws] is installed and '
                'the Cloudwatch logs connection exists.', remote_conn_id
            )

    def _render_filename(self, ti, try_number):
        # Replace unsupported log group name characters
        return super()._render_filename(ti, try_number).replace(':', '_')

    def set_context(self, ti):
        super().set_context(ti)
        self.handler = watchtower.CloudWatchLogHandler(
            log_group=self.log_group,
            stream_name=self._render_filename(ti, ti.try_number),
            boto3_session=self.hook.get_session(self.region_name)
        )

    def close(self):
        """
        Close the handler responsible for the upload of the local log file to Cloudwatch.
        """
        # When the application exits, the system shuts down all handlers by
        # calling their close method. Here we check if the logger is already
        # closed to prevent uploading the log to remote storage multiple
        # times when `logging.shutdown` is called.
        if self.closed:
            return

        if self.handler is not None:
            self.handler.close()
        # Mark closed so we don't double write if close is called twice
        self.closed = True

    def _read(self, task_instance, try_number, metadata=None):
        stream_name = self._render_filename(task_instance, try_number)
        return '*** Reading remote log from Cloudwatch log_group: {} log_stream: {}.\n{}\n'.format(
            self.log_group, stream_name, self.get_cloudwatch_logs(stream_name=stream_name)
        ), {'end_of_log': True}

    def get_cloudwatch_logs(self, stream_name):
        """
        Return all logs from the given log stream.

        :param stream_name: name of the Cloudwatch log stream to get all logs from
        :return: string of all logs from the given log stream
        """
        try:
            events = list(self.hook.get_log_events(log_group=self.log_group, log_stream_name=stream_name))
            return '\n'.join(reversed([event['message'] for event in events]))
        except Exception:  # pylint: disable=broad-except
            msg = 'Could not read remote logs from log_group: {} log_stream: {}.'.format(
                self.log_group, stream_name
            )
            self.log.exception(msg)
            return msg
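
To show where the relocated handler plugs in, here is a hedged sketch of a custom logging config that points the task handler at the new module path, mirroring the dict shape in airflow_local_settings.py above. The ARN, folder, and filename template are illustrative placeholders, not values from this commit:

    from copy import deepcopy

    from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

    # Start from Airflow's default logging config and swap in the Cloudwatch handler;
    # dictConfig passes the extra keys as keyword arguments to __init__.
    LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
    LOGGING_CONFIG['handlers']['task'] = {
        'class': 'airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler',
        'formatter': 'airflow',
        'base_log_folder': '/tmp/airflow/logs',  # placeholder local log folder
        # Placeholder ARN in the format the handler's __init__ parses above.
        'log_group_arn': 'arn:aws:logs:us-east-1:123456789012:log-group:airflow-task-logs',
        'filename_template': '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log',
    }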
16 changes: 16 additions & 0 deletions airflow/providers/elasticsearch/log/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
