Skip to content

Commit

Permalink
More f-strings (#18855)
Browse files Browse the repository at this point in the history
  • Loading branch information
mik-laj committed Oct 17, 2021
1 parent a418fd9 commit 86a2a19
Show file tree
Hide file tree
Showing 130 changed files with 544 additions and 813 deletions.
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@ repos:
exclude: |
(?x)
^airflow/_vendor/
args:
# If flynt detects too long text it ignores it. So we set a very large limit to make it easy
# to split the text by hand. Too long lines are detected by flake8 (below),
# so the user is informed to take action.
- --line-length
- '99999'
- repo: https://github.com/codespell-project/codespell
rev: v2.1.0
hooks:
Expand Down
5 changes: 2 additions & 3 deletions airflow/api/common/experimental/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ def _trigger_dag(
min_dag_start_date = dag.default_args["start_date"]
if min_dag_start_date and execution_date < min_dag_start_date:
raise ValueError(
"The execution_date [{}] should be >= start_date [{}] from DAG's default_args".format(
execution_date.isoformat(), min_dag_start_date.isoformat()
)
f"The execution_date [{execution_date.isoformat()}] should be >= start_date "
f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
)

run_id = run_id or DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
Expand Down
13 changes: 7 additions & 6 deletions airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,14 @@ def task_run(args, dag=None):
unsupported_options = [o for o in RAW_TASK_UNSUPPORTED_OPTION if getattr(args, o)]

if unsupported_options:
unsupported_raw_task_flags = ', '.join(f'--{o}' for o in RAW_TASK_UNSUPPORTED_OPTION)
unsupported_flags = ', '.join(f'--{o}' for o in unsupported_options)
raise AirflowException(
"Option --raw does not work with some of the other options on this command. You "
"can't use --raw option and the following options: {}. You provided the option {}. "
"Delete it to execute the command".format(
", ".join(f"--{o}" for o in RAW_TASK_UNSUPPORTED_OPTION),
", ".join(f"--{o}" for o in unsupported_options),
)
"Option --raw does not work with some of the other options on this command. "
"You can't use --raw option and the following options: "
f"{unsupported_raw_task_flags}. "
f"You provided the option {unsupported_flags}. "
"Delete it to execute the command."
)
if dag and args.pickle:
raise AirflowException("You cannot use the --pickle option when using DAG.cli() method.")
Expand Down
5 changes: 2 additions & 3 deletions airflow/cli/commands/user_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,8 @@ def _import_users(users_list):
print(f"Found existing user with email '{user['email']}'")
if existing_user.username != user['username']:
raise SystemExit(
"Error: Changing the username is not allowed - "
"please delete and recreate the user with "
"email '{}'".format(user['email'])
f"Error: Changing the username is not allowed - please delete and recreate the user with"
f" email {user['email']!r}"
)

existing_user.roles = roles
Expand Down
18 changes: 4 additions & 14 deletions airflow/cli/commands/webserver_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,24 +369,14 @@ def webserver(args):

print(
textwrap.dedent(
'''\
f'''\
Running the Gunicorn Server with:
Workers: {num_workers} {workerclass}
Host: {hostname}:{port}
Workers: {num_workers} {args.workerclass}
Host: {args.hostname}:{args.port}
Timeout: {worker_timeout}
Logfiles: {access_logfile} {error_logfile}
Access Logformat: {access_logformat}
=================================================================\
'''.format(
num_workers=num_workers,
workerclass=args.workerclass,
hostname=args.hostname,
port=args.port,
worker_timeout=worker_timeout,
access_logfile=access_logfile,
error_logfile=error_logfile,
access_logformat=access_logformat,
)
================================================================='''
)
)

Expand Down
5 changes: 2 additions & 3 deletions airflow/config_templates/default_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ def _broker_supports_visibility_timeout(url):
)
except Exception as e:
raise AirflowException(
'Exception: There was an unknown Celery SSL Error. '
'Please ensure you want to use '
'SSL and/or have all necessary certs and key ({}).'.format(e)
f'Exception: There was an unknown Celery SSL Error. Please ensure you want to use SSL and/or have '
f'all necessary certs and key ({e}).'
)

result_backend = DEFAULT_CELERY_CONFIG['result_backend']
Expand Down
26 changes: 7 additions & 19 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,9 @@ def _update_env_var(self, section, name, new_value):
@staticmethod
def _create_future_warning(name, section, current_value, new_value, version):
warnings.warn(
'The {name} setting in [{section}] has the old default value '
'of {current_value!r}. This value has been changed to {new_value!r} in the '
'running config, but please update your config before Apache '
'Airflow {version}.'.format(
name=name, section=section, current_value=current_value, new_value=new_value, version=version
),
f'The {name!r} setting in [{section}] has the old default value of {current_value!r}. '
f'This value has been changed to {new_value!r} in the running config, but '
f'please update your config before Apache Airflow {version}.',
FutureWarning,
)

Expand Down Expand Up @@ -724,24 +721,15 @@ def load_test_config(self):
def _warn_deprecate(section, key, deprecated_section, deprecated_name):
if section == deprecated_section:
warnings.warn(
'The {old} option in [{section}] has been renamed to {new} - the old '
'setting has been used, but please update your config.'.format(
old=deprecated_name,
new=key,
section=section,
),
f'The {deprecated_name} option in [{section}] has been renamed to {key} - '
f'the old setting has been used, but please update your config.',
DeprecationWarning,
stacklevel=3,
)
else:
warnings.warn(
'The {old_key} option in [{old_section}] has been moved to the {new_key} option in '
'[{new_section}] - the old setting has been used, but please update your config.'.format(
old_section=deprecated_section,
old_key=deprecated_name,
new_key=key,
new_section=section,
),
f'The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option '
f'in [{section}] - the old setting has been used, but please update your config.',
DeprecationWarning,
stacklevel=3,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ def my_py_command(params, test_mode=None, task=None):
"""
if test_mode:
print(
" 'foo' was passed in via test={} command : kwargs[params][foo] \
= {}".format(
test_mode, task.params["foo"]
)
f" 'foo' was passed in via test={test_mode} command : kwargs[params][foo] = {task.params['foo']}"
)
# Print out the value of "miff", passed in below via the Python Operator
print(f" 'miff' was passed in via task params = {params['miff']}")
Expand Down Expand Up @@ -72,7 +69,7 @@ def print_env_vars(test_mode=None):
"""
echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} "
echo " 'miff was passed in via BashOperator with value {{ params.miff }} "
"""
"""
)

also_run_this = BashOperator(
Expand Down
19 changes: 10 additions & 9 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ def process_error(self, event: Any) -> str:
# Return resource version 0
return '0'
raise AirflowException(
'Kubernetes failure for %s with code %s and message: %s'
% (raw_object['reason'], raw_object['code'], raw_object['message'])
f"Kubernetes failure for {raw_object['reason']} with code {raw_object['code']} and message: "
f"{raw_object['message']}"
)

def process_status(
Expand Down Expand Up @@ -468,12 +468,12 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
continue

# Build the pod selector
dict_string = "dag_id={},task_id={},airflow-worker={}".format(
pod_generator.make_safe_label_value(task.dag_id),
pod_generator.make_safe_label_value(task.task_id),
pod_generator.make_safe_label_value(str(self.scheduler_job_id)),
base_label_selector = (
f"dag_id={pod_generator.make_safe_label_value(task.dag_id)},"
f"task_id={pod_generator.make_safe_label_value(task.task_id)},"
f"airflow-worker={pod_generator.make_safe_label_value(str(self.scheduler_job_id))}"
)
kwargs = dict(label_selector=dict_string)
kwargs = dict(label_selector=base_label_selector)
if self.kube_config.kube_client_request_args:
kwargs.update(**self.kube_config.kube_client_request_args)

Expand All @@ -483,8 +483,9 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
if pod_list.items:
continue
# Fallback to old style of using execution_date
kwargs['label_selector'] = dict_string + ',execution_date={}'.format(
pod_generator.datetime_to_label_safe_datestring(task.execution_date)
kwargs['label_selector'] = (
f'{base_label_selector},'
f'execution_date={pod_generator.datetime_to_label_safe_datestring(task.execution_date)}'
)
pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs)
if pod_list.items:
Expand Down
13 changes: 5 additions & 8 deletions airflow/jobs/backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,8 @@ def _manage_executor_state(self, running):

if state in (State.FAILED, State.SUCCESS) and ti.state in self.STATES_COUNT_AS_RUNNING:
msg = (
"Executor reports task instance {} finished ({}) "
"although the task says its {}. Was the task "
"killed externally? Info: {}".format(ti, state, ti.state, info)
f"Executor reports task instance {ti} finished ({state}) although the task says its "
f"{ti.state}. Was the task killed externally? Info: {info}"
)
self.log.error(msg)
ti.handle_failure_with_callback(error=msg)
Expand Down Expand Up @@ -579,8 +578,7 @@ def _per_task_process(key, ti: TaskInstance, session=None):
open_slots = pool.open_slots(session=session)
if open_slots <= 0:
raise NoAvailablePoolSlot(
"Not scheduling since there are "
"{} open slots in pool {}".format(open_slots, task.pool)
f"Not scheduling since there are {open_slots} open slots in pool {task.pool}"
)

num_running_task_instances_in_dag = DAG.get_num_task_instances(
Expand Down Expand Up @@ -778,9 +776,8 @@ def _execute(self, session=None):
tasks_that_depend_on_past = [t.task_id for t in self.dag.task_dict.values() if t.depends_on_past]
if tasks_that_depend_on_past:
raise AirflowException(
'You cannot backfill backwards because one or more tasks depend_on_past: {}'.format(
",".join(tasks_that_depend_on_past)
)
f'You cannot backfill backwards because one or more '
f'tasks depend_on_past: {",".join(tasks_that_depend_on_past)}'
)
dagrun_infos = dagrun_infos[::-1]

Expand Down
4 changes: 2 additions & 2 deletions airflow/jobs/local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ def signal_handler(signum, frame):
Stats.incr('local_task_job_prolonged_heartbeat_failure', 1, 1)
self.log.error("Heartbeat time limit exceeded!")
raise AirflowException(
"Time since last heartbeat({:.2f}s) "
"exceeded limit ({}s).".format(time_since_last_heartbeat, heartbeat_time_limit)
f"Time since last heartbeat({time_since_last_heartbeat:.2f}s) exceeded limit "
f"({heartbeat_time_limit}s)."
)
finally:
self.on_kill()
Expand Down
14 changes: 5 additions & 9 deletions airflow/logging_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,16 @@ def _get_handler(name):
# Check for pre 1.10 setting that might be in deployed airflow.cfg files
if task_log_reader == "file.task" and _get_handler("task"):
warnings.warn(
"task_log_reader setting in [logging] has a deprecated value of "
"{!r}, but no handler with this name was found. Please update "
"your config to use {!r}. Running config has been adjusted to "
"match".format(
task_log_reader,
"task",
),
f"task_log_reader setting in [logging] has a deprecated value of {task_log_reader!r}, "
"but no handler with this name was found. Please update your config to use task. "
"Running config has been adjusted to match",
DeprecationWarning,
)
conf.set('logging', 'task_log_reader', 'task')
else:
raise AirflowConfigException(
"Configured task_log_reader {!r} was not a handler of the 'airflow.task' "
"logger.".format(task_log_reader)
f"Configured task_log_reader {task_log_reader!r} was not a handler of "
f"the 'airflow.task' logger."
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,14 +253,12 @@ def get_table_constraints(conn, table_name):
:return: a dictionary of ((constraint name, constraint type), column name) of table
:rtype: defaultdict(list)
"""
query = """SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
query = f"""SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
WHERE tc.TABLE_NAME = '{table_name}' AND
(tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE')
""".format(
table_name=table_name
)
"""
result = conn.execute(query).fetchall()
constraint_dict = defaultdict(list)
for constraint, constraint_type, column in result:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,12 @@ def get_table_constraints(conn, table_name):
:return: a dictionary of ((constraint name, constraint type), column name) of table
:rtype: defaultdict(list)
"""
query = """SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
query = f"""SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
WHERE tc.TABLE_NAME = '{table_name}' AND
(tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE')
""".format(
table_name=table_name
)
"""
result = conn.execute(query).fetchall()
constraint_dict = defaultdict(lambda: defaultdict(list))
for constraint, constraint_type, col_name in result:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,12 @@ def get_table_constraints(conn, table_name):
:return: a dictionary of ((constraint name, constraint type), column name) of table
:rtype: defaultdict(list)
"""
query = """SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
query = f"""SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
WHERE tc.TABLE_NAME = '{table_name}' AND
(tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE')
""".format(
table_name=table_name
)
"""
result = conn.execute(query).fetchall()
constraint_dict = defaultdict(list)
for constraint, constraint_type, column in result:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,12 @@ def upgrade():
# DagParser it will get set to correct value.

op.execute(
"UPDATE dag SET concurrency={}, has_task_concurrency_limits={} where concurrency IS NULL".format(
concurrency, 1 if is_sqlite or is_mssql else sa.true()
)
f"""
UPDATE dag SET
concurrency={concurrency},
has_task_concurrency_limits={1 if is_sqlite or is_mssql else sa.true()}
where concurrency IS NULL
"""
)

with op.batch_alter_table('dag', schema=None) as batch_op:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,12 @@ def get_table_constraints(conn, table_name):
:return: a dictionary of ((constraint name, constraint type), column name) of table
:rtype: defaultdict(list)
"""
query = """SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
query = f"""SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
WHERE tc.TABLE_NAME = '{table_name}' AND
(tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE')
""".format(
table_name=table_name
)
"""
result = conn.execute(query).fetchall()
constraint_dict = defaultdict(list)
for constraint, constraint_type, column in result:
Expand Down

0 comments on commit 86a2a19

Please sign in to comment.