
Commit 6f2277c

Authored by RNHTTR, jedcunningham, and ephraimbuddy
Simplify logic to resolve tasks stuck in queued despite stalled_task_timeout (#30375)
* simplify and consolidate logic for tasks stuck in queued
* simplify and consolidate logic for tasks stuck in queued
* simplify and consolidate logic for tasks stuck in queued
* fixed tests; updated fail stuck tasks to use run_with_db_retries
* mypy; fixed tests
* fix task_adoption_timeout in celery integration test
* addressing comments
* remove useless print
* fix typo
* move failure logic to executor
* fix scheduler job test
* adjustments for new scheduler job
* appeasing static checks
* fix test for new scheduler job paradigm
* Updating docs for deprecations
* news & small changes
* news & small changes
* Update newsfragments/30375.significant.rst (Co-authored-by: Jed Cunningham <[email protected]>)
* Update newsfragments/30375.significant.rst (Co-authored-by: Jed Cunningham <[email protected]>)
* added cleanup stuck task functionality to base executor
* fix sloppy mistakes & mypy
* removing self.fail from base_executor
* Update airflow/jobs/scheduler_job_runner.py (Co-authored-by: Jed Cunningham <[email protected]>)
* Update airflow/jobs/scheduler_job_runner.py (Co-authored-by: Jed Cunningham <[email protected]>)
* Fix job_id filter
* Don't even run query if executor doesn't support timing out queued tasks
* Add support for LocalKubernetesExecutor and CeleryKubernetesExecutor
* Add config option to control how often it runs - we want it quicker than the timeout
* Fixup newsfragment
* mark old KE pending pod check interval as deprecated by new check interval
* Fixup deprecation warnings

  This more closely mirrors how deprecations are raised for "normal" deprecations.
  I've removed the depth, as moving up the stack doesn't really help the user at all in this situation.
* Another deprecation cleanup
* Remove db retries
* Fix test

---------

Co-authored-by: Jed Cunningham <[email protected]>
Co-authored-by: Jed Cunningham <[email protected]>
Co-authored-by: Ephraim Anierobi <[email protected]>
Parent: 1ebeb19 · Commit: 6f2277c

17 files changed: +301 −544 lines

airflow/config_templates/config.yml

Lines changed: 15 additions & 27 deletions

@@ -2047,26 +2047,6 @@ celery:
       type: boolean
       example: ~
       default: "True"
-    task_adoption_timeout:
-      description: |
-        Time in seconds after which adopted tasks which are queued in celery are assumed to be stalled,
-        and are automatically rescheduled. This setting does the same thing as ``stalled_task_timeout`` but
-        applies specifically to adopted tasks only. When set to 0, the ``stalled_task_timeout`` setting
-        also applies to adopted tasks. To calculate adoption time, subtract the
-        :ref:`task duration<ui:task-duration>` from the task's :ref:`landing time<ui:landing-times>`.
-      version_added: 2.0.0
-      type: integer
-      example: ~
-      default: "600"
-    stalled_task_timeout:
-      description: |
-        Time in seconds after which tasks queued in celery are assumed to be stalled, and are automatically
-        rescheduled. Adopted tasks will instead use the ``task_adoption_timeout`` setting if specified.
-        When set to 0, automatic clearing of stalled tasks is disabled.
-      version_added: 2.3.1
-      type: integer
-      example: ~
-      default: "0"
     task_publish_max_retries:
       description: |
         The Maximum number of retries for publishing task messages to the broker when failing

@@ -2415,6 +2395,21 @@ scheduler:
       type: string
       example: ~
       default: "15"
+    task_queued_timeout:
+      description: |
+        Amount of time a task can be in the queued state before being retried or set to failed.
+      version_added: 2.6.0
+      type: float
+      example: ~
+      default: "600.0"
+    task_queued_timeout_check_interval:
+      description: |
+        How often to check for tasks that have been in the queued state for
+        longer than `[scheduler] task_queued_timeout`.
+      version_added: 2.6.0
+      type: float
+      example: ~
+      default: "120.0"
 triggerer:
   description: ~
   options:

@@ -2731,35 +2726,13 @@ kubernetes_executor:
       type: boolean
       example: ~
       default: "True"
-    worker_pods_pending_timeout:
-      description: |
-        How long in seconds a worker can be in Pending before it is considered a failure
-      version_added: 2.1.0
-      type: integer
-      example: ~
-      default: "300"
-    worker_pods_pending_timeout_check_interval:
-      description: |
-        How often in seconds to check if Pending workers have exceeded their timeouts
-      version_added: 2.1.0
-      type: integer
-      example: ~
-      default: "120"
     worker_pods_queued_check_interval:
       description: |
         How often in seconds to check for task instances stuck in "queued" status without a pod
       version_added: 2.2.0
       type: integer
       example: ~
       default: "60"
-    worker_pods_pending_timeout_batch_size:
-      description: |
-        How many pending pods to check for timeout violations in each check interval.
-        You may want this higher if you have a very large cluster and/or use ``multi_namespace_mode``.
-      version_added: 2.1.0
-      type: integer
-      example: ~
-      default: "100"
     ssl_ca_cert:
       description: |
         Path to a CA certificate to be used by the Kubernetes client to verify the server's SSL certificate.
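The net effect of this config change is that three executor-specific timeouts (`task_adoption_timeout`, `stalled_task_timeout`, `worker_pods_pending_timeout`) collapse into a single scheduler-level `task_queued_timeout`, evaluated every `task_queued_timeout_check_interval` seconds. A minimal stdlib sketch of the timeout check, under stated assumptions (the function and the task bookkeeping here are illustrative, not Airflow's actual API):

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

# Mirrors the new [scheduler] task_queued_timeout default (seconds).
TASK_QUEUED_TIMEOUT = 600.0


def find_stuck_in_queued(queued_tasks: dict[str, datetime], now: datetime) -> list[str]:
    """Return ids of tasks that have been in the queued state longer than the timeout."""
    cutoff = now - timedelta(seconds=TASK_QUEUED_TIMEOUT)
    return [task_id for task_id, queued_at in queued_tasks.items() if queued_at < cutoff]


now = datetime(2023, 4, 1, 12, 0, tzinfo=timezone.utc)
tasks = {
    "fresh": now - timedelta(seconds=30),   # still within the timeout
    "stuck": now - timedelta(seconds=700),  # exceeds 600s -> flagged
}
print(find_stuck_in_queued(tasks, now))  # ['stuck']
```

In the real scheduler, flagged task instances are handed to the executor's `cleanup_stuck_queued_tasks` hook rather than handled inline.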

airflow/config_templates/default_airflow.cfg

Lines changed: 7 additions & 22 deletions

@@ -1033,18 +1033,6 @@ operation_timeout = 1.0
 # or run in HA mode, it can adopt the orphan tasks launched by previous SchedulerJob.
 task_track_started = True
 
-# Time in seconds after which adopted tasks which are queued in celery are assumed to be stalled,
-# and are automatically rescheduled. This setting does the same thing as ``stalled_task_timeout`` but
-# applies specifically to adopted tasks only. When set to 0, the ``stalled_task_timeout`` setting
-# also applies to adopted tasks. To calculate adoption time, subtract the
-# :ref:`task duration<ui:task-duration>` from the task's :ref:`landing time<ui:landing-times>`.
-task_adoption_timeout = 600
-
-# Time in seconds after which tasks queued in celery are assumed to be stalled, and are automatically
-# rescheduled. Adopted tasks will instead use the ``task_adoption_timeout`` setting if specified.
-# When set to 0, automatic clearing of stalled tasks is disabled.
-stalled_task_timeout = 0
-
 # The Maximum number of retries for publishing task messages to the broker when failing
 # due to ``AirflowTaskTimeout`` error before giving up and marking Task as failed.
 task_publish_max_retries = 3

@@ -1228,6 +1216,13 @@ allow_trigger_in_future = False
 # How often to check for expired trigger requests that have not run yet.
 trigger_timeout_check_interval = 15
 
+# Amount of time a task can be in the queued state before being retried or set to failed.
+task_queued_timeout = 600.0
+
+# How often to check for tasks that have been in the queued state for
+# longer than `[scheduler] task_queued_timeout`.
+task_queued_timeout_check_interval = 120.0
+
 [triggerer]
 # How many triggers a single Triggerer will run at once, by default.
 default_capacity = 1000

@@ -1372,19 +1367,9 @@ tcp_keep_cnt = 6
 # Set this to false to skip verifying SSL certificate of Kubernetes python client.
 verify_ssl = True
 
-# How long in seconds a worker can be in Pending before it is considered a failure
-worker_pods_pending_timeout = 300
-
-# How often in seconds to check if Pending workers have exceeded their timeouts
-worker_pods_pending_timeout_check_interval = 120
-
 # How often in seconds to check for task instances stuck in "queued" status without a pod
 worker_pods_queued_check_interval = 60
 
-# How many pending pods to check for timeout violations in each check interval.
-# You may want this higher if you have a very large cluster and/or use ``multi_namespace_mode``.
-worker_pods_pending_timeout_batch_size = 100
-
 # Path to a CA certificate to be used by the Kubernetes client to verify the server's SSL certificate.
 ssl_ca_cert =
 
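The new `[scheduler]` options are plain INI values, so they can be read with Python's stdlib `configparser` (the parser family Airflow's `AirflowConfigParser` extends). A self-contained sketch using a hypothetical airflow.cfg fragment:

```python
import configparser

# Hypothetical airflow.cfg fragment containing only the options added in this commit.
cfg_text = """
[scheduler]
task_queued_timeout = 600.0
task_queued_timeout_check_interval = 120.0
"""

parser = configparser.ConfigParser()
parser.read_string(cfg_text)

# Both options are declared as floats in config.yml, so use getfloat.
timeout = parser.getfloat("scheduler", "task_queued_timeout")
interval = parser.getfloat("scheduler", "task_queued_timeout_check_interval")
print(timeout, interval)  # 600.0 120.0
```

Note the check interval defaults to well under the timeout itself, so a stuck task is detected at most one interval after it crosses the threshold.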

airflow/configuration.py

Lines changed: 22 additions & 8 deletions

@@ -234,6 +234,22 @@ class AirflowConfigParser(ConfigParser):
         ("database", "load_default_connections"): ("core", "load_default_connections", "2.3.0"),
         ("database", "max_db_retries"): ("core", "max_db_retries", "2.3.0"),
         ("scheduler", "parsing_cleanup_interval"): ("scheduler", "deactivate_stale_dags_interval", "2.5.0"),
+        ("scheduler", "task_queued_timeout_check_interval"): (
+            "kubernetes_executor",
+            "worker_pods_pending_timeout_check_interval",
+            "2.6.0",
+        ),
+    }
+
+    # A mapping of new configurations to a list of old configurations for when one configuration
+    # deprecates more than one other deprecation. The deprecation logic for these configurations
+    # is defined in SchedulerJobRunner.
+    many_to_one_deprecated_options: dict[tuple[str, str], list[tuple[str, str, str]]] = {
+        ("scheduler", "task_queued_timeout"): [
+            ("celery", "stalled_task_timeout", "2.6.0"),
+            ("celery", "task_adoption_timeout", "2.6.0"),
+            ("kubernetes_executor", "worker_pods_pending_timeout", "2.6.0"),
+        ]
     }
 
     # A mapping of new section -> (old section, since_version).

@@ -548,12 +564,10 @@ def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
 
     @overload  # type: ignore[override]
     def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str:  # type: ignore[override]
-
         ...
 
     @overload  # type: ignore[override]
     def get(self, section: str, key: str, **kwargs) -> str | None:  # type: ignore[override]
-
         ...
 
     def get(  # type: ignore[override, misc]

@@ -1070,7 +1084,7 @@ def as_dict(
         # This ensures the ones from config file is hidden too
         # if they are not provided through env, cmd and secret
         hidden = "< hidden >"
-        for (section, key) in self.sensitive_config_values:
+        for section, key in self.sensitive_config_values:
             if not config_sources.get(section):
                 continue
             if config_sources[section].get(key, None):

@@ -1089,7 +1103,7 @@ def _include_secrets(
         display_source: bool,
         raw: bool,
     ):
-        for (section, key) in self.sensitive_config_values:
+        for section, key in self.sensitive_config_values:
             value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key)
             if value:
                 if not display_sensitive:

@@ -1110,7 +1124,7 @@ def _include_commands(
         display_source: bool,
         raw: bool,
     ):
-        for (section, key) in self.sensitive_config_values:
+        for section, key in self.sensitive_config_values:
             opt = self._get_cmd_option_from_config_sources(config_sources, section, key)
             if not opt:
                 continue

@@ -1188,7 +1202,7 @@ def _filter_by_source(
         :return: None, the given config_sources is filtered if necessary,
             otherwise untouched.
         """
-        for (section, key) in self.sensitive_config_values:
+        for section, key in self.sensitive_config_values:
             # Don't bother if we don't have section / key
             if section not in config_sources or key not in config_sources[section]:
                 continue

@@ -1222,7 +1236,7 @@ def _replace_config_with_display_sources(
         include_cmds: bool,
         include_secret: bool,
     ):
-        for (source_name, config) in configs:
+        for source_name, config in configs:
             for section in config.sections():
                 AirflowConfigParser._replace_section_config_with_display_sources(
                     config,

@@ -1249,7 +1263,7 @@ def _deprecated_value_is_set_in_config(
                 continue
             try:
                 deprecated_section_array = config.items(section=deprecated_section, raw=True)
-                for (key_candidate, _) in deprecated_section_array:
+                for key_candidate, _ in deprecated_section_array:
                     if key_candidate == deprecated_key:
                         return True
             except NoSectionError:
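The `many_to_one_deprecated_options` mapping lets one new option supersede several old ones. The commit says the actual resolution lives in `SchedulerJobRunner`; the resolver below is purely an illustrative stand-in showing how such a mapping could be consulted (the `resolve_option` function and dict-based config are not Airflow code):

```python
from __future__ import annotations

import warnings

# Mirrors the mapping added to AirflowConfigParser in this commit.
many_to_one_deprecated_options: dict[tuple[str, str], list[tuple[str, str, str]]] = {
    ("scheduler", "task_queued_timeout"): [
        ("celery", "stalled_task_timeout", "2.6.0"),
        ("celery", "task_adoption_timeout", "2.6.0"),
        ("kubernetes_executor", "worker_pods_pending_timeout", "2.6.0"),
    ]
}


def resolve_option(config: dict[tuple[str, str], float], section: str, key: str, default: float) -> float:
    """Prefer the new option; otherwise fall back to the first deprecated option that is set."""
    if (section, key) in config:
        return config[(section, key)]
    for dep_section, dep_key, since in many_to_one_deprecated_options.get((section, key), []):
        if (dep_section, dep_key) in config:
            warnings.warn(
                f"[{dep_section}] {dep_key} is deprecated since {since}; "
                f"use [{section}] {key} instead.",
                DeprecationWarning,
            )
            return config[(dep_section, dep_key)]
    return default


# A user who only set the old celery option still gets a working timeout, plus a warning.
legacy = {("celery", "task_adoption_timeout"): 900.0}
print(resolve_option(legacy, "scheduler", "task_queued_timeout", 600.0))  # 900.0
```

The one-to-many direction (old `worker_pods_pending_timeout_check_interval` feeding the new `task_queued_timeout_check_interval`) goes through the ordinary `deprecated_options` table instead.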

airflow/executors/base_executor.py

Lines changed: 12 additions & 0 deletions

@@ -376,6 +376,18 @@ def terminate(self):
         """This method is called when the daemon receives a SIGTERM."""
         raise NotImplementedError()
 
+    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:  # pragma: no cover
+        """
+        Handle remnants of tasks that were failed because they were stuck in queued.
+        Tasks can get stuck in queued. If such a task is detected, it will be marked
+        as `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED`
+        if it doesn't.
+
+        :param tis: List of Task Instances to clean up
+        :return: List of readable task instances for a warning message
+        """
+        raise NotImplementedError()
+
     def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
         """
         Try to adopt running task instances that have been abandoned by a SchedulerJob dying.
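Concrete executors are expected to override this hook. A toy sketch of the contract described in the new docstring, using stand-in classes (this `TaskInstance` and `ToyExecutor` are illustrative models, not `airflow.models.TaskInstance` or any real executor):

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TaskInstance:
    """Toy model of a task instance; only the fields this sketch needs."""

    task_id: str
    tries_remaining: int
    state: str = "queued"


class ToyExecutor:
    """Illustrates the cleanup_stuck_queued_tasks contract from the base executor."""

    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
        readable = []
        for ti in tis:
            # Retry if retries remain, otherwise fail outright.
            ti.state = "up_for_retry" if ti.tries_remaining > 0 else "failed"
            readable.append(f"{ti.task_id} -> {ti.state}")
        # The return value is meant for a scheduler warning message.
        return readable


tis = [TaskInstance("extract", tries_remaining=2), TaskInstance("load", tries_remaining=0)]
print(ToyExecutor().cleanup_stuck_queued_tasks(tis))
# ['extract -> up_for_retry', 'load -> failed']
```

The base class raises `NotImplementedError`, and per the commit message the scheduler skips the stuck-task query entirely for executors that do not implement this hook.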
