Apache Airflow version
main (development)
If "Other Airflow 2 version" selected, which one?
No response
What happened?
Tasks fail when running a backfill against a previous DAG version when using GitDagBundle.
API server traceback:
File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 2470, in run_sync_in_worker_thread
return await future
File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 967, in run
result = context.run(func, *args)
File "/usr/local/lib/python3.9/site-packages/cadwyn/schema_generation.py", line 511, in __call__
return self._original_callable(*args, **kwargs)
File "/opt/airflow/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py", line 247, in ti_run
upstream_map_indexes = dict(_get_upstream_map_indexes(dag.get_task(ti.task_id), ti.map_index))
File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/dag.py", line 900, in get_task
raise TaskNotFound(f"Task {task_id} not found")
airflow.exceptions.TaskNotFound: Task test_task not found
Scheduler logs:
[2025-05-13T09:59:13.923+0000] {scheduler_job_runner.py:925} ERROR - DAG 'test_api_dag' for task instance <TaskInstance: test_api_dag.test_task scheduled__2025-05-10T00:00:00+00:00 [queued]> not found in serialized_dag table
[2025-05-13T09:59:13.924+0000] {taskinstance.py:2468} ERROR - Executor CeleryExecutor(parallelism=32) reported that the task instance <TaskInstance: test_api_dag.test_task scheduled__2025-05-10T00:00:00+00:00 [queued]> finished with state failed, but the task instance's state attribute is queued. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
Celery worker: the call to the execution API fails with a 500 Internal Server Error.
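The failing lookup can be reproduced directly: the execution API resolves the task id recorded on the backfilled task instance against the latest DAG version, which no longer defines it. A minimal sketch, assuming the version 2 DAG definition from the reproduction steps (a hypothetical standalone snippet, not the API server code path):

from datetime import datetime

from airflow.exceptions import TaskNotFound
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG

# Version 2 renamed "test_task" to "test_task1", so resolving the old task id
# raises the same TaskNotFound seen in the API server traceback above.
dag_v2 = DAG("test_api_dag", start_date=datetime(2025, 5, 1), schedule="@daily")
BashOperator(task_id="test_task1", bash_command="echo hi", dag=dag_v2)

try:
    dag_v2.get_task("test_task")  # task id from the version 1 task instance
except TaskNotFound as exc:
    print(exc)  # Task test_task not found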
What you think should happen instead?
No response
How to reproduce
- Configure a GitDagBundle:
export AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST='[
  {
    "name": "dag_version_test",
    "classpath": "airflow.providers.git.bundles.git.GitDagBundle",
    "kwargs": {
      "repo_url": "https://github.com/atul-astronomer/dag_version_test.git",
      "subdir": "dags1",
      "tracking_ref": "main",
      "refresh_interval": 5
    }
  }
]'
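As a quick sanity check that the bundle configuration parses, the JSON can be round-tripped (a minimal sketch, assuming the variable above is exported in the current environment; this helper is not part of Airflow):

import json
import os

# Parse the bundle config list and print the bundle name and tracking ref.
cfg = json.loads(os.environ["AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST"])
print(cfg[0]["name"], cfg[0]["kwargs"]["tracking_ref"])  # dag_version_test main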
- Use the DAG code below to create multiple DAG runs on past dates (with catchup=True, a past start_date, and is_paused_upon_creation=False, the scheduler creates them automatically).
from datetime import datetime

from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG

# Version 1 of the DAG: the first task id is "test_task".
dag = DAG(
    'test_api_dag',
    start_date=datetime(2025, 5, 1, 3, 28, 0),
    schedule='@daily',
    is_paused_upon_creation=False,
    catchup=True,
)

hello_task = BashOperator(
    task_id='test_task',
    bash_command='echo "Hello World from Airflow!"',
    do_xcom_push=True,
    dag=dag,
)

bye_task = BashOperator(
    task_id='test_task_bye',
    bash_command='echo "Bye World from Airflow!"',
    dag=dag,
)

hello_again = BashOperator(
    task_id='test_task_hello2',
    bash_command='echo "Hello World from Airflow!"',
    dag=dag,
)

hello_task >> bye_task >> hello_again
- Once multiple DAG runs exist on version 1, push the code below to the repo configured in init.sh to create a new version of the DAG.
from datetime import datetime

from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG

# Version 2 of the DAG: "test_task" is renamed to "test_task1" and
# "test_task_hello2" to "test_task_hello".
dag = DAG(
    'test_api_dag',
    start_date=datetime(2025, 5, 1, 3, 28, 0),
    schedule='@daily',
    is_paused_upon_creation=False,
    catchup=True,
)

hello_task = BashOperator(
    task_id='test_task1',
    bash_command='echo "Hello World from Airflow!"',
    do_xcom_push=True,
    dag=dag,
)

bye_task = BashOperator(
    task_id='test_task_bye',
    bash_command='echo "Bye World from Airflow!"',
    dag=dag,
)

hello_again = BashOperator(
    task_id='test_task_hello',
    bash_command='echo "Hello World from Airflow!"',
    dag=dag,
)

hello_task >> bye_task >> hello_again
- Create a DAG run on version 2.
- Now create a backfill over the earlier dates to overwrite the version 1 DAG runs.
- Notice that the backfill DAG runs fail; the sketch below shows one way to confirm the run states.
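To confirm the failures, the run states can be inspected from the metadata database (a minimal sketch, assuming direct access to the Airflow metadata DB from an environment with Airflow installed):

from airflow.models import DagRun
from airflow.utils.session import create_session

# List every run for the DAG with its type and terminal state; the backfill
# runs over the earlier dates show up as failed.
with create_session() as session:
    for run in session.query(DagRun).filter(DagRun.dag_id == "test_api_dag"):
        print(run.run_id, run.run_type, run.state)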
Operating System
Linux
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct