Description
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.10.5
What happened?
When I update my DAG file and add a new task, the new task is repeatedly scheduled many times in the first dag_run after the update, while subsequent dag_runs schedule it normally. In some cases it is scheduled more than 100 times. This repeated scheduling is independent of the task's own state: it happens whether the task has already succeeded or is still running.
The task log contains the following information:
```
[2025-06-24, 17:11:07 CST] {taskinstance.py:3094} ERROR - Received SIGTERM. Terminating subprocesses.
[2025-06-24, 17:11:07 CST] {taskinstance.py:3095} ERROR - Stacktrace:
  File "/data/miniconda/envs/py311/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/__main__.py", line 62, in main
    args.func(args)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/providers/celery/cli/celery_command.py", line 64, in wrapper
    providers_configuration_loaded(func)(*args, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/providers/celery/cli/celery_command.py", line 237, in worker
    _run_command_with_daemon_option(
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/providers/celery/cli/celery_command.py", line 50, in _run_command_with_daemon_option
    run_command_with_daemon_option(*args, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/providers/celery/cli/celery_command.py", line 230, in run_celery_worker
    celery_app.worker_main(options)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/celery/app/base.py", line 391, in worker_main
    self.start(argv=argv)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/celery/app/base.py", line 371, in start
    celery.main(args=argv, standalone_mode=False)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/click/core.py", line 1082, in main
    rv = self.invoke(ctx)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/click/core.py", line 1697, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/click/core.py", line 1443, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/click/core.py", line 788, in invoke
    return __callback(*args, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/click/decorators.py", line 33, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/celery/bin/base.py", line 135, in caller
    return f(ctx, *args, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/celery/bin/worker.py", line 356, in worker
    worker.start()
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/celery/worker/worker.py", line 202, in start
    self.blueprint.start(self)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/celery/concurrency/base.py", line 130, in start
    self.on_start()
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/celery/concurrency/prefork.py", line 109, in on_start
    P = self._pool = Pool(processes=self.limit,
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/celery/concurrency/asynpool.py", line 464, in __init__
    super().__init__(processes, *args, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/billiard/pool.py", line 1046, in __init__
    self._create_worker_process(i)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/celery/concurrency/asynpool.py", line 482, in _create_worker_process
    return super()._create_worker_process(i)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/billiard/pool.py", line 1158, in _create_worker_process
    w.start()
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/billiard/process.py", line 120, in start
    self._popen = self._Popen(self)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/billiard/context.py", line 331, in _Popen
    return Popen(process_obj)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/billiard/popen_fork.py", line 22, in __init__
    self._launch(process_obj)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/billiard/popen_fork.py", line 77, in _launch
    code = process_obj._bootstrap()
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/billiard/process.py", line 323, in _bootstrap
    self.run()
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/billiard/process.py", line 110, in run
    self._target(*self._args, **self._kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/billiard/pool.py", line 292, in __call__
    sys.exit(self.workloop(pid=pid))
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/billiard/pool.py", line 362, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/celery/app/trace.py", line 651, in fast_trace_task
    R, I, T, Rstr = tasks[task].__trace__(
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/celery/app/trace.py", line 453, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/celery/app/trace.py", line 736, in __protected_call__
    return self.run(*args, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 139, in execute_command
    _execute_in_fork(command_to_exec, celery_task_id)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 171, in _execute_in_fork
    args.func(args)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 483, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method
    return _run_task_by_local_task_job(args, ti)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 322, in _run_task_by_local_task_job
    ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/jobs/job.py", line 421, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/jobs/job.py", line 450, in execute_job
    ret = execute_callable()
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/jobs/local_task_job_runner.py", line 171, in _execute
    self.task_runner.start()
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/task/task_runner/standard_task_runner.py", line 55, in start
    self.process = self._start_by_fork()
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/task/task_runner/standard_task_runner.py", line 117, in _start_by_fork
    ret = args.func(args, dag=self.dag)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 483, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 256, in _run_task_by_selected_method
    return _run_raw_task(args, ti)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 341, in _run_raw_task
    return ti._run_raw_task(
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3006, in _run_raw_task
    return _run_raw_task(
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 274, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3161, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3185, in _execute_task
    return _execute_task(self, context, task_orig)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 768, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 734, in _execute_callable
    return ExecutionCallableRunner(
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
    return func(self, *args, **kwargs)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/operators/bash.py", line 257, in execute
    result = self._run_inline_command(bash_path=bash_path, env=env)
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/operators/bash.py", line 284, in _run_inline_command
    return self.subprocess_hook.run_command(
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/hooks/subprocess.py", line 107, in run_command
    for raw_line in iter(self.sub_process.stdout.readline, b""):
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3095, in signal_handler
    self.log.error("Stacktrace: \n%s", "".join(traceback.format_stack()))
[2025-06-24, 17:11:07 CST] {subprocess.py:143} INFO - Sending SIGTERM signal to process group
[2025-06-24, 17:11:13 CST] {taskinstance.py:3313} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 768, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 734, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/operators/bash.py", line 257, in execute
    result = self._run_inline_command(bash_path=bash_path, env=env)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/operators/bash.py", line 284, in _run_inline_command
    return self.subprocess_hook.run_command(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/hooks/subprocess.py", line 107, in run_command
    for raw_line in iter(self.sub_process.stdout.readline, b""):
  File "/data/miniconda/envs/py311/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3097, in signal_handler
    raise AirflowTaskTerminated("Task received SIGTERM signal")
airflow.exceptions.AirflowTaskTerminated: Task received SIGTERM signal
[2025-06-24, 17:11:13 CST] {taskinstance.py:1226} INFO - Marking task as UP_FOR_RETRY. dag_id=analysis, task_id=analysis.fb_af_skan_agg, run_id=scheduled__2025-06-22T16:00:00+00:00, execution_date=20250622T160000, start_date=20250624T090856, end_date=20250624T091113
[2025-06-24, 17:11:13 CST] {taskinstance.py:1564} INFO - Executing callback at index 0: call
```
The scheduler log contains the following key pieces of information:
```
Restoring task '<TaskInstance: xxx scheduled__2025-06-14T16:00:00+00:00 [removed]>' which was previously removed from DAG '<DAG: sqoop_etl_enterprise>'
```
What you think should happen instead?
The task should be scheduled only once, as normal, and should not be incorrectly marked as `removed`.
How to reproduce
Update the DAG file of a DAG that has an active dag_run and add a new operator (task) to it.
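A minimal sketch of the kind of change that triggers it (the `dag_id`, task IDs, and schedule below are hypothetical, not taken from the affected DAG; any operator type appears to behave the same way):

```python
# example_etl.py -- initial version of the DAG file, already deployed and running
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="example_etl",              # hypothetical dag_id
    start_date=datetime(2025, 6, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    existing_task = BashOperator(task_id="existing_task", bash_command="echo run")

    # Then, while a dag_run is still active, edit this file and add a task:
    #
    #     new_task = BashOperator(task_id="new_task", bash_command="echo new")
    #     existing_task >> new_task
    #
    # After the scheduler reparses the file, the task is scheduled repeatedly
    # in the current dag_run, and the scheduler log shows
    # "Restoring task ... which was previously removed".
```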
Operating System
CentOS Linux 7
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct