Clean up f-strings in logging calls (#23597)
josh-fell committed May 23, 2022
1 parent 637a8b8 commit ec6761a
Showing 22 changed files with 74 additions and 64 deletions.
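
The pattern applied throughout this commit: Python's `logging` module defers %-style interpolation until a handler actually emits the record, whereas an f-string is rendered before the logging call even runs. Deferral also keeps one constant message template across all call sites. A minimal sketch of the before/after (the `dag_id` value is a hypothetical stand-in):

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

dag_id = "example_dag"  # hypothetical value, for illustration only

# Before: the f-string is rendered eagerly, even when INFO records are
# filtered out, and every DAG produces a distinct message string.
log.info(f"DAG {dag_id} is missing and will be deactivated.")

# After: formatting is deferred until the record is actually emitted,
# and all call sites share one constant message template.
log.info("DAG %s is missing and will be deactivated.", dag_id)
```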
2 changes: 1 addition & 1 deletion airflow/dag_processing/manager.py
@@ -509,7 +509,7 @@ def _deactivate_stale_dags(self, session=None):
dag.fileloc in last_parsed
and (dag.last_parsed_time + self._processor_timeout) < last_parsed[dag.fileloc]
):
self.log.info(f"DAG {dag.dag_id} is missing and will be deactivated.")
self.log.info("DAG %s is missing and will be deactivated.", dag.dag_id)
to_deactivate.add(dag.dag_id)

if to_deactivate:
5 changes: 3 additions & 2 deletions airflow/kubernetes/pod_launcher_deprecated.py
@@ -186,8 +186,9 @@ def parse_log_line(self, line: str) -> Tuple[Optional[str], str]:
split_at = line.find(' ')
if split_at == -1:
self.log.error(
f"Error parsing timestamp (no timestamp in message: '{line}'). "
"Will continue execution but won't update timestamp"
"Error parsing timestamp (no timestamp in message: %r). "
"Will continue execution but won't update timestamp",
line,
)
return None, line
timestamp = line[:split_at]
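
This hunk also swaps the hand-written `'{line}'` quoting for the `%r` conversion, which formats the argument with `repr()`, so quoting and escape characters show up explicitly in the emitted message. A small sketch, with a hypothetical malformed `line`:

```python
import logging

logging.basicConfig(level=logging.ERROR)
log = logging.getLogger(__name__)

line = "no timestamp here\tpayload"  # hypothetical malformed log line

# %r applies repr() to the argument, replacing the hand-written '...'
# quoting and making embedded escapes like \t visible.
log.error(
    "Error parsing timestamp (no timestamp in message: %r). "
    "Will continue execution but won't update timestamp",
    line,
)
```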
4 changes: 2 additions & 2 deletions airflow/models/connection.py
@@ -208,8 +208,8 @@ def get_uri(self) -> str:
"""Return connection in URI format"""
if '_' in self.conn_type:
self.log.warning(
f"Connection schemes (type: {str(self.conn_type)}) "
f"shall not contain '_' according to RFC3986."
"Connection schemes (type: %s) shall not contain '_' according to RFC3986.",
self.conn_type,
)

uri = f"{str(self.conn_type).lower().replace('_', '-')}://"
4 changes: 2 additions & 2 deletions airflow/models/dagrun.py
@@ -789,8 +789,8 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis: Lis
true_delay = first_start_date - data_interval_end
if true_delay.total_seconds() > 0:
Stats.timing(f'dagrun.{dag.dag_id}.first_task_scheduling_delay', true_delay)
- except Exception as e:
-     self.log.warning(f'Failed to record first_task_scheduling_delay metric:\n{e}')
+ except Exception:
+     self.log.warning('Failed to record first_task_scheduling_delay metric:', exc_info=True)

def _emit_duration_stats_for_finished_state(self):
if self.state == State.RUNNING:
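
Beyond lazy formatting, this hunk drops the interpolated `{e}` in favor of `exc_info=True`, which attaches the active exception and its full traceback to the log record instead of flattening it to `str(e)`. A runnable sketch with a hypothetical failure:

```python
import logging

logging.basicConfig(level=logging.WARNING)
log = logging.getLogger(__name__)

try:
    raise RuntimeError("boom")  # hypothetical failure
except Exception:
    # exc_info=True appends the full traceback to the emitted record,
    # rather than embedding only the exception's message text.
    log.warning("Failed to record first_task_scheduling_delay metric:", exc_info=True)
```

At ERROR severity, `Logger.exception(...)` is the equivalent shorthand for logging with `exc_info=True`.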
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/operators/ecs.py
@@ -375,7 +375,7 @@ def _start_task(self, context):

self.arn = response['tasks'][0]['taskArn']
self.ecs_task_id = self.arn.split("/")[-1]
self.log.info(f"ECS task ID is: {self.ecs_task_id}")
self.log.info("ECS task ID is: %s", self.ecs_task_id)

if self.reattach:
# Save the task ARN in XCom to be able to reattach it if needed
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/operators/redshift_data.py
@@ -136,12 +136,12 @@ def wait_for_results(self, statement_id):
elif status == 'FAILED' or status == 'ABORTED':
raise ValueError(f"Statement {statement_id!r} terminated with status {status}.")
else:
self.log.info(f"Query {status}")
self.log.info("Query %s", status)
sleep(self.poll_interval)

def execute(self, context: 'Context') -> None:
"""Execute a statement against Amazon Redshift"""
self.log.info(f"Executing statement: {self.sql}")
self.log.info("Executing statement: %s", self.sql)

self.statement_id = self.execute_query()

2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/operators/redshift_sql.py
@@ -73,6 +73,6 @@ def get_hook(self) -> RedshiftSQLHook:

def execute(self, context: 'Context') -> None:
"""Execute a statement against Amazon Redshift"""
self.log.info(f"Executing statement: {self.sql}")
self.log.info("Executing statement: %s", self.sql)
hook = self.get_hook()
hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/operators/sagemaker.py
@@ -693,4 +693,4 @@ def __init__(self, *, config, aws_conn_id: str = DEFAULT_CONN_ID, **kwargs):
def execute(self, context: 'Context') -> Any:
sagemaker_hook = SageMakerHook(aws_conn_id=self.aws_conn_id)
sagemaker_hook.delete_model(model_name=self.config['ModelName'])
self.log.info(f"Model {self.config['ModelName']} deleted Successfully.")
self.log.info("Model %s deleted successfully.", self.config['ModelName'])
6 changes: 3 additions & 3 deletions airflow/providers/amazon/aws/transfers/ftp_to_s3.py
@@ -108,15 +108,15 @@ def __upload_to_s3_from_ftp(self, remote_filename, s3_file_key):
gzip=self.gzip,
acl_policy=self.acl_policy,
)
- self.log.info(f'File upload to {s3_file_key}')
+ self.log.info('File upload to %s', s3_file_key)

def execute(self, context: 'Context'):
self.ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)
self.s3_hook = S3Hook(self.aws_conn_id)

if self.ftp_filenames:
if isinstance(self.ftp_filenames, str):
- self.log.info(f'Getting files in {self.ftp_path}')
+ self.log.info('Getting files in %s', self.ftp_path)

list_dir = self.ftp_hook.list_directory(
path=self.ftp_path,
@@ -129,7 +129,7 @@ def execute(self, context: 'Context'):
files = list(filter(lambda f: ftp_filename in f, list_dir))

for file in files:
- self.log.info(f'Moving file {file}')
+ self.log.info('Moving file %s', file)

if self.s3_filenames and isinstance(self.s3_filenames, str):
filename = file.replace(self.ftp_filenames, self.s3_filenames)
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/transfers/salesforce_to_s3.py
@@ -127,6 +127,6 @@ def execute(self, context: 'Context') -> str:
)

s3_uri = f"s3://{self.s3_bucket_name}/{self.s3_key}"
self.log.info(f"Salesforce data uploaded to S3 at {s3_uri}.")
self.log.info("Salesforce data uploaded to S3 at %s.", s3_uri)

return s3_uri
4 changes: 2 additions & 2 deletions airflow/providers/arangodb/sensors/arangodb.py
@@ -47,8 +47,8 @@ def __init__(self, *, query: str, arangodb_conn_id: str = "arangodb_default", **
self.query = query

def poke(self, context: 'Context') -> bool:
self.log.info(f"Sensor running following query: {self.query}")
self.log.info("Sensor running the following query: %s", self.query)
hook = ArangoDBHook(self.arangodb_conn_id)
records = hook.query(self.query, count=True).count()
self.log.info(f"Total Records found: {records}")
self.log.info("Total records found: %d", records)
return 0 != records
2 changes: 1 addition & 1 deletion airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -228,7 +228,7 @@ def create_custom_object(
)
self.log.warning("Deleted SparkApplication with the same name.")
except client.rest.ApiException:
self.log.info(f"SparkApp {body_dict['metadata']['name']} not found.")
self.log.info("SparkApp %s not found.", body_dict['metadata']['name'])

try:
response = api.create_namespaced_custom_object(
5 changes: 3 additions & 2 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -285,8 +285,8 @@ def parse_log_line(self, line: str) -> Tuple[Optional[DateTime], str]:
split_at = line.find(' ')
if split_at == -1:
self.log.error(
f"Error parsing timestamp (no timestamp in message '${line}'). "
"Will continue execution but won't update timestamp"
"Error parsing timestamp (no timestamp in message %r). "
"Will continue execution but won't update timestamp",
line,
)
return None, line
timestamp = line[:split_at]
@@ -557,7 +557,7 @@ def execute(self, context: 'Context'):
impersonation_chain=self.impersonation_chain,
)

self.log.info(f"Removing a DeployedModel {self.deployed_model_id}")
self.log.info("Removing a DeployedModel %s", self.deployed_model_id)
operation = hook.undeploy_model(
project_id=self.project_id,
region=self.region,
5 changes: 3 additions & 2 deletions airflow/providers/google/cloud/sensors/dataproc.py
@@ -79,8 +79,9 @@ def poke(self, context: "Context") -> bool:
job_id=self.dataproc_job_id, region=self.region, project_id=self.project_id
)
except ServerError as err:
self.log.info(f"DURATION RUN: {self._duration()}")
if self._duration() > self.wait_timeout:
duration = self._duration()
self.log.info("DURATION RUN: %f", duration)
if duration > self.wait_timeout:
raise AirflowException(
f"Timeout: dataproc job {self.dataproc_job_id} "
f"is not ready after {self.wait_timeout}s"
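
This hunk also removes a double evaluation: `self._duration()` was called once for the log line and again for the comparison, so the logged value and the tested value could differ. A standalone sketch of the single-evaluation pattern (`_duration` and `wait_timeout` here are hypothetical stand-ins for the sensor's attributes):

```python
import logging
import time

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

_start = time.monotonic()
wait_timeout = 300.0  # hypothetical timeout, in seconds


def _duration() -> float:
    """Hypothetical stand-in for the sensor's elapsed-time helper."""
    return time.monotonic() - _start


# Evaluate once; log and compare the same value.
duration = _duration()
log.info("DURATION RUN: %f", duration)
if duration > wait_timeout:
    raise TimeoutError(f"job not ready after {wait_timeout}s")
```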
5 changes: 4 additions & 1 deletion airflow/providers/google/suite/transfers/sql_to_sheets.py
@@ -17,6 +17,7 @@


import datetime
+ import logging
import numbers
from contextlib import closing
from typing import Any, Iterable, Mapping, Optional, Sequence, Union
@@ -120,7 +121,9 @@ def execute(self, context: Any) -> None:
impersonation_chain=self.impersonation_chain,
)

self.log.info(f"Uploading data to https://docs.google.com/spreadsheets/d/{self.spreadsheet_id}")
if self.log.isEnabledFor(logging.INFO):
url = f"https://docs.google.com/spreadsheets/d/{self.spreadsheet_id}"
self.log.info("Uploading data to %s", url)

sheet_hook.update_values(
spreadsheet_id=self.spreadsheet_id,
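
When building the message itself does real work (here, composing a URL), the `Logger.isEnabledFor` guard skips that work entirely whenever INFO records would be discarded. A sketch with a hypothetical spreadsheet ID:

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

spreadsheet_id = "1AbCdEfG"  # hypothetical ID, for illustration only

# The guard short-circuits the URL construction when INFO records would
# be dropped, so the f-string never runs in that case.
if log.isEnabledFor(logging.INFO):
    url = f"https://docs.google.com/spreadsheets/d/{spreadsheet_id}"
    log.info("Uploading data to %s", url)
```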
4 changes: 2 additions & 2 deletions airflow/providers/microsoft/azure/hooks/data_factory.py
@@ -637,13 +637,13 @@ def get_pipeline_run_status(
:param factory_name: The factory name.
:return: The status of the pipeline run.
"""
self.log.info(f"Getting the status of run ID {run_id}.")
self.log.info("Getting the status of run ID %s.", run_id)
pipeline_run_status = self.get_pipeline_run(
run_id=run_id,
factory_name=factory_name,
resource_group_name=resource_group_name,
).status
self.log.info(f"Current status of pipeline run {run_id}: {pipeline_run_status}")
self.log.info("Current status of pipeline run %s: %s", run_id, pipeline_run_status)

return pipeline_run_status

8 changes: 4 additions & 4 deletions airflow/providers/microsoft/azure/operators/data_factory.py
@@ -156,7 +156,7 @@ def __init__(

def execute(self, context: "Context") -> None:
self.hook = AzureDataFactoryHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
self.log.info(f"Executing the {self.pipeline_name} pipeline.")
self.log.info("Executing the %s pipeline.", self.pipeline_name)
response = self.hook.run_pipeline(
pipeline_name=self.pipeline_name,
resource_group_name=self.resource_group_name,
@@ -174,7 +174,7 @@ def execute(self, context: "Context") -> None:
context["ti"].xcom_push(key="run_id", value=self.run_id)

if self.wait_for_termination:
self.log.info(f"Waiting for pipeline run {self.run_id} to terminate.")
self.log.info("Waiting for pipeline run %s to terminate.", self.run_id)

if self.hook.wait_for_pipeline_run_status(
run_id=self.run_id,
Expand All @@ -184,7 +184,7 @@ def execute(self, context: "Context") -> None:
resource_group_name=self.resource_group_name,
factory_name=self.factory_name,
):
self.log.info(f"Pipeline run {self.run_id} has completed successfully.")
self.log.info("Pipeline run %s has completed successfully.", self.run_id)
else:
raise AzureDataFactoryPipelineRunException(
f"Pipeline run {self.run_id} has failed or has been cancelled."
@@ -207,6 +207,6 @@ def on_kill(self) -> None:
resource_group_name=self.resource_group_name,
factory_name=self.factory_name,
):
self.log.info(f"Pipeline run {self.run_id} has been cancelled successfully.")
self.log.info("Pipeline run %s has been cancelled successfully.", self.run_id)
else:
raise AzureDataFactoryPipelineRunException(f"Pipeline run {self.run_id} was not cancelled.")
9 changes: 6 additions & 3 deletions airflow/providers_manager.py
@@ -377,9 +377,12 @@ def _verify_all_providers_all_compatible(self):
if min_version:
if packaging_version.parse(min_version) > packaging_version.parse(info.version):
log.warning(
f"The package {provider_id} is not compatible with this version of Airflow. "
f"The package has version {info.version} but the minimum supported version "
f"of the package is {min_version}"
"The package %s is not compatible with this version of Airflow. "
"The package has version %s but the minimum supported version "
"of the package is %s",
provider_id,
info.version,
min_version,
)

@provider_info_cache("hooks")
2 changes: 1 addition & 1 deletion airflow/settings.py
@@ -293,7 +293,7 @@ def configure_orm(disable_connection_pool=False):
data = result.fetchone()[0]
if data != 1:
log.critical("MSSQL database MUST have READ_COMMITTED_SNAPSHOT enabled.")
log.critical(f"The database {engine.url.database} has it disabled.")
log.critical("The database %s has it disabled.", engine.url.database)
log.critical("This will cause random deadlocks, Refusing to start.")
log.critical(
"See https://airflow.apache.org/docs/apache-airflow/stable/howto/"