[AIRFLOW-6515] Change Log Levels from Info/Warn to Error (#8170)
wkhudgins92 committed Apr 9, 2020
1 parent 6aa4f53 commit 87969a3
Showing 28 changed files with 52 additions and 52 deletions.
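
The pattern applied throughout the diff is the same: log calls on failure paths that previously used log.info or log.warning now use log.error (with a few exceptions below where the new level is warning or debug because the condition is not a hard failure). A minimal, hypothetical sketch of the before/after pattern — send_report and its client argument are illustrative names, not code from this commit:

```python
import logging

log = logging.getLogger(__name__)


def send_report(client, payload):
    """Hypothetical helper illustrating the log-level change in this commit."""
    response = client.post(payload)
    if response.ok:
        log.info("Report sent, status code: %s", response.status_code)
        return True
    # Before this commit the failure branch logged at INFO/WARNING, e.g.:
    #     log.warning("Failed to send report, status code: %s", response.status_code)
    # After it, failure paths log at ERROR so they stand out in aggregated logs
    # and can drive ERROR-level alerting.
    log.error("Failed to send report, status code: %s", response.status_code)
    return False
```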
4 changes: 2 additions & 2 deletions airflow/contrib/utils/sendgrid.py
@@ -122,5 +122,5 @@ def _post_sendgrid_mail(mail_data):
log.info('Email with subject %s is successfully sent to recipients: %s',
mail_data['subject'], mail_data['personalizations'])
else:
- log.warning('Failed to send out email with subject %s, status code: %s',
- mail_data['subject'], response.status_code)
+ log.error('Failed to send out email with subject %s, status code: %s',
+ mail_data['subject'], response.status_code)
2 changes: 1 addition & 1 deletion airflow/executors/base_executor.py
@@ -75,7 +75,7 @@ def queue_command(self,
self.log.info("Adding to queue: %s", command)
self.queued_tasks[simple_task_instance.key] = (command, priority, queue, simple_task_instance)
else:
- self.log.info("could not queue task %s", simple_task_instance.key)
+ self.log.error("could not queue task %s", simple_task_instance.key)

def queue_task_instance(
self,
2 changes: 1 addition & 1 deletion airflow/executors/kubernetes_executor.py
@@ -391,7 +391,7 @@ def process_status(self, pod_id: str,
else:
self.log.info('Event: %s Pending', pod_id)
elif status == 'Failed':
- self.log.info('Event: %s Failed', pod_id)
+ self.log.error('Event: %s Failed', pod_id)
self.watcher_queue.put((pod_id, namespace, State.FAILED, labels, resource_version))
elif status == 'Succeeded':
self.log.info('Event: %s Succeeded', pod_id)
4 changes: 2 additions & 2 deletions airflow/kubernetes/pod_launcher.py
@@ -250,13 +250,13 @@ def process_status(self, job_id, status):
if status == PodStatus.PENDING:
return State.QUEUED
elif status == PodStatus.FAILED:
- self.log.info('Event with job id %s Failed', job_id)
+ self.log.error('Event with job id %s Failed', job_id)
return State.FAILED
elif status == PodStatus.SUCCEEDED:
self.log.info('Event with job id %s Succeeded', job_id)
return State.SUCCESS
elif status == PodStatus.RUNNING:
return State.RUNNING
else:
- self.log.info('Event: Invalid state %s on job %s', status, job_id)
+ self.log.error('Event: Invalid state %s on job %s', status, job_id)
return State.FAILED
2 changes: 1 addition & 1 deletion airflow/logging_config.py
@@ -61,7 +61,7 @@ def configure_logging():
# Try to init logging
dictConfig(logging_config)
except ValueError as e:
- log.warning('Unable to load the config, contains a configuration error.')
+ log.error('Unable to load the config, contains a configuration error.')
# When there is an error in the config, escalate the exception
# otherwise Airflow would silently fall back on the default config
raise e
4 changes: 2 additions & 2 deletions airflow/models/dagrun.py
@@ -323,7 +323,7 @@ def update_state(self, session=None):
if not unfinished_tasks and any(
leaf_ti.state in {State.FAILED, State.UPSTREAM_FAILED} for leaf_ti in leaf_tis
):
- self.log.info('Marking run %s failed', self)
+ self.log.error('Marking run %s failed', self)
self.set_state(State.FAILED)
dag.handle_callback(self, success=False, reason='task_failure',
session=session)
@@ -339,7 +339,7 @@ def update_state(self, session=None):
# if *all tasks* are deadlocked, the run failed
elif (unfinished_tasks and none_depends_on_past and
none_task_concurrency and not are_runnable_tasks):
- self.log.info('Deadlock; marking run %s failed', self)
+ self.log.error('Deadlock; marking run %s failed', self)
self.set_state(State.FAILED)
dag.handle_callback(self, success=False, reason='all_tasks_deadlocked',
session=session)
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/hooks/s3.py
@@ -139,7 +139,7 @@ def check_for_bucket(self, bucket_name=None):
self.get_conn().head_bucket(Bucket=bucket_name)
return True
except ClientError as e:
- self.log.info(e.response["Error"]["Message"])
+ self.log.error(e.response["Error"]["Message"])
return False

@provide_bucket_name
@@ -297,7 +297,7 @@ def check_for_key(self, key, bucket_name=None):
self.get_conn().head_object(Bucket=bucket_name, Key=key)
return True
except ClientError as e:
- self.log.info(e.response["Error"]["Message"])
+ self.log.error(e.response["Error"]["Message"])
return False

@provide_bucket_name
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/operators/batch.py
@@ -175,7 +175,7 @@ def submit_job(self, context: Dict): # pylint: disable=unused-argument
self.log.info("AWS Batch job (%s) started: %s", self.job_id, response)

except Exception as e:
- self.log.info("AWS Batch job (%s) failed submission", self.job_id)
+ self.log.error("AWS Batch job (%s) failed submission", self.job_id)
raise AirflowException(e)

def monitor_job(self, context: Dict): # pylint: disable=unused-argument
@@ -194,5 +194,5 @@ def monitor_job(self, context: Dict): # pylint: disable=unused-argument
self.log.info("AWS Batch job (%s) succeeded", self.job_id)

except Exception as e:
- self.log.info("AWS Batch job (%s) failed monitoring", self.job_id)
+ self.log.error("AWS Batch job (%s) failed monitoring", self.job_id)
raise AirflowException(e)
6 changes: 3 additions & 3 deletions airflow/providers/apache/hdfs/hooks/webhdfs.py
@@ -81,11 +81,11 @@ def _find_valid_server(self):
host_socket.close()
return client
else:
- self.log.info("Could not connect to %s:%s", connection.host, connection.port)
+ self.log.error("Could not connect to %s:%s", connection.host, connection.port)
host_socket.close()
except HdfsError as hdfs_error:
- self.log.info('Read operation on namenode %s failed with error: %s',
- connection.host, hdfs_error)
+ self.log.error('Read operation on namenode %s failed with error: %s',
+ connection.host, hdfs_error)
return None

def _get_client(self, connection):
2 changes: 1 addition & 1 deletion airflow/providers/apache/hive/hooks/hive.py
@@ -555,7 +555,7 @@ def _find_valid_server(self):
host_socket.close()
return conn
else:
- self.log.info("Could not connect to %s:%s", conn.host, conn.port)
+ self.log.error("Could not connect to %s:%s", conn.host, conn.port)
return None

def get_conn(self):
2 changes: 1 addition & 1 deletion airflow/providers/apache/spark/hooks/spark_submit.py
@@ -643,5 +643,5 @@ def on_kill(self):
self.log.info("Spark on K8s killed with response: %s", api_response)

except kube_client.ApiException as e:
- self.log.info("Exception when attempting to kill Spark on K8s:")
+ self.log.error("Exception when attempting to kill Spark on K8s:")
self.log.exception(e)
2 changes: 1 addition & 1 deletion airflow/providers/ftp/sensors/ftp.py
@@ -76,7 +76,7 @@ def poke(self, context):
try:
hook.get_mod_time(self.path)
except ftplib.error_perm as e:
- self.log.info('Ftp error encountered: %s', str(e))
+ self.log.error('Ftp error encountered: %s', str(e))
error_code = self._get_error_code(e)
if ((error_code != 550) and
(self.fail_on_transient_errors or
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/hooks/bigquery.py
@@ -796,7 +796,7 @@ def insert_all(self, project_id: str, dataset_id: str, table_id: str,
raise AirflowException(
'BigQuery job failed. Error was: {}'.format(error_msg)
)
- self.log.info(error_msg)
+ self.log.error(error_msg)

def update_dataset(self, dataset_id: str,
dataset_resource: Dict, project_id: Optional[str] = None) -> Dict:
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/hooks/bigtable.py
@@ -94,8 +94,8 @@ def delete_instance(self, instance_id: str, project_id: Optional[str] = None) ->
if instance:
instance.delete()
else:
- self.log.info("The instance '%s' does not exist in project '%s'. Exiting", instance_id,
- project_id)
+ self.log.warning("The instance '%s' does not exist in project '%s'. Exiting",
+ instance_id, project_id)

@GoogleBaseHook.fallback_to_default_project_id
def create_instance(
10 changes: 5 additions & 5 deletions airflow/providers/google/cloud/operators/dlp.py
@@ -663,7 +663,7 @@ def execute(self, context):
metadata=self.metadata,
)
except NotFound:
- self.log.info("Template %s not found.", self.template_id)
+ self.log.error("Template %s not found.", self.template_id)


class CloudDLPDeleteDLPJobOperator(BaseOperator):
@@ -723,7 +723,7 @@ def execute(self, context):
metadata=self.metadata,
)
except NotFound:
- self.log.info("Job %s id not found.", self.dlp_job_id)
+ self.log.error("Job %s id not found.", self.dlp_job_id)


class CloudDLPDeleteInspectTemplateOperator(BaseOperator):
@@ -788,7 +788,7 @@ def execute(self, context):
metadata=self.metadata,
)
except NotFound:
- self.log.info("Template %s not found", self.template_id)
+ self.log.error("Template %s not found", self.template_id)


class CloudDLPDeleteJobTriggerOperator(BaseOperator):
@@ -847,7 +847,7 @@ def execute(self, context):
metadata=self.metadata,
)
except NotFound:
- self.log.info("Trigger %s not found", self.job_trigger_id)
+ self.log.error("Trigger %s not found", self.job_trigger_id)


class CloudDLPDeleteStoredInfoTypeOperator(BaseOperator):
@@ -917,7 +917,7 @@ def execute(self, context):
metadata=self.metadata,
)
except NotFound:
- self.log.info("Stored info %s not found", self.stored_info_type_id)
+ self.log.error("Stored info %s not found", self.stored_info_type_id)


class CloudDLPGetDeidentifyTemplateOperator(BaseOperator):
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/sensors/gcs.py
@@ -308,7 +308,7 @@ def is_bucket_updated(self, current_num_objects: int) -> bool:
""", current_num_objects, path, self.inactivity_period)
return True

- self.log.warning("FAILURE: Inactivity Period passed, not enough objects found in %s", path)
+ self.log.error("FAILURE: Inactivity Period passed, not enough objects found in %s", path)

return False
return False
2 changes: 1 addition & 1 deletion airflow/providers/hashicorp/secrets/vault.py
@@ -189,7 +189,7 @@ def _get_secret(self, path_prefix: str, secret_id: str) -> Optional[dict]:
response = self.client.secrets.kv.v2.read_secret_version(
path=secret_path, mount_point=self.mount_point)
except InvalidPath:
- self.log.info("Secret %s not found in Path: %s", secret_id, secret_path)
+ self.log.debug("Secret %s not found in Path: %s", secret_id, secret_path)
return None

return_data = response["data"] if self.kv_engine_version == 1 else response["data"]["data"]
@@ -296,11 +296,11 @@ def _monitor_logging(self, resource_group, name):
"container instance, retrying...")

if state == "Terminated":
- self.log.info("Container exited with detail_status %s", detail_status)
+ self.log.error("Container exited with detail_status %s", detail_status)
return exit_code

if state == "Failed":
- self.log.info("Azure provision failure")
+ self.log.error("Azure provision failure")
return 1

except AirflowTaskTimeout:
2 changes: 1 addition & 1 deletion airflow/providers/qubole/hooks/qubole_check.py
@@ -130,5 +130,5 @@ def get_query_results(self):
query_result_buffer.close()
return query_result
else:
- self.log.info("Qubole command not found")
+ self.log.error("Qubole command not found")
return None
2 changes: 1 addition & 1 deletion airflow/providers/salesforce/hooks/salesforce.py
@@ -173,7 +173,7 @@ def _to_timestamp(cls, column):
try:
column = pd.to_datetime(column)
except ValueError:
- log.warning("Could not convert field to timestamps: %s", column.name)
+ log.error("Could not convert field to timestamps: %s", column.name)
return column

# now convert the newly created datetimes into timestamps
2 changes: 1 addition & 1 deletion airflow/serialization/serialized_objects.py
@@ -205,7 +205,7 @@ def _serialize(cls, var: Any) -> Any: # Unfortunately there is no support for r
log.debug('Cast type %s to str in serialization.', type(var))
return str(var)
except Exception: # pylint: disable=broad-except
- log.warning('Failed to stringify.', exc_info=True)
+ log.error('Failed to stringify.', exc_info=True)
return FAILED
# pylint: enable=too-many-return-statements

4 changes: 2 additions & 2 deletions airflow/stats.py
@@ -102,7 +102,7 @@ def wrapper(_self, stat, *args, **kwargs):
stat_name = handle_stat_name_func(stat)
return fn(_self, stat_name, *args, **kwargs)
except InvalidStatsNameException:
- log.warning('Invalid stat name: %s.', stat, exc_info=True)
+ log.error('Invalid stat name: %s.', stat, exc_info=True)
return

return wrapper
@@ -199,7 +199,7 @@ def __init__(self, *args, **kwargs):
else:
self.__class__.instance = DummyStatsLogger()
except (socket.gaierror, ImportError) as e:
- log.warning("Could not configure StatsClient: %s, using DummyStatsLogger instead.", e)
+ log.error("Could not configure StatsClient: %s, using DummyStatsLogger instead.", e)
self.__class__.instance = DummyStatsLogger()

def get_statsd_logger(self):
8 changes: 4 additions & 4 deletions airflow/task/task_runner/cgroup_task_runner.py
@@ -186,10 +186,10 @@ def return_code(self):
# I wasn't able to track down the root cause of the package install failures, but
# we might want to revisit that approach at some other point.
if return_code == 137:
- self.log.warning("Task failed with return code of 137. This may indicate "
- "that it was killed due to excessive memory usage. "
- "Please consider optimizing your task or using the "
- "resources argument to reserve more memory for your task")
+ self.log.error("Task failed with return code of 137. This may indicate "
+ "that it was killed due to excessive memory usage. "
+ "Please consider optimizing your task or using the "
+ "resources argument to reserve more memory for your task")
return return_code

def terminate(self):
8 changes: 4 additions & 4 deletions airflow/www/api/experimental/endpoints.py
@@ -71,7 +71,7 @@ def trigger_dag(dag_id):
'Given execution date, {}, could not be identified '
'as a date. Example date format: 2015-11-16T14:34:15+00:00'
.format(execution_date))
- log.info(error_message)
+ log.error(error_message)
response = jsonify({'error': error_message})
response.status_code = 400

@@ -229,7 +229,7 @@ def task_instance_info(dag_id, execution_date, task_id):
'Given execution date, {}, could not be identified '
'as a date. Example date format: 2015-11-16T14:34:15+00:00'
.format(execution_date))
- log.info(error_message)
+ log.error(error_message)
response = jsonify({'error': error_message})
response.status_code = 400

@@ -270,7 +270,7 @@ def dag_run_status(dag_id, execution_date):
'Given execution date, {}, could not be identified '
'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
execution_date))
- log.info(error_message)
+ log.error(error_message)
response = jsonify({'error': error_message})
response.status_code = 400

@@ -383,7 +383,7 @@ def get_lineage(dag_id: str, execution_date: str):
'Given execution date, {}, could not be identified '
'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
execution_date))
- log.info(error_message)
+ log.error(error_message)
response = jsonify({'error': error_message})
response.status_code = 400

@@ -375,7 +375,7 @@ def __write_to_file(self, filepath: str, content: bytes) -> None:
try:
os.makedirs(os.path.dirname(filepath))
except OSError as exc: # Guard against race condition
- self.log.info("Error while creating dir.")
+ self.log.error("Error while creating dir.")
if exc.errno != errno.EEXIST:
raise
self.log.info("... Done. Dir created.")
8 changes: 4 additions & 4 deletions tests/providers/google/cloud/utils/gcp_authenticator.py
@@ -96,7 +96,7 @@ def set_key_path_in_airflow_connection(self):
conn.extra = json.dumps(extras)
session.commit()
except BaseException as ex:
- self.log.info('Airflow DB Session error: %s', str(ex))
+ self.log.error('Airflow DB Session error: %s', str(ex))
session.rollback()
raise
finally:
@@ -122,7 +122,7 @@ def set_dictionary_in_airflow_connection(self):
conn.extra = json.dumps(extras)
session.commit()
except BaseException as ex:
- self.log.info('Airflow DB Session error: %s', str(ex))
+ self.log.error('Airflow DB Session error: %s', str(ex))
session.rollback()
raise
finally:
@@ -146,11 +146,11 @@ def _set_key_path(self):
self.log.info("The %s is not a directory", gcp_config_dir)
key_dir = os.path.join(gcp_config_dir, "keys")
if not os.path.isdir(key_dir):
- self.log.info("The %s is not a directory", key_dir)
+ self.log.error("The %s is not a directory", key_dir)
return
key_path = os.path.join(key_dir, self.gcp_key)
if not os.path.isfile(key_path):
- self.log.info("The %s file is missing", key_path)
+ self.log.error("The %s file is missing", key_path)
self.full_key_path = key_path

def _validate_key_set(self):
2 changes: 1 addition & 1 deletion tests/test_logging_config.py
@@ -171,7 +171,7 @@ def tearDown(self):
def test_loading_invalid_local_settings(self):
from airflow.logging_config import configure_logging, log
with settings_context(SETTINGS_FILE_INVALID):
- with patch.object(log, 'warning') as mock_info:
+ with patch.object(log, 'error') as mock_info:
# Load config
with self.assertRaises(ValueError):
configure_logging()
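
For reference, the test above just patches the logger method that matches the new level ('error' instead of 'warning') and asserts it fires when an invalid config is loaded. A self-contained, hypothetical sketch of that patch-and-assert pattern — load_config is a stand-in, not the real configure_logging, and the rest of the real test body is truncated above:

```python
import logging
from unittest.mock import patch

log = logging.getLogger("airflow.logging_config")


def load_config(config: dict) -> None:
    """Hypothetical stand-in for configure_logging()."""
    if "handlers" not in config:
        # Mirrors the change in airflow/logging_config.py: log at ERROR, then raise.
        log.error("Unable to load the config, contains a configuration error.")
        raise ValueError("invalid logging config")


def test_invalid_config_logs_at_error_level():
    # Patch error() (previously warning()) and check it is called for a bad config.
    with patch.object(log, "error") as mock_error:
        try:
            load_config({})
        except ValueError:
            pass
    mock_error.assert_called_once()
```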
