Skip to content

Commit

Permalink
Format all files (without excepions) by black (#12091)
Browse files Browse the repository at this point in the history
  • Loading branch information
mik-laj committed Nov 4, 2020
1 parent fd3db77 commit 91a64db
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 155 deletions.
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ repos:
rev: 20.8b1
hooks:
- id: black
exclude: .*kubernetes_pod\.py|.*google/common/hooks/base_google\.py$|^airflow/configuration.py$
args: [--config=./pyproject.toml]
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.3.0
Expand Down
156 changes: 73 additions & 83 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@

# show Airflow's deprecation warnings
if not sys.warnoptions:
warnings.filterwarnings(
action='default', category=DeprecationWarning, module='airflow')
warnings.filterwarnings(
action='default', category=PendingDeprecationWarning, module='airflow')
warnings.filterwarnings(action='default', category=DeprecationWarning, module='airflow')
warnings.filterwarnings(action='default', category=PendingDeprecationWarning, module='airflow')


def expand_env_var(env_var):
Expand All @@ -70,17 +68,14 @@ def expand_env_var(env_var):
def run_command(command):
"""Runs command and returns stdout"""
process = subprocess.Popen(
shlex.split(command),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=True)
output, stderr = [stream.decode(sys.getdefaultencoding(), 'ignore')
for stream in process.communicate()]
shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
)
output, stderr = [stream.decode(sys.getdefaultencoding(), 'ignore') for stream in process.communicate()]

if process.returncode != 0:
raise AirflowConfigException(
"Cannot execute {}. Error code is: {}. Output: {}, Stderr: {}"
.format(command, process.returncode, output, stderr)
f"Cannot execute {command}. Error code is: {process.returncode}. "
f"Output: {output}, Stderr: {stderr}"
)

return output
Expand Down Expand Up @@ -183,9 +178,9 @@ class AirflowConfigParser(ConfigParser): # pylint: disable=too-many-ancestors
'email_backend': (
re.compile(r'^airflow\.contrib\.utils\.sendgrid\.send_email$'),
r'airflow.providers.sendgrid.utils.emailer.send_email',
'2.0'
'2.0',
),
}
},
}

# This method transforms option names on every read, get, or set operation.
Expand Down Expand Up @@ -213,14 +208,14 @@ def _validate(self):
current_value = self.get(section, name, fallback=None)
if self._using_old_value(old, current_value):
new_value = re.sub(old, new, current_value)
self._update_env_var(
section=section, name=name, new_value=new_value)
self._update_env_var(section=section, name=name, new_value=new_value)
self._create_future_warning(
name=name,
section=section,
current_value=current_value,
new_value=new_value,
version=version)
version=version,
)

self.is_validated = True

Expand All @@ -229,21 +224,27 @@ def _validate_config_dependencies(self):
Validate that config values aren't invalid given other config values
or system-level limitations and requirements.
"""
if (
self.get("core", "executor") not in ('DebugExecutor', 'SequentialExecutor') and
"sqlite" in self.get('core', 'sql_alchemy_conn')):
is_executor_without_sqlite_support = self.get("core", "executor") not in (
'DebugExecutor',
'SequentialExecutor',
)
is_sqlite = "sqlite" in self.get('core', 'sql_alchemy_conn')
if is_executor_without_sqlite_support and is_sqlite:
raise AirflowConfigException(
"error: cannot use sqlite with the {}".format(
self.get('core', 'executor')))
"error: cannot use sqlite with the {}".format(self.get('core', 'executor'))
)

if self.has_option('core', 'mp_start_method'):
mp_start_method = self.get('core', 'mp_start_method')
start_method_options = multiprocessing.get_all_start_methods()

if mp_start_method not in start_method_options:
raise AirflowConfigException(
"mp_start_method should not be " + mp_start_method +
". Possible values are " + ", ".join(start_method_options))
"mp_start_method should not be "
+ mp_start_method
+ ". Possible values are "
+ ", ".join(start_method_options)
)

def _using_old_value(self, old, current_value): # noqa
return old.search(current_value) is not None
Expand All @@ -264,7 +265,7 @@ def _create_future_warning(name, section, current_value, new_value, version):
'Airflow {version}.'.format(
name=name, section=section, current_value=current_value, new_value=new_value, version=version
),
FutureWarning
FutureWarning,
)

@staticmethod
Expand Down Expand Up @@ -336,17 +337,12 @@ def get(self, section, key, **kwargs):
def _get_option_from_default_config(self, section, key, **kwargs):
# ...then the default config
if self.airflow_defaults.has_option(section, key) or 'fallback' in kwargs:
return expand_env_var(
self.airflow_defaults.get(section, key, **kwargs))
return expand_env_var(self.airflow_defaults.get(section, key, **kwargs))

else:
log.warning(
"section/key [%s/%s] not found in config", section, key
)
log.warning("section/key [%s/%s] not found in config", section, key)

raise AirflowConfigException(
"section/key [{section}/{key}] not found "
"in config".format(section=section, key=key))
raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")

def _get_option_from_secrets(self, deprecated_key, deprecated_section, key, section):
# ...then from secret backends
Expand Down Expand Up @@ -377,16 +373,11 @@ def _get_option_from_config_file(self, deprecated_key, deprecated_section, key,
if super().has_option(section, key):
# Use the parent's methods to get the actual config here to be able to
# separate the config from default config.
return expand_env_var(
super().get(section, key, **kwargs))
return expand_env_var(super().get(section, key, **kwargs))
if deprecated_section:
if super().has_option(deprecated_section, deprecated_key):
self._warn_deprecate(section, key, deprecated_section, deprecated_key)
return expand_env_var(super().get(
deprecated_section,
deprecated_key,
**kwargs
))
return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
return None

def _get_environment_variables(self, deprecated_key, deprecated_section, key, section):
Expand Down Expand Up @@ -543,8 +534,13 @@ def write(self, fp, space_around_delimiters=True):
self._write_section(fp, section, self.getsection(section).items(), delimiter)

def as_dict(
self, display_source=False, display_sensitive=False, raw=False,
include_env=True, include_cmds=True, include_secret=True
self,
display_source=False,
display_sensitive=False,
raw=False,
include_env=True,
include_cmds=True,
include_secret=True,
) -> Dict[str, Dict[str, str]]:
"""
Returns the current configuration as an OrderedDict of OrderedDicts.
Expand Down Expand Up @@ -624,8 +620,9 @@ def _include_commands(self, config_sources, display_sensitive, display_source, r
del config_sources[section][key + '_cmd']

def _include_envs(self, config_sources, display_sensitive, display_source, raw):
for env_var in [os_environment
for os_environment in os.environ if os_environment.startswith('AIRFLOW__')]:
for env_var in [
os_environment for os_environment in os.environ if os_environment.startswith('AIRFLOW__')
]:
try:
_, section, key = env_var.split('__', 2)
opt = self._get_env_var_option(section, key)
Expand All @@ -651,13 +648,14 @@ def _include_envs(self, config_sources, display_sensitive, display_source, raw):
def _replace_config_with_display_sources(config_sources, configs, display_source, raw):
for (source_name, config) in configs:
for section in config.sections():
AirflowConfigParser.\
_replace_section_config_with_display_sources(
config, config_sources, display_source, raw, section, source_name)
AirflowConfigParser._replace_section_config_with_display_sources(
config, config_sources, display_source, raw, section, source_name
)

@staticmethod
def _replace_section_config_with_display_sources(config, config_sources, display_source, raw, section,
source_name):
def _replace_section_config_with_display_sources(
config, config_sources, display_source, raw, section, source_name
):
sect = config_sources.setdefault(section, OrderedDict())
for (k, val) in config.items(section=section, raw=raw):
if display_source:
Expand Down Expand Up @@ -730,19 +728,17 @@ def get_airflow_config(airflow_home):
# Set up dags folder for unit tests
# this directory won't exist if users install via pip
_TEST_DAGS_FOLDER = os.path.join(
os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
'tests',
'dags')
os.path.dirname(os.path.dirname(os.path.realpath(__file__))), 'tests', 'dags'
)
if os.path.exists(_TEST_DAGS_FOLDER):
TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
else:
TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, 'dags')

# Set up plugins folder for unit tests
_TEST_PLUGINS_FOLDER = os.path.join(
os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
'tests',
'plugins')
os.path.dirname(os.path.dirname(os.path.realpath(__file__))), 'tests', 'plugins'
)
if os.path.exists(_TEST_PLUGINS_FOLDER):
TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
else:
Expand Down Expand Up @@ -777,20 +773,14 @@ def get_airflow_test_config(airflow_home):

SECRET_KEY = b64encode(os.urandom(16)).decode('utf-8')

TEMPLATE_START = (
'# ----------------------- TEMPLATE BEGINS HERE -----------------------')
TEMPLATE_START = '# ----------------------- TEMPLATE BEGINS HERE -----------------------'
if not os.path.isfile(TEST_CONFIG_FILE):
log.info(
'Creating new Airflow config file for unit tests in: %s', TEST_CONFIG_FILE
)
log.info('Creating new Airflow config file for unit tests in: %s', TEST_CONFIG_FILE)
with open(TEST_CONFIG_FILE, 'w') as file:
cfg = parameterized_config(TEST_CONFIG)
file.write(cfg.split(TEMPLATE_START)[-1].strip())
if not os.path.isfile(AIRFLOW_CONFIG):
log.info(
'Creating new Airflow config file in: %s',
AIRFLOW_CONFIG
)
log.info('Creating new Airflow config file in: %s', AIRFLOW_CONFIG)
with open(AIRFLOW_CONFIG, 'w') as file:
cfg = parameterized_config(DEFAULT_CONFIG)
cfg = cfg.split(TEMPLATE_START)[-1].strip()
Expand Down Expand Up @@ -835,110 +825,110 @@ def get_airflow_test_config(airflow_home):


# Historical convenience functions to access config entries
def load_test_config(): # noqa: D103
def load_test_config(): # noqa: D103
"""Historical load_test_config"""
warnings.warn(
"Accessing configuration method 'load_test_config' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.load_test_config'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
conf.load_test_config()


def get(*args, **kwargs): # noqa: D103
def get(*args, **kwargs): # noqa: D103
"""Historical get"""
warnings.warn(
"Accessing configuration method 'get' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.get'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.get(*args, **kwargs)


def getboolean(*args, **kwargs): # noqa: D103
def getboolean(*args, **kwargs): # noqa: D103
"""Historical getboolean"""
warnings.warn(
"Accessing configuration method 'getboolean' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.getboolean'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.getboolean(*args, **kwargs)


def getfloat(*args, **kwargs): # noqa: D103
def getfloat(*args, **kwargs): # noqa: D103
"""Historical getfloat"""
warnings.warn(
"Accessing configuration method 'getfloat' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.getfloat'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.getfloat(*args, **kwargs)


def getint(*args, **kwargs): # noqa: D103
def getint(*args, **kwargs): # noqa: D103
"""Historical getint"""
warnings.warn(
"Accessing configuration method 'getint' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.getint'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.getint(*args, **kwargs)


def getsection(*args, **kwargs): # noqa: D103
def getsection(*args, **kwargs): # noqa: D103
"""Historical getsection"""
warnings.warn(
"Accessing configuration method 'getsection' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.getsection'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.getint(*args, **kwargs)


def has_option(*args, **kwargs): # noqa: D103
def has_option(*args, **kwargs): # noqa: D103
"""Historical has_option"""
warnings.warn(
"Accessing configuration method 'has_option' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.has_option'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.has_option(*args, **kwargs)


def remove_option(*args, **kwargs): # noqa: D103
def remove_option(*args, **kwargs): # noqa: D103
"""Historical remove_option"""
warnings.warn(
"Accessing configuration method 'remove_option' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.remove_option'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.remove_option(*args, **kwargs)


def as_dict(*args, **kwargs): # noqa: D103
def as_dict(*args, **kwargs): # noqa: D103
"""Historical as_dict"""
warnings.warn(
"Accessing configuration method 'as_dict' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.as_dict'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.as_dict(*args, **kwargs)

Expand All @@ -950,7 +940,7 @@ def set(*args, **kwargs): # noqa pylint: disable=redefined-builtin
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.set'",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return conf.set(*args, **kwargs)

Expand Down

0 comments on commit 91a64db

Please sign in to comment.