
Support deleting the local log files when using remote logging (#29772)
* add a new config

* add remote_task_handler_kwargs conf and add its content as kwargs for remote task handlers

* add delete_local_logs to logging tasks doc

Co-authored-by: Niko Oliveira <[email protected]>

hussein-awala and o-nikolas committed Mar 7, 2023
1 parent c405ecb commit b6392ae
Showing 13 changed files with 305 additions and 29 deletions.
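
Before the file-by-file details, a note on trying the feature: both new options live in the [logging] section, so besides airflow.cfg they can be set through Airflow's standard AIRFLOW__SECTION__KEY environment mapping. A minimal sketch; the values shown are illustrative, not defaults:

# Sketch: enabling the new options via environment variables.
# AIRFLOW__LOGGING__* follows Airflow's standard section/key mapping.
import os

os.environ["AIRFLOW__LOGGING__DELETE_LOCAL_LOGS"] = "True"
# Per-handler override, parsed as JSON by conf.getjson at startup:
os.environ["AIRFLOW__LOGGING__REMOTE_TASK_HANDLER_KWARGS"] = '{"delete_local_copy": true}'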
3 changes: 2 additions & 1 deletion airflow/config_templates/airflow_local_settings.py
@@ -203,6 +203,7 @@
# WASB buckets should start with "wasb"
# just to help Airflow select correct handler
REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER")
REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={})

if REMOTE_BASE_LOG_FOLDER.startswith("s3://"):
S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
@@ -252,7 +253,6 @@
"wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
"wasb_container": "airflow-logs",
"filename_template": FILENAME_TEMPLATE,
"delete_local_copy": False,
},
}

@@ -315,3 +315,4 @@
"section 'elasticsearch' if you are using Elasticsearch. In the other case, "
"'remote_base_log_folder' option in the 'logging' section."
)
DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS)
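
The final line above is what makes remote_task_handler_kwargs win: conf.getjson parses the option into a dict, and dict.update merges it over the task handler definition, so any key it contains overrides the config-derived value. A standalone sketch of the merge semantics, with an abbreviated, illustrative handler definition:

# Sketch of the merge performed by the update() call above.
task_handler = {
    "delete_local_copy": False,  # derived from [logging] delete_local_logs
}
# Example of what conf.getjson("logging", "remote_task_handler_kwargs",
# fallback={}) returns when the option is set:
remote_kwargs = {"delete_local_copy": True}

task_handler.update(remote_kwargs)
assert task_handler["delete_local_copy"] is True  # the JSON override wins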
18 changes: 18 additions & 0 deletions airflow/config_templates/config.yml
@@ -606,6 +606,14 @@ logging:
type: string
example: ~
default: ""
delete_local_logs:
description: |
Whether the local log files for GCS, S3, WASB and OSS remote logging should be deleted after
they are uploaded to the remote location.
version_added: 2.6.0
type: string
example: ~
default: "False"
google_key_path:
description: |
Path to Google Credential JSON file. If omitted, authorization based on `the Application Default
@@ -628,6 +636,16 @@
type: string
example: ~
default: ""
remote_task_handler_kwargs:
description: |
The remote_task_handler_kwargs param is loaded into a dictionary and passed to the __init__
of the remote task handler, where it overrides the values provided by the Airflow config.
For example, if you set `delete_local_logs=False` and provide ``{{"delete_local_copy": true}}``,
the local log files will be deleted after they are uploaded to the remote location.
version_added: 2.6.0
type: string
example: '{"delete_local_copy": true}'
default: ""
encrypt_s3_logs:
description: |
Use server-side encryption for logs stored in S3
11 changes: 11 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -345,6 +345,10 @@ remote_logging = False
# reading logs, not writing them.
remote_log_conn_id =

# Whether the local log files for GCS, S3, WASB and OSS remote logging should be deleted after
# they are uploaded to the remote location.
delete_local_logs = False

# Path to Google Credential JSON file. If omitted, authorization based on `the Application Default
# Credentials
# <https://cloud.google.com/docs/authentication/production#finding_credentials_automatically>`__ will
@@ -359,6 +363,13 @@ google_key_path =
# Stackdriver logs should start with "stackdriver://"
remote_base_log_folder =

# The remote_task_handler_kwargs param is loaded into a dictionary and passed to the __init__
# of the remote task handler, where it overrides the values provided by the Airflow config.
# For example, if you set `delete_local_logs=False` and provide ``{{"delete_local_copy": true}}``,
# the local log files will be deleted after they are uploaded to the remote location.
# Example: remote_task_handler_kwargs = {{"delete_local_copy": true}}
remote_task_handler_kwargs =

# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False

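
For completeness, the two options round-trip through the configuration API as a boolean and a dict respectively; a minimal sketch, assuming a standard Airflow installation:

# Sketch: reading the new [logging] options back at runtime.
from airflow.configuration import conf

delete_local_logs = conf.getboolean("logging", "delete_local_logs")  # False by default
handler_kwargs = conf.getjson("logging", "remote_task_handler_kwargs", fallback={})  # dict
print(delete_local_logs, handler_kwargs)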
32 changes: 27 additions & 5 deletions airflow/providers/alibaba/cloud/log/oss_task_handler.py
@@ -20,6 +20,9 @@
import contextlib
import os
import pathlib
import shutil

from packaging.version import Version

from airflow.compat.functools import cached_property
from airflow.configuration import conf
@@ -28,21 +31,35 @@
from airflow.utils.log.logging_mixin import LoggingMixin


def get_default_delete_local_copy():
"""Load delete_local_logs conf if Airflow version > 2.6 and return False if not
TODO: delete this function when min airflow version >= 2.6
"""
from airflow.version import version

if Version(version) < Version("2.6"):
return False
return conf.getboolean("logging", "delete_local_logs")


class OSSTaskHandler(FileTaskHandler, LoggingMixin):
"""
OSSTaskHandler is a python log handler that handles and reads
task instance logs. It extends airflow FileTaskHandler and
uploads to and reads from OSS remote storage.
"""

def __init__(self, base_log_folder, oss_log_folder, filename_template=None):
def __init__(self, base_log_folder, oss_log_folder, filename_template=None, **kwargs):
self.log.info("Using oss_task_handler for remote logging...")
super().__init__(base_log_folder, filename_template)
(self.bucket_name, self.base_folder) = OSSHook.parse_oss_url(oss_log_folder)
self.log_relative_path = ""
self._hook = None
self.closed = False
self.upload_on_close = True
self.delete_local_copy = (
kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else get_default_delete_local_copy()
)

@cached_property
def hook(self):
@@ -92,7 +109,9 @@ def close(self):
if os.path.exists(local_loc):
# read log and remove old logs to get just the latest additions
log = pathlib.Path(local_loc).read_text()
self.oss_write(log, remote_loc)
oss_write = self.oss_write(log, remote_loc)
if oss_write and self.delete_local_copy:
shutil.rmtree(os.path.dirname(local_loc))

# Mark closed so we don't double write if close is called twice
self.closed = True
@@ -154,15 +173,16 @@ def oss_read(self, remote_log_location, return_error=False):
if return_error:
return msg

def oss_write(self, log, remote_log_location, append=True):
def oss_write(self, log, remote_log_location, append=True) -> bool:
"""
Writes the log to the remote_log_location. Fails silently if no hook
was created.
Writes the log to the remote_log_location and returns ``True`` on success. Fails silently
and returns ``False`` if the log could not be written.
:param log: the log to write to the remote_log_location
:param remote_log_location: the log's location in remote storage
:param append: if False, any existing log file is overwritten. If True,
the new log is appended to any existing logs.
:return: whether the log was successfully written to the remote location.
"""
oss_remote_log_location = f"{self.base_folder}/{remote_log_location}"
pos = 0
@@ -180,3 +200,5 @@ def oss_write(self, log, remote_log_location, append=True):
str(pos),
str(append),
)
return False
return True
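
One detail worth noting in the __init__ change above: the explicit "delete_local_copy" in kwargs check is used rather than kwargs.get() with a default, so that get_default_delete_local_copy(), which touches the config and checks the Airflow version, only runs when no override was passed. A hypothetical equivalent helper, for illustration only:

# Sketch: the fallback pattern shared by the OSS, S3 and GCS handlers.
# resolve_delete_local_copy is a hypothetical name, not part of this commit.
def resolve_delete_local_copy(kwargs: dict) -> bool:
    if "delete_local_copy" in kwargs:  # explicit handler kwarg wins
        return kwargs["delete_local_copy"]
    return get_default_delete_local_copy()  # lazy config/version lookup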
35 changes: 30 additions & 5 deletions airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -19,6 +19,9 @@

import os
import pathlib
import shutil

from packaging.version import Version

from airflow.compat.functools import cached_property
from airflow.configuration import conf
@@ -27,6 +30,17 @@
from airflow.utils.log.logging_mixin import LoggingMixin


def get_default_delete_local_copy():
"""Load delete_local_logs conf if Airflow version > 2.6 and return False if not
TODO: delete this function when min airflow version >= 2.6
"""
from airflow.version import version

if Version(version) < Version("2.6"):
return False
return conf.getboolean("logging", "delete_local_logs")


class S3TaskHandler(FileTaskHandler, LoggingMixin):
"""
S3TaskHandler is a python log handler that handles and reads
@@ -36,13 +50,18 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):

trigger_should_wrap = True

def __init__(self, base_log_folder: str, s3_log_folder: str, filename_template: str | None = None):
def __init__(
self, base_log_folder: str, s3_log_folder: str, filename_template: str | None = None, **kwargs
):
super().__init__(base_log_folder, filename_template)
self.remote_base = s3_log_folder
self.log_relative_path = ""
self._hook = None
self.closed = False
self.upload_on_close = True
self.delete_local_copy = (
kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else get_default_delete_local_copy()
)

@cached_property
def hook(self):
@@ -84,7 +103,9 @@ def close(self):
if os.path.exists(local_loc):
# read log and remove old logs to get just the latest additions
log = pathlib.Path(local_loc).read_text()
self.s3_write(log, remote_loc)
write_to_s3 = self.s3_write(log, remote_loc)
if write_to_s3 and self.delete_local_copy:
shutil.rmtree(os.path.dirname(local_loc))

# Mark closed so we don't double write if close is called twice
self.closed = True
@@ -164,23 +185,25 @@ def s3_read(self, remote_log_location: str, return_error: bool = False) -> str:
return msg
return ""

def s3_write(self, log: str, remote_log_location: str, append: bool = True, max_retry: int = 1):
def s3_write(self, log: str, remote_log_location: str, append: bool = True, max_retry: int = 1) -> bool:
"""
Writes the log to the remote_log_location. Fails silently if no hook
was created.
Writes the log to the remote_log_location and returns ``True`` on success. Fails silently
and returns ``False`` if the log could not be written.
:param log: the log to write to the remote_log_location
:param remote_log_location: the log's location in remote storage
:param append: if False, any existing log file is overwritten. If True,
the new log is appended to any existing logs.
:param max_retry: Maximum number of times to retry on upload failure
:return: whether the log was successfully written to the remote location.
"""
try:
if append and self.s3_log_exists(remote_log_location):
old_log = self.s3_read(remote_log_location)
log = "\n".join([old_log, log]) if old_log else log
except Exception:
self.log.exception("Could not verify previous log to append")
return False

# Default to a single retry attempt because s3 upload failures are
# rare but occasionally occur. Multiple retry attempts are unlikely
@@ -199,3 +222,5 @@ def s3_write(self, log: str, remote_log_location: str, append: bool = True, max_retry: int = 1) -> bool:
self.log.warning("Failed attempt to write logs to %s, will retry", remote_log_location)
else:
self.log.exception("Could not write logs to %s", remote_log_location)
return False
return True
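
All three close() implementations now share the same shape: upload the local log, then remove its directory only when the upload reported success. A self-contained sketch of that flow, with upload_fn standing in for oss_write/s3_write/gcs_write:

# Sketch of the shared close() logic: upload first, delete only on success.
import os
import pathlib
import shutil

def flush_and_maybe_delete(local_loc, remote_loc, upload_fn, delete_local_copy):
    if os.path.exists(local_loc):
        log = pathlib.Path(local_loc).read_text()
        if upload_fn(log, remote_loc) and delete_local_copy:
            # the handlers remove the whole per-attempt log directory
            shutil.rmtree(os.path.dirname(local_loc))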
32 changes: 28 additions & 4 deletions airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -19,11 +19,13 @@

import logging
import os
import shutil
from pathlib import Path
from typing import Collection

# not sure why but mypy complains on missing `storage` but it is clearly there and is importable
from google.cloud import storage # type: ignore[attr-defined]
from packaging.version import Version

from airflow.compat.functools import cached_property
from airflow.configuration import conf
@@ -43,6 +45,17 @@
logger = logging.getLogger(__name__)


def get_default_delete_local_copy():
"""Load delete_local_logs conf if Airflow version > 2.6 and return False if not
TODO: delete this function when min airflow version >= 2.6
"""
from airflow.version import version

if Version(version) < Version("2.6"):
return False
return conf.getboolean("logging", "delete_local_logs")


class GCSTaskHandler(FileTaskHandler, LoggingMixin):
"""
GCSTaskHandler is a python log handler that handles and reads
@@ -63,6 +76,8 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
:param gcp_scopes: Comma-separated string containing OAuth2 scopes
:param project_id: Project ID to read the secrets from. If not passed, the project ID from credentials
will be used.
:param delete_local_copy: Whether local log files should be deleted after they are uploaded when
using remote logging
"""

trigger_should_wrap = True
@@ -77,6 +92,7 @@ def __init__(
gcp_keyfile_dict: dict | None = None,
gcp_scopes: Collection[str] | None = _DEFAULT_SCOPESS,
project_id: str | None = None,
**kwargs,
):
super().__init__(base_log_folder, filename_template)
self.remote_base = gcs_log_folder
@@ -87,6 +103,9 @@
self.gcp_keyfile_dict = gcp_keyfile_dict
self.scopes = gcp_scopes
self.project_id = project_id
self.delete_local_copy = (
kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else get_default_delete_local_copy()
)

@cached_property
def hook(self) -> GCSHook | None:
@@ -147,7 +166,9 @@ def close(self):
# read log and remove old logs to get just the latest additions
with open(local_loc) as logfile:
log = logfile.read()
self.gcs_write(log, remote_loc)
gcs_write = self.gcs_write(log, remote_loc)
if gcs_write and self.delete_local_copy:
shutil.rmtree(os.path.dirname(local_loc))

# Mark closed so we don't double write if close is called twice
self.closed = True
@@ -207,13 +228,14 @@ def _read(self, ti, try_number, metadata=None):

return "".join([f"*** {x}\n" for x in messages]) + "\n".join(logs), {"end_of_log": True}

def gcs_write(self, log, remote_log_location):
def gcs_write(self, log, remote_log_location) -> bool:
"""
Writes the log to the remote_log_location. Fails silently if no log
was created.
Writes the log to the remote_log_location and returns ``True`` on success. Fails silently
and returns ``False`` if the log could not be written.
:param log: the log to write to the remote_log_location
:param remote_log_location: the log's location in remote storage
:return: whether the log was successfully written to the remote location.
"""
try:
blob = storage.Blob.from_string(remote_log_location, self.client)
@@ -232,6 +254,8 @@ def gcs_write(self, log, remote_log_location):
blob.upload_from_string(log, content_type="text/plain")
except Exception as e:
self.log.error("Could not write logs to %s: %s", remote_log_location, e)
return False
return True

@staticmethod
def no_log_found(exc):
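
Finally, the version gate repeated in each provider exists because [logging] delete_local_logs only appears in Airflow 2.6; on an older core the key does not exist, so the helpers short-circuit to False instead of reading it. A standalone sketch of the comparison, assuming packaging is available (it is a core Airflow dependency):

# Sketch: delete_local_logs was added in 2.6, so older versions default to False.
from packaging.version import Version

def option_exists(airflow_version: str) -> bool:
    return Version(airflow_version) >= Version("2.6")

assert option_exists("2.6.0") is True
assert option_exists("2.5.1") is False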
