Bump pyupgrade v2.13.0 to v2.18.1 (#15991)
* Bump pyupgrade v2.13.0 to v2.18.1

* fixup! Bump pyupgrade v2.13.0 to v2.18.1
mik-laj committed May 22, 2021
1 parent ee470e1 commit 476d0f6
Showing 46 changed files with 76 additions and 81 deletions.
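
Apart from the version bump in .pre-commit-config.yaml, every hunk in this commit follows the same pattern: the upgraded pyupgrade hook rewrites a list comprehension that is passed directly to a call accepting any iterable (sorted(), str.join(), sum(), min(), tuple(), and similar) into a generator expression, so no intermediate list is built. A minimal sketch of the before/after shape, using illustrative names that are not taken from the diff:

    from datetime import timedelta

    names = ["beta", "alpha", "gamma"]

    # Before: builds a throwaway list, then sorts it.
    ordered_old = sorted([n.upper() for n in names])

    # After: sorted() consumes the generator expression directly.
    ordered_new = sorted(n.upper() for n in names)

    assert ordered_old == ordered_new

    # When the call takes another positional argument (e.g. sum() with a start
    # value, as in airflow/models/dagbag.py below), the generator keeps its own
    # parentheses.
    durations = [timedelta(seconds=3), timedelta(seconds=7)]
    total = sum((d for d in durations), timedelta())
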
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -190,7 +190,7 @@ repos:
        files: ^chart/values\.schema\.json$|^chart/values_schema\.schema\.json$
        pass_filenames: true
  - repo: https://github.com/asottile/pyupgrade
-    rev: v2.13.0
+    rev: v2.18.1
    hooks:
      - id: pyupgrade
        args: ["--py36-plus"]

2 changes: 1 addition & 1 deletion airflow/cli/commands/role_command.py
@@ -30,7 +30,7 @@ def roles_list(args):
    appbuilder = cached_app().appbuilder  # pylint: disable=no-member
    roles = appbuilder.sm.get_all_roles()
    AirflowConsole().print_as(
-        data=sorted([r.name for r in roles]), output=args.output, mapper=lambda x: {"name": x}
+        data=sorted(r.name for r in roles), output=args.output, mapper=lambda x: {"name": x}
    )

2 changes: 1 addition & 1 deletion airflow/cli/commands/task_command.py
@@ -286,7 +286,7 @@ def task_list(args, dag=None):
    if args.tree:
        dag.tree_view()
    else:
-        tasks = sorted([t.task_id for t in dag.tasks])
+        tasks = sorted(t.task_id for t in dag.tasks)
        print("\n".join(tasks))

2 changes: 1 addition & 1 deletion airflow/configuration.py
@@ -67,7 +67,7 @@ def run_command(command):
    process = subprocess.Popen(
        shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
    )
-    output, stderr = [stream.decode(sys.getdefaultencoding(), 'ignore') for stream in process.communicate()]
+    output, stderr = (stream.decode(sys.getdefaultencoding(), 'ignore') for stream in process.communicate())

    if process.returncode != 0:
        raise AirflowConfigException(

8 changes: 3 additions & 5 deletions airflow/decorators/base.py
@@ -70,11 +70,9 @@ def get_unique_task_id(
        return task_id
    core = re.split(r'__\d+$', task_id)[0]
    suffixes = sorted(
-        [
-            int(re.split(r'^.+__', task_id)[1])
-            for task_id in dag.task_ids
-            if re.match(rf'^{core}__\d+$', task_id)
-        ]
+        int(re.split(r'^.+__', task_id)[1])
+        for task_id in dag.task_ids
+        if re.match(rf'^{core}__\d+$', task_id)
    )
    if not suffixes:
        return f'{core}__1'

2 changes: 1 addition & 1 deletion airflow/executors/base_executor.py
@@ -168,7 +168,7 @@ def order_queued_tasks_by_priority(self) -> List[Tuple[TaskInstanceKey, QueuedTa
        :return: List of tuples from the queued_tasks according to the priority.
        """
        return sorted(
-            [(k, v) for k, v in self.queued_tasks.items()],  # pylint: disable=unnecessary-comprehension
+            self.queued_tasks.items(),
            key=lambda x: x[1][1],
            reverse=True,
        )

2 changes: 1 addition & 1 deletion airflow/executors/celery_executor.py
@@ -374,7 +374,7 @@ def _check_for_stalled_adopted_tasks(self):
                "Adopted tasks were still pending after %s, assuming they never made it to celery and "
                "clearing:\n\t%s",
                self.task_adoption_timeout,
-                "\n\t".join([repr(x) for x in timedout_keys]),
+                "\n\t".join(repr(x) for x in timedout_keys),
            )
            for key in timedout_keys:
                self.event_buffer[key] = (State.FAILED, None)

2 changes: 1 addition & 1 deletion airflow/executors/debug_executor.py
@@ -120,7 +120,7 @@ def trigger_tasks(self, open_slots: int) -> None:
        :param open_slots: Number of open slots
        """
        sorted_queue = sorted(
-            [(k, v) for k, v in self.queued_tasks.items()],  # pylint: disable=unnecessary-comprehension
+            self.queued_tasks.items(),
            key=lambda x: x[1][1],
            reverse=True,
        )

2 changes: 1 addition & 1 deletion airflow/jobs/backfill_job.py
@@ -897,7 +897,7 @@ def query(result, items):

        reset_tis = helpers.reduce_in_chunks(query, tis_to_reset, [], self.max_tis_per_query)

-        task_instance_str = '\n\t'.join([repr(x) for x in reset_tis])
+        task_instance_str = '\n\t'.join(repr(x) for x in reset_tis)
        session.commit()

        self.log.info("Reset the following %s TaskInstances:\n\t%s", len(reset_tis), task_instance_str)

8 changes: 4 additions & 4 deletions airflow/jobs/scheduler_job.py
@@ -461,9 +461,9 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
                    session.delete(ti)
                    session.commit()

-            task_list = "\n".join([sla.task_id + ' on ' + sla.execution_date.isoformat() for sla in slas])
+            task_list = "\n".join(sla.task_id + ' on ' + sla.execution_date.isoformat() for sla in slas)
            blocking_task_list = "\n".join(
-                [ti.task_id + ' on ' + ti.execution_date.isoformat() for ti in blocking_tis]
+                ti.task_id + ' on ' + ti.execution_date.isoformat() for ti in blocking_tis
            )
            # Track whether email or any alert notification sent
            # We consider email or the alert callback as notifications
@@ -943,7 +943,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
            return executable_tis

        # Put one task instance on each line
-        task_instance_str = "\n\t".join([repr(x) for x in task_instances_to_examine])
+        task_instance_str = "\n\t".join(repr(x) for x in task_instances_to_examine)
        self.log.info("%s tasks up for execution:\n\t%s", len(task_instances_to_examine), task_instance_str)

        pool_to_task_instances: DefaultDict[str, List[models.Pool]] = defaultdict(list)
@@ -1065,7 +1065,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
        Stats.gauge('scheduler.tasks.running', num_tasks_in_executor)
        Stats.gauge('scheduler.tasks.executable', len(executable_tis))

-        task_instance_str = "\n\t".join([repr(x) for x in executable_tis])
+        task_instance_str = "\n\t".join(repr(x) for x in executable_tis)
        self.log.info("Setting the following tasks to queued state:\n\t%s", task_instance_str)

        # set TIs to queued state

6 changes: 3 additions & 3 deletions airflow/models/dag.py
@@ -638,7 +638,7 @@ def get_run_dates(self, start_date, end_date=None):
        using_end_date = end_date

        # dates for dag runs
-        using_start_date = using_start_date or min([t.start_date for t in self.tasks])
+        using_start_date = using_start_date or min(t.start_date for t in self.tasks)
        using_end_date = using_end_date or timezone.utcnow()

        # next run date for a subdag isn't relevant (schedule_interval for subdags
@@ -1329,7 +1329,7 @@ def clear(
        if count == 0:
            return 0
        if confirm_prompt:
-            ti_list = "\n".join([str(t) for t in tis])
+            ti_list = "\n".join(str(t) for t in tis)
            question = (
                "You are about to delete these {count} tasks:\n{ti_list}\n\nAre you sure? (yes/no): "
            ).format(count=count, ti_list=ti_list)
@@ -1395,7 +1395,7 @@ def clear_dags(
            print("Nothing to clear.")
            return 0
        if confirm_prompt:
-            ti_list = "\n".join([str(t) for t in all_tis])
+            ti_list = "\n".join(str(t) for t in all_tis)
            question = f"You are about to delete these {count} tasks:\n{ti_list}\n\nAre you sure? (yes/no): "
            do_it = utils.helpers.ask_yesno(question)

8 changes: 4 additions & 4 deletions airflow/models/dagbag.py
@@ -507,7 +507,7 @@ def collect_dags(
                        file=filepath.replace(settings.DAGS_FOLDER, ''),
                        duration=file_parse_end_dttm - file_parse_start_dttm,
                        dag_num=len(found_dags),
-                        task_num=sum([len(dag.tasks) for dag in found_dags]),
+                        task_num=sum(len(dag.tasks) for dag in found_dags),
                        dags=str([dag.dag_id for dag in found_dags]),
                    )
                )
@@ -540,9 +540,9 @@ def dagbag_report(self):
        """Prints a report around DagBag loading stats"""
        stats = self.dagbag_stats
        dag_folder = self.dag_folder
-        duration = sum([o.duration for o in stats], timedelta()).total_seconds()
-        dag_num = sum([o.dag_num for o in stats])
-        task_num = sum([o.task_num for o in stats])
+        duration = sum((o.duration for o in stats), timedelta()).total_seconds()
+        dag_num = sum(o.dag_num for o in stats)
+        task_num = sum(o.task_num for o in stats)
        table = tabulate(stats, headers="keys")

        report = textwrap.dedent(

2 changes: 1 addition & 1 deletion airflow/models/taskinstance.py
@@ -1279,7 +1279,7 @@ def signal_handler(signum, frame):  # pylint: disable=unused-argument
        airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
        self.log.info(
            "Exporting the following env vars:\n%s",
-            '\n'.join([f"{k}={v}" for k, v in airflow_context_vars.items()]),
+            '\n'.join(f"{k}={v}" for k, v in airflow_context_vars.items()),
        )

        os.environ.update(airflow_context_vars)

2 changes: 1 addition & 1 deletion airflow/operators/bash.py
@@ -162,7 +162,7 @@ def get_env(self, context):
        airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
        self.log.debug(
            'Exporting the following env vars:\n%s',
-            '\n'.join([f"{k}={v}" for k, v in airflow_context_vars.items()]),
+            '\n'.join(f"{k}={v}" for k, v in airflow_context_vars.items()),
        )
        env.update(airflow_context_vars)
        return env

2 changes: 1 addition & 1 deletion airflow/operators/sql.py
@@ -463,7 +463,7 @@ def push(self, meta_data):
        Optional: Send data check info and metadata to an external database.
        Default functionality will log metadata.
        """
-        info = "\n".join([f"""{key}: {item}""" for key, item in meta_data.items()])
+        info = "\n".join(f"""{key}: {item}""" for key, item in meta_data.items())
        self.log.info("Log from %s:\n%s", self.dag_id, info)

@@ -121,7 +121,7 @@ def get_cloudwatch_logs(self, stream_name: str) -> str:
                )
            )

-            return '\n'.join([self._event_to_str(event) for event in events])
+            return '\n'.join(self._event_to_str(event) for event in events)
        except Exception:  # pylint: disable=broad-except
            msg = 'Could not read remote logs from log_group: {} log_stream: {}.'.format(
                self.log_group, stream_name

2 changes: 1 addition & 1 deletion airflow/providers/apache/druid/transfers/hive_to_druid.py
@@ -121,7 +121,7 @@ def execute(self, context: Dict[str, Any]) -> None:
        self.log.info("Extracting data from Hive")
        hive_table = 'druid.' + context['task_instance_key_str'].replace('.', '_')
        sql = self.sql.strip().strip(';')
-        tblproperties = ''.join([f", '{k}' = '{v}'" for k, v in self.hive_tblproperties.items()])
+        tblproperties = ''.join(f", '{k}' = '{v}'" for k, v in self.hive_tblproperties.items())
        hql = f"""\
        SET mapred.output.compress=false;
        SET hive.exec.compress.output=false;

8 changes: 4 additions & 4 deletions airflow/providers/apache/hive/hooks/hive.py
@@ -429,16 +429,16 @@ def load_file(
        if create or recreate:
            if field_dict is None:
                raise ValueError("Must provide a field dict when creating a table")
-            fields = ",\n ".join([f"`{k.strip('`')}` {v}" for k, v in field_dict.items()])
+            fields = ",\n ".join(f"`{k.strip('`')}` {v}" for k, v in field_dict.items())
            hql += f"CREATE TABLE IF NOT EXISTS {table} (\n{fields})\n"
            if partition:
-                pfields = ",\n ".join([p + " STRING" for p in partition])
+                pfields = ",\n ".join(p + " STRING" for p in partition)
                hql += f"PARTITIONED BY ({pfields})\n"
            hql += "ROW FORMAT DELIMITED\n"
            hql += f"FIELDS TERMINATED BY '{delimiter}'\n"
            hql += "STORED AS textfile\n"
            if tblproperties is not None:
-                tprops = ", ".join([f"'{k}'='{v}'" for k, v in tblproperties.items()])
+                tprops = ", ".join(f"'{k}'='{v}'" for k, v in tblproperties.items())
                hql += f"TBLPROPERTIES({tprops})\n"
            hql += ";"
            self.log.info(hql)
@@ -448,7 +448,7 @@ def load_file(
            hql += "OVERWRITE "
        hql += f"INTO TABLE {table} "
        if partition:
-            pvals = ", ".join([f"{k}='{v}'" for k, v in partition.items()])
+            pvals = ", ".join(f"{k}='{v}'" for k, v in partition.items())
            hql += f"PARTITION ({pvals})"

        # As a workaround for HIVE-10541, add a newline character

2 changes: 1 addition & 1 deletion airflow/providers/apache/hive/operators/hive_stats.py
@@ -132,7 +132,7 @@ def execute(self, context: Optional[Dict[str, Any]] = None) -> None:
        exprs.update(assign_exprs)
        exprs.update(self.extra_exprs)
        exprs = OrderedDict(exprs)
-        exprs_str = ",\n ".join([v + " AS " + k[0] + '__' + k[1] for k, v in exprs.items()])
+        exprs_str = ",\n ".join(v + " AS " + k[0] + '__' + k[1] for k, v in exprs.items())

        where_clause_ = [f"{k} = '{v}'" for k, v in self.partition.items()]
        where_clause = " AND\n ".join(where_clause_)

2 changes: 1 addition & 1 deletion airflow/providers/cncf/kubernetes/backcompat/volume.py
@@ -61,4 +61,4 @@ def to_k8s_client_obj(self) -> k8s.V1Volume:
    # source: https://www.geeksforgeeks.org/python-program-to-convert-camel-case-string-to-snake-case/
    @staticmethod
    def _convert_to_snake_case(input_string):
-        return ''.join(['_' + i.lower() if i.isupper() else i for i in input_string]).lstrip('_')
+        return ''.join('_' + i.lower() if i.isupper() else i for i in input_string).lstrip('_')

@@ -410,7 +410,7 @@ def handle_pod_overlap(
    @staticmethod
    def _get_pod_identifying_label_string(labels) -> str:
        filtered_labels = {label_id: label for label_id, label in labels.items() if label_id != 'try_number'}
-        return ','.join([label_id + '=' + label for label_id, label in sorted(filtered_labels.items())])
+        return ','.join(label_id + '=' + label for label_id, label in sorted(filtered_labels.items()))

    @staticmethod
    def _try_numbers_match(context, pod) -> bool:

2 changes: 1 addition & 1 deletion airflow/providers/elasticsearch/log/es_task_handler.py
@@ -194,7 +194,7 @@ def _read(
        # to prevent it from showing in the UI.
        def concat_logs(lines):
            log_range = (len(lines) - 1) if lines[-1].message == self.end_of_log_mark.strip() else len(lines)
-            return '\n'.join([self._format_msg(lines[i]) for i in range(log_range)])
+            return '\n'.join(self._format_msg(lines[i]) for i in range(log_range))

        message = [(host, concat_logs(hosted_log)) for host, hosted_log in logs_by_host]

2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/hooks/dataflow.py
@@ -1006,7 +1006,7 @@ def start_sql_job(
            f"--region={location}",
            *(beam_options_to_args(options)),
        ]
-        self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd]))
+        self.log.info("Executing command: %s", " ".join(shlex.quote(c) for c in cmd))
        with self.provide_authorized_gcloud():
            proc = subprocess.run(  # pylint: disable=subprocess-run-check
                cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE

2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/hooks/gdm.py
@@ -108,5 +108,5 @@ def delete_deployment(
        resp = request.execute()
        if 'error' in resp.keys():
            raise AirflowException(
-                'Errors deleting deployment: ', ', '.join([err['message'] for err in resp['error']['errors']])
+                'Errors deleting deployment: ', ', '.join(err['message'] for err in resp['error']['errors'])
            )

2 changes: 1 addition & 1 deletion airflow/providers/microsoft/azure/hooks/azure_batch.py
@@ -286,7 +286,7 @@ def wait_for_all_node_state(self, pool_id: str, node_state: Set) -> list:
            # refresh pool to ensure that there is no resize error
            pool = self.connection.pool.get(pool_id)
            if pool.resize_errors is not None:
-                resize_errors = "\n".join([repr(e) for e in pool.resize_errors])
+                resize_errors = "\n".join(repr(e) for e in pool.resize_errors)
                raise RuntimeError(f'resize error encountered for pool {pool.id}:\n{resize_errors}')
            nodes = list(self.connection.compute_node.list(pool.id))
            if len(nodes) >= pool.target_dedicated_nodes and all(node.state in node_state for node in nodes):

2 changes: 1 addition & 1 deletion airflow/stats.py
@@ -244,7 +244,7 @@ class AllowListValidator:
    def __init__(self, allow_list=None):
        if allow_list:
            # pylint: disable=consider-using-generator
-            self.allow_list = tuple([item.strip().lower() for item in allow_list.split(',')])
+            self.allow_list = tuple(item.strip().lower() for item in allow_list.split(','))
        else:
            self.allow_list = None

4 changes: 2 additions & 2 deletions airflow/utils/process_utils.py
@@ -132,7 +132,7 @@ def execute_in_subprocess(cmd: List[str]):
    :param cmd: command and arguments to run
    :type cmd: List[str]
    """
-    log.info("Executing cmd: %s", " ".join([shlex.quote(c) for c in cmd]))
+    log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
    with subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0, close_fds=True
    ) as proc:
@@ -153,7 +153,7 @@ def execute_interactive(cmd: List[str], **kwargs):
    state after the process is completed e.g. if the subprocess hides the cursor, it will be restored after
    the process is completed.
    """
-    log.info("Executing cmd: %s", " ".join([shlex.quote(c) for c in cmd]))
+    log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))

    old_tty = termios.tcgetattr(sys.stdin)
    tty.setraw(sys.stdin.fileno())

2 changes: 1 addition & 1 deletion airflow/utils/strings.py
@@ -22,7 +22,7 @@

def get_random_string(length=8, choices=string.ascii_letters + string.digits):
    """Generate random string"""
-    return ''.join([choice(choices) for _ in range(length)])
+    return ''.join(choice(choices) for _ in range(length))


def to_boolean(astring):

8 changes: 3 additions & 5 deletions airflow/utils/task_group.py
@@ -106,11 +106,9 @@ def __init__(
            raise DuplicateTaskIdFound(f"group_id '{self.group_id}' has already been added to the DAG")
        base = re.split(r'__\d+$', group_id)[0]
        suffixes = sorted(
-            [
-                int(re.split(r'^.+__', used_group_id)[1])
-                for used_group_id in self.used_group_ids
-                if used_group_id is not None and re.match(rf'^{base}__\d+$', used_group_id)
-            ]
+            int(re.split(r'^.+__', used_group_id)[1])
+            for used_group_id in self.used_group_ids
+            if used_group_id is not None and re.match(rf'^{base}__\d+$', used_group_id)
        )
        if not suffixes:
            self._group_id += '__1'

