Don't use time.time() or timezone.utcnow() for duration calculations (#12353)

`time.time() - start` or `timezone.utcnow() - start_dttm` will work
fine in 99% of cases, but they share one fatal flaw:

They both operate on system time, and that can go backwards.

While this might be surprising, it can happen -- usually due to clocks
being adjusted.

And while it might seem rare, for long-running processes it is more
common than we might expect. Although most of these durations are
harmless to get wrong (they only feed logs), it is better to be safe
than sorry.

Also, the replacement is much lighter weight than the `utcnow()`
style: creating a datetime object is a comparatively expensive
operation, and computing the difference between two of them even more
so, _especially_ when compared to just subtracting two floats.

To make the "common" case of timing a block of code easier, I have
made `Stats.timer()` return an object that has a `duration` field.
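
As a rough sketch of the shape of that API (illustrative only -- built
on `time.monotonic()`, with made-up `timer`/`_Timer` names, not the
actual `Stats.timer()` implementation):

```python
import time
from contextlib import contextmanager


class _Timer:
    """Holds the measured duration (in seconds) once the block exits."""

    duration = None

    def __init__(self):
        self._start = time.monotonic()

    def stop(self):
        self.duration = time.monotonic() - self._start


@contextmanager
def timer():
    t = _Timer()
    try:
        yield t
    finally:
        t.stop()


with timer() as t:
    time.sleep(0.2)  # stand-in for the block being measured
print(f"Block took {t.duration:.3f} seconds")
```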
ashb committed Nov 29, 2020
1 parent 8291fab commit 02d9434
Showing 19 changed files with 338 additions and 232 deletions.
41 changes: 41 additions & 0 deletions CONTRIBUTING.rst
@@ -774,6 +774,47 @@ If this function is designed to be called by "end-users" (i.e. DAG authors) then
...
# You SHOULD not commit the session here. The wrapper will take care of commit()/rollback() if exception
Don't use time() for duration calculations
------------------------------------------

If you wish to compute the time difference between two events within the same process, use
``time.monotonic()``, not ``time.time()`` nor ``timezone.utcnow()``.

If you are measuring duration for performance reasons, then ``time.perf_counter()`` should be used. (On many
platforms, this uses the same underlying clock mechanism as monotonic, but ``perf_counter`` is guaranteed to be
the highest accuracy clock on the system; monotonic is simply "guaranteed" to not go backwards.)
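
As a minimal illustrative sketch of using these clocks directly (the timed work below is just a placeholder):

.. code-block:: python

    import time

    start = time.monotonic()
    time.sleep(0.1)  # placeholder for the work being measured
    duration = time.monotonic() - start  # seconds; unaffected by system clock adjustments

    start = time.perf_counter()  # highest-resolution clock, for benchmarking
    sum(range(1_000_000))  # placeholder workload
    elapsed = time.perf_counter() - start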

If you wish to time how long a block of code takes, use ``Stats.timer()`` -- either with a metric name, which
will be timed and submitted automatically:

.. code-block:: python

    from airflow.stats import Stats
    ...
    with Stats.timer("my_timer_metric"):
        ...

or to time but not send a metric:

.. code-block:: python

    from airflow.stats import Stats
    ...
    with Stats.timer() as timer:
        ...
    log.info("Code took %.3f seconds", timer.duration)

For full docs on ``timer()`` check out `airflow/stats.py`_.

If the start_date of a duration calculation needs to be stored in a database, then this has to be done using
datetime objects. In all other cases, using datetime for duration calculations MUST be avoided, as creating and
diffing datetime objects is (comparatively) slow.
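
As a rough illustration of the relative cost (this snippet is not part of the Airflow code base, and the
numbers will vary by machine):

.. code-block:: python

    import timeit

    # Creating a datetime and diffing it against another ...
    datetime_cost = timeit.timeit(
        "(timezone.utcnow() - start).total_seconds()",
        setup="from airflow.utils import timezone; start = timezone.utcnow()",
        number=100_000,
    )

    # ... versus subtracting two monotonic floats.
    float_cost = timeit.timeit(
        "time.monotonic() - start",
        setup="import time; start = time.monotonic()",
        number=100_000,
    )

    print(f"datetime diff: {datetime_cost:.3f}s, float diff: {float_cost:.3f}s")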

Naming Conventions for provider packages
----------------------------------------

12 changes: 6 additions & 6 deletions airflow/cli/commands/webserver_command.py
@@ -99,7 +99,7 @@ def __init__(

self._num_workers_running = 0
self._num_ready_workers_running = 0
self._last_refresh_time = time.time() if worker_refresh_interval > 0 else None
self._last_refresh_time = time.monotonic() if worker_refresh_interval > 0 else None
self._last_plugin_state = self._generate_plugin_state() if reload_on_plugin_change else None
self._restart_on_next_plugin_check = False

@@ -149,9 +149,9 @@ def _get_num_workers_running(self) -> int:

def _wait_until_true(self, fn, timeout: int = 0) -> None:
"""Sleeps until fn is true"""
start_time = time.time()
start_time = time.monotonic()
while not fn():
if 0 < timeout <= time.time() - start_time:
if 0 < timeout <= time.monotonic() - start_time:
raise AirflowWebServerTimeout(f"No response from gunicorn master within {timeout} seconds")
sleep(0.1)

@@ -274,7 +274,7 @@ def _check_workers(self) -> None:
# If workers should be restarted periodically.
if self.worker_refresh_interval > 0 and self._last_refresh_time:
# and we refreshed the workers a long time ago, refresh the workers
last_refresh_diff = time.time() - self._last_refresh_time
last_refresh_diff = time.monotonic() - self._last_refresh_time
if self.worker_refresh_interval < last_refresh_diff:
num_new_workers = self.worker_refresh_batch_size
self.log.debug(
@@ -284,7 +284,7 @@ def _check_workers(self) -> None:
num_new_workers,
)
self._spawn_new_workers(num_new_workers)
self._last_refresh_time = time.time()
self._last_refresh_time = time.monotonic()
return

# if we should check the directory with the plugin,
@@ -308,7 +308,7 @@ def _check_workers(self) -> None:
num_workers_running,
)
self._restart_on_next_plugin_check = False
self._last_refresh_time = time.time()
self._last_refresh_time = time.monotonic()
self._reload_gunicorn()


52 changes: 24 additions & 28 deletions airflow/jobs/scheduler_job.py
@@ -28,7 +28,7 @@
import threading
import time
from collections import defaultdict
from contextlib import ExitStack, redirect_stderr, redirect_stdout, suppress
from contextlib import redirect_stderr, redirect_stdout, suppress
from datetime import timedelta
from multiprocessing.connection import Connection as MultiprocessingConnection
from typing import Any, Callable, DefaultDict, Dict, Iterable, List, Optional, Set, Tuple
@@ -167,17 +167,16 @@ def _run_file_processor(

try:
# redirect stdout/stderr to log
with ExitStack() as exit_stack:
exit_stack.enter_context(redirect_stdout(StreamLogWriter(log, logging.INFO))) # type: ignore
exit_stack.enter_context(redirect_stderr(StreamLogWriter(log, logging.WARN))) # type: ignore
with redirect_stdout(StreamLogWriter(log, logging.INFO)), redirect_stderr(
StreamLogWriter(log, logging.WARN)
), Stats.timer() as timer:
# Re-configure the ORM engine as there are issues with multiple processes
settings.configure_orm()

# Change the thread name to differentiate log lines. This is
# really a separate process, but changing the name of the
# process doesn't work, so changing the thread name instead.
threading.current_thread().name = thread_name
start_time = time.time()

log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path)
dag_file_processor = DagFileProcessor(dag_ids=dag_ids, log=log)
@@ -187,8 +186,7 @@
callback_requests=callback_requests,
)
result_channel.send(result)
end_time = time.time()
log.info("Processing %s took %.3f seconds", file_path, end_time - start_time)
log.info("Processing %s took %.3f seconds", file_path, timer.duration)
except Exception: # pylint: disable=broad-except
# Log exceptions through the logging framework.
log.exception("Got an exception! Propagating...")
@@ -1372,34 +1370,32 @@ def repeat(*args, **kwargs):
)

for loop_count in itertools.count(start=1):
loop_start_time = time.time()
with Stats.timer() as timer:

if self.using_sqlite:
self.processor_agent.run_single_parsing_loop()
# For the sqlite case w/ 1 thread, wait until the processor
# is finished to avoid concurrent access to the DB.
self.log.debug("Waiting for processors to finish since we're using sqlite")
self.processor_agent.wait_until_finished()
if self.using_sqlite:
self.processor_agent.run_single_parsing_loop()
# For the sqlite case w/ 1 thread, wait until the processor
# is finished to avoid concurrent access to the DB.
self.log.debug("Waiting for processors to finish since we're using sqlite")
self.processor_agent.wait_until_finished()

with create_session() as session:
num_queued_tis = self._do_scheduling(session)
with create_session() as session:
num_queued_tis = self._do_scheduling(session)

self.executor.heartbeat()
session.expunge_all()
num_finished_events = self._process_executor_events(session=session)
self.executor.heartbeat()
session.expunge_all()
num_finished_events = self._process_executor_events(session=session)

self.processor_agent.heartbeat()
self.processor_agent.heartbeat()

# Heartbeat the scheduler periodically
self.heartbeat(only_if_necessary=True)
# Heartbeat the scheduler periodically
self.heartbeat(only_if_necessary=True)

# Run any pending timed events
next_event = timers.run(blocking=False)
self.log.debug("Next timed event is in %f", next_event)
# Run any pending timed events
next_event = timers.run(blocking=False)
self.log.debug("Next timed event is in %f", next_event)

loop_end_time = time.time()
loop_duration = loop_end_time - loop_start_time
self.log.debug("Ran scheduling loop in %.2f seconds", loop_duration)
self.log.debug("Ran scheduling loop in %.2f seconds", timer.duration)

if not is_unit_test and not num_queued_tis and not num_finished_events:
# If the scheduler is doing things, don't sleep. This means when there is work to do, the
91 changes: 44 additions & 47 deletions airflow/models/dagbag.py
@@ -438,38 +438,37 @@ def collect_dags(
return

self.log.info("Filling up the DagBag from %s", dag_folder)
start_dttm = timezone.utcnow()
dag_folder = dag_folder or self.dag_folder
# Used to store stats around DagBag processing
stats = []

dag_folder = correct_maybe_zipped(dag_folder)
for filepath in list_py_file_paths(
dag_folder,
safe_mode=safe_mode,
include_examples=include_examples,
include_smart_sensor=include_smart_sensor,
):
try:
file_parse_start_dttm = timezone.utcnow()
found_dags = self.process_file(filepath, only_if_updated=only_if_updated, safe_mode=safe_mode)

file_parse_end_dttm = timezone.utcnow()
stats.append(
FileLoadStat(
file=filepath.replace(settings.DAGS_FOLDER, ''),
duration=file_parse_end_dttm - file_parse_start_dttm,
dag_num=len(found_dags),
task_num=sum([len(dag.tasks) for dag in found_dags]),
dags=str([dag.dag_id for dag in found_dags]),
with Stats.timer('collect_dags'):
dag_folder = dag_folder or self.dag_folder
# Used to store stats around DagBag processing
stats = []

dag_folder = correct_maybe_zipped(dag_folder)
for filepath in list_py_file_paths(
dag_folder,
safe_mode=safe_mode,
include_examples=include_examples,
include_smart_sensor=include_smart_sensor,
):
try:
file_parse_start_dttm = timezone.utcnow()
found_dags = self.process_file(
filepath, only_if_updated=only_if_updated, safe_mode=safe_mode
)
)
except Exception as e: # pylint: disable=broad-except
self.log.exception(e)

end_dttm = timezone.utcnow()
durations = (end_dttm - start_dttm).total_seconds()
Stats.gauge('collect_dags', durations, 1)
file_parse_end_dttm = timezone.utcnow()
stats.append(
FileLoadStat(
file=filepath.replace(settings.DAGS_FOLDER, ''),
duration=file_parse_end_dttm - file_parse_start_dttm,
dag_num=len(found_dags),
task_num=sum([len(dag.tasks) for dag in found_dags]),
dags=str([dag.dag_id for dag in found_dags]),
)
)
except Exception as e: # pylint: disable=broad-except
self.log.exception(e)

Stats.gauge('dagbag_size', len(self.dags), 1)
Stats.gauge('dagbag_import_errors', len(self.import_errors), 1)
self.dagbag_stats = sorted(stats, key=lambda x: x.duration, reverse=True)
@@ -483,23 +482,21 @@ def collect_dags_from_db(self):
"""Collects DAGs from database."""
from airflow.models.serialized_dag import SerializedDagModel

start_dttm = timezone.utcnow()
self.log.info("Filling up the DagBag from database")

# The dagbag contains all rows in serialized_dag table. Deleted DAGs are deleted
# from the table by the scheduler job.
self.dags = SerializedDagModel.read_all_dags()

# Adds subdags.
# DAG post-processing steps such as self.bag_dag and croniter are not needed as
# they are done by scheduler before serialization.
subdags = {}
for dag in self.dags.values():
for subdag in dag.subdags:
subdags[subdag.dag_id] = subdag
self.dags.update(subdags)

Stats.timing('collect_db_dags', timezone.utcnow() - start_dttm)
with Stats.timer('collect_db_dags'):
self.log.info("Filling up the DagBag from database")

# The dagbag contains all rows in serialized_dag table. Deleted DAGs are deleted
# from the table by the scheduler job.
self.dags = SerializedDagModel.read_all_dags()

# Adds subdags.
# DAG post-processing steps such as self.bag_dag and croniter are not needed as
# they are done by scheduler before serialization.
subdags = {}
for dag in self.dags.values():
for subdag in dag.subdags:
subdags[subdag.dag_id] = subdag
self.dags.update(subdags)

def dagbag_report(self):
"""Prints a report around DagBag loading stats"""
43 changes: 20 additions & 23 deletions airflow/models/dagrun.py
@@ -400,29 +400,26 @@ def update_state(

start_dttm = timezone.utcnow()
self.last_scheduling_decision = start_dttm

dag = self.get_dag()
info = self.task_instance_scheduling_decisions(session)

tis = info.tis
schedulable_tis = info.schedulable_tis
changed_tis = info.changed_tis
finished_tasks = info.finished_tasks
unfinished_tasks = info.unfinished_tasks

none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)

if unfinished_tasks and none_depends_on_past and none_task_concurrency:
# small speed up
are_runnable_tasks = (
schedulable_tis
or self._are_premature_tis(unfinished_tasks, finished_tasks, session)
or changed_tis
)

duration = timezone.utcnow() - start_dttm
Stats.timing(f"dagrun.dependency-check.{self.dag_id}", duration)
with Stats.timer(f"dagrun.dependency-check.{self.dag_id}"):
dag = self.get_dag()
info = self.task_instance_scheduling_decisions(session)

tis = info.tis
schedulable_tis = info.schedulable_tis
changed_tis = info.changed_tis
finished_tasks = info.finished_tasks
unfinished_tasks = info.unfinished_tasks

none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)

if unfinished_tasks and none_depends_on_past and none_task_concurrency:
# small speed up
are_runnable_tasks = (
schedulable_tis
or self._are_premature_tis(unfinished_tasks, finished_tasks, session)
or changed_tis
)

leaf_task_ids = {t.task_id for t in dag.leaves}
leaf_tis = [ti for ti in tis if ti.task_id in leaf_task_ids]
