Switch to f-strings using flynt. (#13732)
jmcarp committed Jan 23, 2021
1 parent 9592be8 commit a9ac2b0
Showing 133 changed files with 288 additions and 373 deletions.
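
flynt rewrites printf-style `%` interpolation and `str.format()` calls into f-strings. A minimal sketch of the transformation this commit applies throughout the code base (illustrative values, not code from the diff):

    dag_id = "example_dag"
    exc = ValueError("boom")

    # Before: the two formatting styles this commit removes
    old_pct = "Error %s while reading Dag id %s Code" % (exc, dag_id)
    old_fmt = "Error {} while reading Dag id {} Code".format(exc, dag_id)

    # After: the equivalent f-string, as flynt emits it
    new = f"Error {exc} while reading Dag id {dag_id} Code"

    assert old_pct == old_fmt == new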
7 changes: 7 additions & 0 deletions .pre-commit-config.yaml
@@ -601,4 +601,11 @@ repos:
         entry: "./scripts/ci/pre_commit/pre_commit_in_container_bats_test.sh"
         files: ^tests/bats/in_container/.*.bats$|^scripts/in_container/.*sh
         pass_filenames: false
+      - id: flynt
+        name: Convert to f-strings with flynt
+        entry: flynt
+        language: python
+        language_version: python3
+        additional_dependencies: ['flynt']
+        files: \.py$
 ## ONLY ADD PRE-COMMITS HERE THAT REQUIRE CI IMAGE
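
With the hook registered, `pre-commit run flynt --all-files` applies the conversion across the whole tree, and `pre-commit install` wires it into `git commit` so newly added `%`- and `.format()`-style strings are rewritten before they land. (These are standard pre-commit invocations; the exact command used to produce this commit is not recorded here.)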
2 changes: 1 addition & 1 deletion BREEZE.rst
@@ -2168,7 +2168,7 @@ This is the current syntax for `./breeze <./breeze>`_:
 check-executables-have-shebangs check-hooks-apply check-integrations
 check-merge-conflict check-xml consistent-pylint daysago-import-check
 debug-statements detect-private-key doctoc dont-use-safe-filter end-of-file-fixer
-fix-encoding-pragma flake8 forbid-tabs helm-lint identity
+fix-encoding-pragma flake8 flynt forbid-tabs helm-lint identity
 incorrect-use-of-LoggingMixin insert-license isort json-schema language-matters
 lint-dockerfile lint-openapi markdownlint mermaid mixed-line-ending mypy mypy-helm
 no-providers-in-core-examples no-relative-imports pre-commit-descriptions
2 changes: 2 additions & 0 deletions STATIC_CODE_CHECKS.rst
@@ -102,6 +102,8 @@ require Breeze Docker images to be installed locally:
 ----------------------------------- ---------------------------------------------------------------- ------------
 ``flake8``                          Runs flake8.                                                     *
 ----------------------------------- ---------------------------------------------------------------- ------------
+``flynt``                           Runs flynt.
+----------------------------------- ---------------------------------------------------------------- ------------
 ``forbid-tabs``                     Fails if tabs are used in the project.
 ----------------------------------- ---------------------------------------------------------------- ------------
 ``helm-lint``                       Verifies if helm lint passes for the chart
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/get_code.py
@@ -32,5 +32,5 @@ def get_code(dag_id: str) -> str:
     try:
         return DagCode.get_code_by_fileloc(dag.fileloc)
     except (OSError, DagCodeNotFound) as exception:
-        error_message = "Error {} while reading Dag id {} Code".format(str(exception), dag_id)
+        error_message = f"Error {str(exception)} while reading Dag id {dag_id} Code"
         raise AirflowException(error_message, exception)
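
Note the `{str(exception)}` interpolation that the conversion preserves: f-strings stringify bare expressions on their own, so the explicit `str()` call is redundant but behavior-preserving. A quick check with an illustrative exception:

    exc = OSError("file missing")
    assert f"{exc}" == f"{str(exc)}" == "file missing"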
6 changes: 3 additions & 3 deletions airflow/api/common/experimental/pool.py
@@ -29,7 +29,7 @@ def get_pool(name, session=None):
 
     pool = session.query(Pool).filter_by(pool=name).first()
     if pool is None:
-        raise PoolNotFound("Pool '%s' doesn't exist" % name)
+        raise PoolNotFound(f"Pool '{name}' doesn't exist")
 
     return pool
 
@@ -49,7 +49,7 @@ def create_pool(name, slots, description, session=None):
     try:
         slots = int(slots)
     except ValueError:
-        raise AirflowBadRequest("Bad value for `slots`: %s" % slots)
+        raise AirflowBadRequest(f"Bad value for `slots`: {slots}")
 
     # Get the length of the pool column
     pool_name_length = Pool.pool.property.columns[0].type.length
@@ -81,7 +81,7 @@ def delete_pool(name, session=None):
 
     pool = session.query(Pool).filter_by(pool=name).first()
     if pool is None:
-        raise PoolNotFound("Pool '%s' doesn't exist" % name)
+        raise PoolNotFound(f"Pool '{name}' doesn't exist")
 
     session.delete(pool)
     session.commit()
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/connection_endpoint.py
@@ -125,4 +125,4 @@ def post_connection(session):
         session.add(connection)
         session.commit()
         return connection_schema.dump(connection)
-    raise AlreadyExists(detail="Connection already exist. ID: %s" % conn_id)
+    raise AlreadyExists(detail=f"Connection already exist. ID: {conn_id}")
2 changes: 1 addition & 1 deletion airflow/cli/commands/dag_command.py
@@ -166,7 +166,7 @@ def set_is_paused(is_paused, args):
         is_paused=is_paused,
     )
 
-    print("Dag: {}, paused: {}".format(args.dag_id, str(is_paused)))
+    print(f"Dag: {args.dag_id}, paused: {is_paused}")
 
 
 def dag_show(args):
11 changes: 4 additions & 7 deletions airflow/cli/commands/task_command.py
@@ -406,14 +406,11 @@ def task_render(args):
     for attr in task.__class__.template_fields:
         print(
             textwrap.dedent(
-                """\
+                f"""\
                 # ----------------------------------------------------------
-                # property: {}
+                # property: {attr}
                 # ----------------------------------------------------------
-                {}
-                """.format(
-                    attr, getattr(task, attr)
-                )
+                {getattr(task, attr)}
+                """
             )
         )

8 changes: 4 additions & 4 deletions airflow/cli/commands/user_command.py
@@ -98,7 +98,7 @@ def users_manage_role(args, remove=False):
     appbuilder = cached_app().appbuilder  # pylint: disable=no-member
     user = appbuilder.sm.find_user(username=args.username) or appbuilder.sm.find_user(email=args.email)
     if not user:
-        raise SystemExit('User "{}" does not exist'.format(args.username or args.email))
+        raise SystemExit(f'User "{args.username or args.email}" does not exist')
 
     role = appbuilder.sm.find_role(args.role)
     if not role:
@@ -144,7 +144,7 @@ def remove_underscores(s):
 
     with open(args.export, 'w') as file:
         file.write(json.dumps(users, sort_keys=True, indent=4))
-        print("{} users successfully exported to {}".format(len(users), file.name))
+        print(f"{len(users)} users successfully exported to {file.name}")
 
 
 @cli_utils.action_logging
@@ -191,7 +191,7 @@ def _import_users(users_list):  # pylint: disable=redefined-outer-name
 
         existing_user = appbuilder.sm.find_user(email=user['email'])
         if existing_user:
-            print("Found existing user with email '{}'".format(user['email']))
+            print(f"Found existing user with email '{user['email']}'")
             existing_user.roles = roles
             existing_user.first_name = user['firstname']
             existing_user.last_name = user['lastname']
@@ -206,7 +206,7 @@ def _import_users(users_list):  # pylint: disable=redefined-outer-name
             appbuilder.sm.update_user(existing_user)
             users_updated.append(user['email'])
         else:
-            print("Creating new user with email '{}'".format(user['email']))
+            print(f"Creating new user with email '{user['email']}'")
             appbuilder.sm.add_user(
                 username=user['username'],
                 first_name=user['firstname'],
2 changes: 1 addition & 1 deletion airflow/cli/commands/variable_command.py
@@ -92,7 +92,7 @@ def _import_helper(filepath):
         try:
             Variable.set(k, v, serialize_json=not isinstance(v, str))
         except Exception as e:  # pylint: disable=broad-except
-            print('Variable import failed: {}'.format(repr(e)))
+            print(f'Variable import failed: {repr(e)}')
             fail_count += 1
         else:
             suc_count += 1
2 changes: 1 addition & 1 deletion airflow/configuration.py
@@ -530,7 +530,7 @@ def write(self, fp, space_around_delimiters=True):
         # This is based on the configparser.RawConfigParser.write method code to add support for
         # reading options from environment variables.
         if space_around_delimiters:
-            delimiter = " {} ".format(self._delimiters[0])
+            delimiter = f" {self._delimiters[0]} "
         else:
             delimiter = self._delimiters[0]
         if self._defaults:
6 changes: 3 additions & 3 deletions airflow/example_dags/example_passing_params_via_test_command.py
@@ -42,7 +42,7 @@ def my_py_command(test_mode, params):
         )
     )
     # Print out the value of "miff", passed in below via the Python Operator
-    print(" 'miff' was passed in via task params = {}".format(params["miff"]))
+    print(f" 'miff' was passed in via task params = {params['miff']}")
     return 1


@@ -53,8 +53,8 @@ def print_env_vars(test_mode):
     --env-vars '{"foo":"bar"}'`
     """
     if test_mode:
-        print("foo={}".format(os.environ.get('foo')))
-        print("AIRFLOW_TEST_MODE={}".format(os.environ.get('AIRFLOW_TEST_MODE')))
+        print(f"foo={os.environ.get('foo')}")
+        print(f"AIRFLOW_TEST_MODE={os.environ.get('AIRFLOW_TEST_MODE')}")
 
 
 with DAG(
2 changes: 1 addition & 1 deletion airflow/example_dags/example_trigger_target_dag.py
@@ -35,7 +35,7 @@ def run_this_func(**context):
     :param context: The execution context
     :type context: dict
     """
-    print("Remotely received value of {} for key=message".format(context["dag_run"].conf["message"]))
+    print(f"Remotely received value of {context['dag_run'].conf['message']} for key=message")
 
 
 with DAG(
2 changes: 1 addition & 1 deletion airflow/example_dags/subdags/subdag.py
@@ -43,7 +43,7 @@ def subdag(parent_dag_name, child_dag_name, args):
 
     for i in range(5):
         DummyOperator(
-            task_id='{}-task-{}'.format(child_dag_name, i + 1),
+            task_id=f'{child_dag_name}-task-{i + 1}',
             default_args=args,
             dag=dag_subdag,
         )
2 changes: 1 addition & 1 deletion airflow/example_dags/tutorial_taskflow_api_etl.py
@@ -91,7 +91,7 @@ def load(total_order_value: float):
         instead of saving it to end user review, just prints it out.
         """
 
-        print("Total order value is: %.2f" % total_order_value)
+        print(f"Total order value is: {total_order_value:.2f}")
 
     # [END load]
 
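
printf-style precision specifiers carry over to f-strings unchanged after the colon: `%.2f` becomes `{value:.2f}`. An illustrative check:

    total_order_value = 1234.5
    assert ("Total order value is: %.2f" % total_order_value
            == f"Total order value is: {total_order_value:.2f}"
            == "Total order value is: 1234.50")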
2 changes: 1 addition & 1 deletion airflow/hooks/dbapi.py
@@ -248,7 +248,7 @@ def _generate_insert_sql(table, values, target_fields, replace, **kwargs):
             sql = "INSERT INTO "
         else:
             sql = "REPLACE INTO "
-        sql += "{} {} VALUES ({})".format(table, target_fields, ",".join(placeholders))
+        sql += f"{table} {target_fields} VALUES ({','.join(placeholders)})"
         return sql
 
     def insert_rows(self, table, rows, target_fields=None, commit_every=1000, replace=False, **kwargs):
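
Arbitrary expressions such as the `','.join(...)` call move inside the braces. One constraint (only lifted in Python 3.12) is that quotes inside a replacement field must differ from the f-string's own quotes, which is why some converted strings switch their outer quotes. A sketch with hypothetical values:

    table, target_fields = "ab_user", "(id, name)"
    placeholders = ["%s", "%s"]
    assert (f"{table} {target_fields} VALUES ({','.join(placeholders)})"
            == "ab_user (id, name) VALUES (%s,%s)")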
2 changes: 1 addition & 1 deletion airflow/kubernetes/refresh_config.py
@@ -61,7 +61,7 @@ def _load_from_exec_plugin(self):
             if 'token' not in status:
                 logging.error('exec: missing token field in plugin output')
                 return None
-            self.token = "Bearer %s" % status['token']  # pylint: disable=W0201
+            self.token = f"Bearer {status['token']}"  # pylint: disable=W0201
             ts_str = status.get('expirationTimestamp')
             if ts_str:
                 self.api_key_expire_ts = _parse_timestamp(ts_str)
6 changes: 3 additions & 3 deletions airflow/models/connection.py
@@ -165,7 +165,7 @@ def _parse_from_uri(self, uri: str):
 
     def get_uri(self) -> str:
         """Return connection in URI format"""
-        uri = '{}://'.format(str(self.conn_type).lower().replace('_', '-'))
+        uri = f"{str(self.conn_type).lower().replace('_', '-')}://"
 
         authority_block = ''
         if self.login is not None:
@@ -190,12 +190,12 @@ def get_uri(self) -> str:
             host_block += f'@:{self.port}'
 
         if self.schema:
-            host_block += '/{}'.format(quote(self.schema, safe=''))
+            host_block += f"/{quote(self.schema, safe='')}"
 
         uri += host_block
 
         if self.extra_dejson:
-            uri += '?{}'.format(urlencode(self.extra_dejson))
+            uri += f'?{urlencode(self.extra_dejson)}'
 
         return uri
 
2 changes: 1 addition & 1 deletion airflow/models/dag.py
@@ -1538,7 +1538,7 @@ def pickle_info(self):
             dttm = timezone.utcnow()
             pickled = pickle.dumps(self)
             d['pickle_len'] = len(pickled)
-            d['pickling_duration'] = "{}".format(timezone.utcnow() - dttm)
+            d['pickling_duration'] = str(timezone.utcnow() - dttm)
         except Exception as e:
             self.log.debug(e)
             d['is_picklable'] = False
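
A bare `"{}".format(x)` has no surrounding literal text, so it reduces to `str(x)` rather than to an f-string; for objects like `timedelta` that define no custom `__format__`, the two are identical:

    from datetime import timedelta

    delta = timedelta(seconds=90)
    assert "{}".format(delta) == str(delta) == "0:01:30"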
4 changes: 1 addition & 3 deletions airflow/models/taskinstance.py
@@ -1594,9 +1594,7 @@ def get_template_context(self, session=None) -> Context:  # pylint: disable=too-
         yesterday_ds_nodash = yesterday_ds.replace('-', '')
         tomorrow_ds_nodash = tomorrow_ds.replace('-', '')
 
-        ti_key_str = "{dag_id}__{task_id}__{ds_nodash}".format(
-            dag_id=task.dag_id, task_id=task.task_id, ds_nodash=ds_nodash
-        )
+        ti_key_str = f"{task.dag_id}__{task.task_id}__{ds_nodash}"
 
         if task.params:
             params.update(task.params)
4 changes: 1 addition & 3 deletions airflow/models/xcom.py
@@ -71,9 +71,7 @@ def init_on_load(self):
             self.value = pickle.loads(self.value)
 
     def __repr__(self):
-        return '<XCom "{key}" ({task_id} @ {execution_date})>'.format(
-            key=self.key, task_id=self.task_id, execution_date=self.execution_date
-        )
+        return f'<XCom "{self.key}" ({self.task_id} @ {self.execution_date})>'
 
     @classmethod
     @provide_session
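
Keyword arguments of `.format()` inline the same way as positional ones: each named placeholder is replaced by the expression bound to it. With illustrative values:

    key, task_id, execution_date = "return_value", "extract", "2021-01-23T00:00:00"
    old = '<XCom "{key}" ({task_id} @ {execution_date})>'.format(
        key=key, task_id=task_id, execution_date=execution_date
    )
    assert old == f'<XCom "{key}" ({task_id} @ {execution_date})>'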
14 changes: 5 additions & 9 deletions airflow/operators/sql.py
@@ -293,9 +293,7 @@ def __init__(
         self.days_back = -abs(days_back)
         self.conn_id = conn_id
         sqlexp = ", ".join(self.metrics_sorted)
-        sqlt = "SELECT {sqlexp} FROM {table} WHERE {date_filter_column}=".format(
-            sqlexp=sqlexp, table=table, date_filter_column=date_filter_column
-        )
+        sqlt = f"SELECT {sqlexp} FROM {table} WHERE {date_filter_column}="
 
         self.sql1 = sqlt + "'{{ ds }}'"
         self.sql2 = sqlt + "'{{ macros.ds_add(ds, " + str(self.days_back) + ") }}'"
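
The Jinja-templated fragments in the last two lines stay as plain concatenation: f-strings treat `{{` and `}}` as escaped literal braces, so embedding `{{ ds }}` in an f-string would need every brace doubled. A quick illustration:

    # Doubled braces collapse to single literal ones -- no longer valid Jinja:
    assert f"'{{ ds }}'" == "'{ ds }'"
    # Producing a literal "{{ ds }}" would require quadrupled braces:
    assert f"'{{{{ ds }}}}'" == "'{{ ds }}'"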
@@ -360,9 +358,7 @@ def execute(self, context=None):
                     ratios[k],
                     self.metrics_thresholds[k],
                 )
-            raise AirflowException(
-                "The following tests have failed:\n {}".format(", ".join(sorted(failed_tests)))
-            )
+            raise AirflowException(f"The following tests have failed:\n {', '.join(sorted(failed_tests))}")
 
         self.log.info("All tests have passed")

@@ -535,7 +531,7 @@ def execute(self, context: Dict):
         self._hook = self._get_hook()
 
         if self._hook is None:
-            raise AirflowException("Failed to establish connection to '%s'" % self.conn_id)
+            raise AirflowException(f"Failed to establish connection to '{self.conn_id}'")
 
         if self.sql is None:
             raise AirflowException("Expected 'sql' parameter is missing.")
@@ -584,14 +580,14 @@ def execute(self, context: Dict):
                 follow_branch = self.follow_task_ids_if_true
             else:
                 raise AirflowException(
-                    "Unexpected query return result '{}' type '{}'".format(query_result, type(query_result))
+                    f"Unexpected query return result '{query_result}' type '{type(query_result)}'"
                 )
 
             if follow_branch is None:
                 follow_branch = self.follow_task_ids_if_false
         except ValueError:
             raise AirflowException(
-                "Unexpected query return result '{}' type '{}'".format(query_result, type(query_result))
+                f"Unexpected query return result '{query_result}' type '{type(query_result)}'"
             )
 
         self.skip_all_except(context["ti"], follow_branch)
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/hooks/datasync.py
@@ -57,7 +57,7 @@ def __init__(self, wait_interval_seconds: int = 5, *args, **kwargs) -> None:
         self.tasks: list = []
         # wait_interval_seconds = 0 is used during unit tests
         if wait_interval_seconds < 0 or wait_interval_seconds > 15 * 60:
-            raise ValueError("Invalid wait_interval_seconds %s" % wait_interval_seconds)
+            raise ValueError(f"Invalid wait_interval_seconds {wait_interval_seconds}")
         self.wait_interval_seconds = wait_interval_seconds
 
     def create_location(self, location_uri: str, **create_location_kwargs) -> str:
@@ -314,4 +314,4 @@ def wait_for_task_execution(self, task_execution_arn: str, max_iterations: int =
             return False
         if iterations <= 0:
             raise AirflowTaskTimeout("Max iterations exceeded!")
-        raise AirflowException("Unknown status: %s" % status)  # Should never happen
+        raise AirflowException(f"Unknown status: {status}")  # Should never happen
4 changes: 1 addition & 3 deletions airflow/providers/amazon/aws/hooks/dynamodb.py
@@ -58,6 +58,4 @@ def write_batch_data(self, items: Iterable) -> bool:
                 batch.put_item(Item=item)
             return True
         except Exception as general_error:
-            raise AirflowException(
-                "Failed to insert items in dynamodb, error: {error}".format(error=str(general_error))
-            )
+            raise AirflowException(f"Failed to insert items in dynamodb, error: {str(general_error)}")
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/hooks/sagemaker.py
@@ -126,7 +126,7 @@ def secondary_training_status_message(
     for transition in transitions_to_print:
         message = transition['StatusMessage']
         time_str = timezone.convert_to_utc(job_description['LastModifiedTime']).strftime('%Y-%m-%d %H:%M:%S')
-        status_strs.append('{} {} - {}'.format(time_str, transition['Status'], message))
+        status_strs.append(f"{time_str} {transition['Status']} - {message}")
 
     return '\n'.join(status_strs)
 
@@ -740,7 +740,7 @@ def check_status(
         if status in non_terminal_states:
             running = True
         elif status in self.failed_states:
-            raise AirflowException('SageMaker job failed because %s' % response['FailureReason'])
+            raise AirflowException(f"SageMaker job failed because {response['FailureReason']}")
         else:
             running = False
 
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -116,7 +116,7 @@ def _read(self, ti, try_number, metadata=None):
             log_exists = self.s3_log_exists(remote_loc)
         except Exception as error:  # pylint: disable=broad-except
             self.log.exception(error)
-            log = '*** Failed to verify remote log exists {}.\n{}\n'.format(remote_loc, str(error))
+            log = f'*** Failed to verify remote log exists {remote_loc}.\n{str(error)}\n'
 
         if log_exists:
             # If S3 remote file exists, we do not fetch logs from task instance
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/operators/datasync.py
@@ -351,7 +351,7 @@ def _execute_datasync_task(self) -> None:
             self.log.log(level, '%s=%s', k, v)
 
         if not result:
-            raise AirflowException("Failed TaskExecutionArn %s" % self.task_execution_arn)
+            raise AirflowException(f"Failed TaskExecutionArn {self.task_execution_arn}")
 
     def on_kill(self) -> None:
         """Cancel the submitted DataSync task."""
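
The untouched `self.log.log(level, '%s=%s', k, v)` context line above is deliberate: logging formats its arguments lazily, so those `%s` placeholders are function arguments rather than a string-formatting expression, and rewriting the call as an eager f-string would change behavior.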
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/operators/emr_add_steps.py
@@ -100,7 +100,7 @@ def execute(self, context: Dict[str, Any]) -> List[str]:
         response = emr.add_job_flow_steps(JobFlowId=job_flow_id, Steps=steps)
 
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
-            raise AirflowException('Adding steps failed: %s' % response)
+            raise AirflowException(f'Adding steps failed: {response}')
         else:
             self.log.info('Steps %s added to JobFlow', response['StepIds'])
             return response['StepIds']
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/operators/emr_create_job_flow.py
@@ -78,7 +78,7 @@ def execute(self, context: Dict[str, Any]) -> str:
         response = emr.create_job_flow(job_flow_overrides)
 
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
-            raise AirflowException('JobFlow creation failed: %s' % response)
+            raise AirflowException(f'JobFlow creation failed: {response}')
         else:
             self.log.info('JobFlow with id %s created', response['JobFlowId'])
             return response['JobFlowId']
