Skip to content

Commit

Permalink
[AIRFLOW-4541] Replace os.mkdirs usage with pathlib.Path(path).mkdir (#…
Browse files Browse the repository at this point in the history
…10117)

`makedirs` is used in `airlfow.utils.file.mkdirs`  - it is replaced with pathlib now with python3.5+
  • Loading branch information
kaxil committed Aug 2, 2020
1 parent 1d68cd2 commit 4e3799f
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 45 deletions.
4 changes: 2 additions & 2 deletions airflow/config_templates/airflow_local_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
"""Airflow logging settings"""

import os
from pathlib import Path
from typing import Any, Dict, Union
from urllib.parse import urlparse

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.utils.file import mkdirs

# TODO: Logging format and level should be configured
# in this file instead of from airflow.cfg. Currently
Expand Down Expand Up @@ -151,7 +151,7 @@
processor_manager_handler_config: Dict[str, Any] = \
DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers']['processor_manager']
directory: str = os.path.dirname(processor_manager_handler_config['filename'])
mkdirs(directory, 0o755)
Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755)

##################
# Remote logging #
Expand Down
10 changes: 3 additions & 7 deletions airflow/providers/google/cloud/hooks/cloud_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import subprocess
import time
import uuid
from pathlib import Path
from subprocess import PIPE, Popen
from typing import Any, Dict, List, Optional, Sequence, Union
from urllib.parse import quote_plus
Expand Down Expand Up @@ -553,13 +554,8 @@ def start_proxy(self) -> None:
else:
command_to_run = [self.sql_proxy_path]
command_to_run.extend(self.command_line_parameters)
try:
self.log.info("Creating directory %s",
self.cloud_sql_proxy_socket_directory)
os.makedirs(self.cloud_sql_proxy_socket_directory)
except OSError:
# Needed for python 2 compatibility (exists_ok missing)
pass
self.log.info("Creating directory %s", self.cloud_sql_proxy_socket_directory)
Path(self.cloud_sql_proxy_socket_directory).mkdir(parents=True, exist_ok=True)
command_to_run.extend(self._get_credential_parameters()) # pylint: disable=no-value-for-parameter
self.log.info("Running the command: `%s`", " ".join(command_to_run))
self.sql_proxy_process = Popen(command_to_run,
Expand Down
8 changes: 2 additions & 6 deletions airflow/providers/sftp/operators/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
This module contains SFTP operator.
"""
import os
from pathlib import Path

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
Expand Down Expand Up @@ -130,12 +131,7 @@ def execute(self, context):
if self.operation.lower() == SFTPOperation.GET:
local_folder = os.path.dirname(self.local_filepath)
if self.create_intermediate_dirs:
# Create Intermediate Directories if it doesn't exist
try:
os.makedirs(local_folder)
except OSError:
if not os.path.isdir(local_folder):
raise
Path(local_folder).mkdir(parents=True, exist_ok=True)
file_msg = "from {0} to {1}".format(self.remote_filepath,
self.local_filepath)
self.log.info("Starting to transfer %s", file_msg)
Expand Down
15 changes: 7 additions & 8 deletions airflow/utils/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import os
import re
import zipfile
from pathlib import Path
from typing import Dict, Generator, List, Optional, Pattern

from airflow.configuration import conf
Expand Down Expand Up @@ -50,14 +51,12 @@ def mkdirs(path, mode):
:param mode: The mode to give to the directory e.g. 0o755, ignores umask
:type mode: int
"""
try:
o_umask = os.umask(0)
os.makedirs(path, mode)
except OSError:
if not os.path.isdir(path):
raise
finally:
os.umask(o_umask)
import warnings
warnings.warn(
f"This function is deprecated. Please use `pathlib.Path({path}).mkdir`",
DeprecationWarning, stacklevel=2
)
Path(path).mkdir(mode=mode, parents=True, exist_ok=True)


ZIP_REGEX = re.compile(r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)))
Expand Down
18 changes: 3 additions & 15 deletions airflow/utils/log/file_processor_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import logging
import os
from datetime import datetime
from pathlib import Path

from airflow import settings
from airflow.utils.helpers import parse_template_string
Expand All @@ -43,15 +44,7 @@ def __init__(self, base_log_folder, filename_template):
parse_template_string(filename_template)

self._cur_date = datetime.today()
if not os.path.exists(self._get_log_directory()):
try:
os.makedirs(self._get_log_directory())
except OSError:
# only ignore case where the directory already exist
if not os.path.isdir(self._get_log_directory()):
raise

logging.warning("%s already exists", self._get_log_directory())
Path(self._get_log_directory()).mkdir(parents=True, exist_ok=True)

self._symlink_latest_log_directory()

Expand Down Expand Up @@ -137,12 +130,7 @@ def _init_file(self, filename):
log_file_path = os.path.abspath(relative_log_file_path)
directory = os.path.dirname(log_file_path)

if not os.path.exists(directory):
try:
os.makedirs(directory)
except OSError:
if not os.path.isdir(directory):
raise
Path(directory).mkdir(parents=True, exist_ok=True)

if not os.path.exists(log_file_path):
open(log_file_path, "a").close()
Expand Down
7 changes: 2 additions & 5 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
"""File logging handler for tasks."""
import logging
import os
from pathlib import Path
from typing import Optional

import requests

from airflow.configuration import AirflowConfigException, conf
from airflow.models import TaskInstance
from airflow.utils.file import mkdirs
from airflow.utils.helpers import parse_template_string


Expand Down Expand Up @@ -223,10 +223,7 @@ def _init_file(self, ti):
# operator is not compatible with impersonation (e.g. if a Celery executor is used
# for a SubDag operator and the SubDag operator has a different owner than the
# parent DAG)
if not os.path.exists(directory):
# Create the directory as globally writable using custom mkdirs
# as os.makedirs doesn't set mode properly.
mkdirs(directory, 0o777)
Path(directory).mkdir(mode=0o777, parents=True, exist_ok=True)

if not os.path.exists(full_path):
open(full_path, "a").close()
Expand Down
4 changes: 2 additions & 2 deletions tests/test_utils/system_tests_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import shutil
import sys
from datetime import datetime
from pathlib import Path
from unittest import TestCase

from airflow.configuration import AIRFLOW_HOME, AirflowConfigParser, get_airflow_config
from airflow.exceptions import AirflowException
from airflow.models.dagbag import DagBag
from airflow.utils.file import mkdirs
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
from tests.test_utils import AIRFLOW_MAIN_FOLDER
Expand Down Expand Up @@ -94,7 +94,7 @@ def tearDown(self) -> None:
print(f"Saving all log files to {logs_folder}/previous_runs/{date_str}")
print()
target_dir = os.path.join(logs_folder, "previous_runs", date_str)
mkdirs(target_dir, 0o755)
Path(target_dir).mkdir(parents=True, exist_ok=True, mode=0o755)
files = os.listdir(logs_folder)
for file in files:
if file != "previous_runs":
Expand Down

0 comments on commit 4e3799f

Please sign in to comment.