AIP-86 - Calculate and store the dagrun Deadline #51638
Conversation
… DAG has an applicable Deadline.
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
from airflow.models import DagRun
...
session.flush()
This could be the wrong place for this flush. Unit tests had a race condition where the queued_at value was not yet written to the db before this tried to query for it, so I added this flush to ensure the value is always written before checking.
It's likely better to remove provide_session (and always pass in a session object), and flush the session before calling this function instead.
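To illustrate the pattern being discussed, here is a minimal sketch (not the actual Airflow code) of the suggested refactor: drop `@provide_session`, require an explicit session, and flush at the call site. `Session` below is a toy stand-in for `sqlalchemy.orm.Session` in which pending objects only become visible to queries after `flush()`.

```python
class Session:
    """Toy stand-in: pending objects become queryable only after flush()."""

    def __init__(self):
        self._pending = []
        self._flushed = []

    def add(self, obj):
        self._pending.append(obj)

    def flush(self):
        # Write pending objects so subsequent queries can see them.
        self._flushed.extend(self._pending)
        self._pending.clear()

    def query_all(self):
        return list(self._flushed)


def _set_dagrun_queued_deadline(dag_run, *, session):
    # The caller owns the session and flushes before calling, so queued_at
    # is guaranteed to be visible to any query made in here.
    assert dag_run in session.query_all(), "caller must flush first"
    return dag_run["queued_at"]


session = Session()
run = {"queued_at": "2025-06-12 15:57:29+00:00"}
session.add(run)
session.flush()  # flushed at the call site, not inside the helper
queued_at = _set_dagrun_queued_deadline(run, session=session)
```

With this shape, the helper never needs its own flush, which sidesteps the race condition described above.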
The issue I ran into with that: if I provide a session to _set_dagrun_queued_deadline, even if I don't use it for anything and just add it to the signature, it breaks three of the existing tests and I'm not sure why. Do you perhaps see the cause there? Either way, it's 1AM and I need sleep; I'll poke at it more "in the morning".
Can you paste the error? It's not clear, and the link doesn't open for me when clicked.
Sure, sorry about that. If I change the signature to
@provide_session
def _set_dagrun_queued_deadline(self, session: Session = NEW_SESSION):
then three tests start to fail:
[2025-06-12T08:57:29.667-0700] {dagbag.py:585} INFO - Filling up the DagBag from /dev/null
FAILED [ 2%][2025-06-12T08:57:29.672-0700] {collection.py:82} INFO - Creating ORM DAG for test_clear_task_instances_for_backfill_finished_dagrun
airflow-core/tests/unit/models/test_dagrun.py:159 (TestDagRun.test_clear_task_instances_for_backfill_finished_dagrun[success])
0 != 1
Expected :1
Actual :0
<Click to see difference>
unit/models/test_dagrun.py:173: in test_clear_task_instances_for_backfill_finished_dagrun
assert dr0.clear_number == 1
E assert 0 == 1
E + where 0 = <DagRun test_clear_task_instances_for_backfill_finished_dagrun @ 2025-06-12 15:57:29.668728+00:00: backfill__2025-06-12T15:57:29.668728+00:00, state:queued, queued_at: 2025-06-12 15:57:29.686536+00:00. run_type: backfill>.clear_number
[2025-06-12T08:57:29.827-0700] {dagbag.py:585} INFO - Filling up the DagBag from /dev/null
FAILED [ 3%][2025-06-12T08:57:29.833-0700] {collection.py:82} INFO - Creating ORM DAG for test_clear_task_instances_for_backfill_finished_dagrun
airflow-core/tests/unit/models/test_dagrun.py:159 (TestDagRun.test_clear_task_instances_for_backfill_finished_dagrun[failed])
0 != 1
Expected :1
Actual :0
<Click to see difference>
unit/models/test_dagrun.py:173: in test_clear_task_instances_for_backfill_finished_dagrun
assert dr0.clear_number == 1
E assert 0 == 1
E + where 0 = <DagRun test_clear_task_instances_for_backfill_finished_dagrun @ 2025-06-12 15:57:29.828525+00:00: backfill__2025-06-12T15:57:29.828525+00:00, state:queued, queued_at: 2025-06-12 15:57:29.853305+00:00. run_type: backfill>.clear_number
and
FAILED [ 40%]
airflow-core/tests/unit/models/test_dagrun.py:1029 (TestDagRun.test_next_dagruns_to_examine_only_unpaused[queued])
[<DagRun test_dags @ 2016-01-01 00:00:00+00:00: scheduled__2016-01-01T00:00:00+00:00, state:queued, queued_at: 2025-06-12 15:57:34.377862+00:00. run_type: scheduled>] != []
Expected :[]
Actual :[<DagRun test_dags @ 2016-01-01 00:00:00+00:00: scheduled__2016-01-01T00:00:00+00:00, state:queued, queued_at: 2025-06-12 15:57:34.377862+00:00. run_type: scheduled>]
<Click to see difference>
unit/models/test_dagrun.py:1077: in test_next_dagruns_to_examine_only_unpaused
assert runs == []
E assert equals failed
E -[
E - <DagRun test_dags @ 2016-01-01 00:00:00+00:00: scheduled__2016-01-01T00:00:00+00:00, state:queued, queued_at: 2025-06-12 15:57:34.377862+00:00. run_type: scheduled>,
E -]
E +[]
PASSED [ 41%]
I think I got it sorted, ready for a re-review.
if dag_deadline := dag.get_dagrun_deadline():
    Deadline.add_deadline(
        Deadline(
            deadline=dag_deadline.reference.evaluate_with(
                interval=dag_deadline.interval,
                dag_id=dag.dag_id,
            ),
            callback=dag_deadline.callback,
            callback_kwargs=dag_deadline.callback_kwargs or {},
            dag_id=dag.dag_id,
            dagrun_id=dag_run.run_id,
        )
    )
There are other places where we 'create' a dagrun; should all those places also handle the associated deadline creation, or is that specific to this endpoint?
(For example, trigger_dag, which is used by the command line and the execution API, and some other places calling dag.create_dagrun, will not create the deadline like we are doing here.)
I think you are right, I was conflating API and SDK and thought all dagrun creations fed through here. Maybe this should be in models/dag/create_dagrun if they all funnel through there?
Maybe this should be in models/dag/create_dagrun if they all funnel through there?
Yes that could be it indeed. We'd need to verify that they all go through there.
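The idea above can be sketched quickly: if every persisted DagRun funnels through a single create_dagrun, the deadline logic only needs to live there. All names below (create_dagrun, _maybe_add_deadline, the dicts standing in for ORM models) are illustrative, not Airflow's real API.

```python
created_deadlines = []


def _maybe_add_deadline(dag, run_id):
    # Single choke point: runs created anywhere else would bypass this.
    deadline = dag.get("deadline")
    if deadline is not None:
        created_deadlines.append((dag["dag_id"], run_id, deadline))


def create_dagrun(dag, run_id):
    run = {"dag_id": dag["dag_id"], "run_id": run_id}
    _maybe_add_deadline(dag, run_id)
    return run


create_dagrun({"dag_id": "a", "deadline": "1h"}, "r1")  # gets a deadline
create_dagrun({"dag_id": "b"}, "r2")                    # no deadline configured
```

The trade-off is exactly the verification step mentioned: any code path that constructs a DagRun without going through the choke point silently misses the deadline.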
I looked through everywhere in src where a DagRun is being created without the models/dag/create_dagrun method:

- This doesn't save to the database, so I don't think we need to create a deadline.
airflow/airflow-core/src/airflow/cli/commands/task_command.py, lines 123 to 134 in c3f9da4:
if create_if_necessary == "memory":
    dag_run = DagRun(
        dag_id=dag.dag_id,
        run_id=logical_date_or_run_id,
        run_type=DagRunType.MANUAL,
        logical_date=dag_run_logical_date,
        data_interval=data_interval,
        run_after=run_after,
        triggered_by=DagRunTriggeredByType.CLI,
        state=DagRunState.RUNNING,
    )
    return dag_run, True

- Also not saved to the database:
airflow/airflow-core/src/airflow/exceptions.py, lines 273 to 278 in c3f9da4:
dag_run = DagRun(
    state=self.dag_run.state,
    dag_id=self.dag_run.dag_id,
    run_id=self.dag_run.run_id,
    run_type=self.dag_run.run_type,
)

- In this case, the operator is creating a DagRun in a weird case that seems like it would only be needed when a much bigger problem has occurred. The comment also says it's mostly only used in tests, so I don't think adding a deadline would be very helpful here; but if you think we should still consider this case, maybe we can add a TODO for now? Also, I noticed that @ashb, you added this block of exception handling, so I was hoping you could chime in on whether this needs a deadline to be scheduled.
airflow/airflow-core/src/airflow/models/baseoperator.py, lines 448 to 467 in c3f9da4:
except NoResultFound:  # This is _mostly_ only used in tests
    dr = DagRun(
        dag_id=self.dag_id,
        run_id=DagRun.generate_run_id(
            run_type=DagRunType.MANUAL,
            logical_date=info.logical_date,
            run_after=info.run_after,
        ),
        run_type=DagRunType.MANUAL,
        logical_date=info.logical_date,
        data_interval=info.data_interval,
        run_after=info.run_after,
        triggered_by=DagRunTriggeredByType.TEST,
        state=DagRunState.RUNNING,
    )
    ti = TaskInstance(self, run_id=dr.run_id)
    ti.dag_run = dr
    session.add(dr)
    session.flush()
Nits/style points
…ithub.com/aws-mwaa/upstream-to-airflow into ferruzzi/deadlines/05-calculate-on-dagrun
@@ -143,7 +137,7 @@ class BaseDeadlineReference(LoggingMixin, ABC):
     def reference_name(cls: Any) -> str:
         return cls.__name__

-    def evaluate_with(self, **kwargs: Any) -> datetime:
+    def evaluate_with(self, session: Session, interval: timedelta, **kwargs: Any) -> datetime:
-    def evaluate_with(self, session: Session, interval: timedelta, **kwargs: Any) -> datetime:
+    def evaluate_with(self, *, session: Session, interval: timedelta, **kwargs: Any) -> datetime:
This applies to all evaluate_with and _evaluate_with methods. Make these all keyword-only arguments since they are only used as such, and this avoids potential future headaches.
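A toy sketch of the suggestion, for the record: the bare `*` makes the parameters keyword-only, so call sites must spell out `interval=...` and a future parameter reordering cannot silently swap arguments. The class below is a stand-in, not Airflow's actual FixedDatetimeDeadline.

```python
from datetime import datetime, timedelta, timezone


class FixedDatetimeDeadline:
    def __init__(self, datetime_value: datetime):
        self._datetime = datetime_value

    def evaluate_with(self, *, interval: timedelta, **kwargs) -> datetime:
        # The bare * forces evaluate_with(interval=...) at every call site.
        return self._datetime + interval


ref = FixedDatetimeDeadline(datetime(2025, 6, 12, tzinfo=timezone.utc))
deadline = ref.evaluate_with(interval=timedelta(hours=2))
deadline_iso = deadline.isoformat()

# A positional call now fails loudly instead of binding silently:
try:
    ref.evaluate_with(timedelta(hours=2))
    positional_rejected = False
except TypeError:
    positional_rejected = True
```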
Deadline.add_deadline(deadline_orm)
session.add(deadline_orm)
session.flush()
This test is probably not needed anymore.
Is it because there are other tests (test_dagrun_queued_at_deadline, test_dagrun_success_deadline, etc.) that would test the writing of deadlines to the database? If so, I think we should still test that the values were saved correctly and match the expected result.
DAGRUN_CREATED = tuple(
    {
        ReferenceModels.DagRunLogicalDateDeadline,
        ReferenceModels.FixedDatetimeDeadline,
    }
)
Why not just:

-DAGRUN_CREATED = tuple(
-    {
-        ReferenceModels.DagRunLogicalDateDeadline,
-        ReferenceModels.FixedDatetimeDeadline,
-    }
-)
+DAGRUN_CREATED = (
+    ReferenceModels.DagRunLogicalDateDeadline,
+    ReferenceModels.FixedDatetimeDeadline,
+)
Same below
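A quick illustration of the point: `tuple({...})` only dedupes (the members are distinct classes anyway) and leaves element order up to set iteration, while a plain tuple literal is shorter and keeps a fixed order. The classes here are stand-ins for the ReferenceModels members.

```python
class DagRunLogicalDateDeadline:
    ...


class FixedDatetimeDeadline:
    ...


# Both spellings hold the same members, but only the plain tuple
# guarantees the order the author wrote down.
via_set = tuple({DagRunLogicalDateDeadline, FixedDatetimeDeadline})
plain = (DagRunLogicalDateDeadline, FixedDatetimeDeadline)
```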
@ashb @uranusjr @pierrejeambrun I've addressed all the unresolved comments, so I'm looking for a re-review.
When starting a new dagrun or marking a dagrun as QUEUED, if the DAG has a deadline and that deadline is calculated off one of these dagrun values, then calculate that timestamp and add it to the db table.
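The calculation described above can be sketched minimally, assuming the deadline is simply "reference timestamp + interval" (compute_deadline is a hypothetical helper for illustration, not the PR's actual code).

```python
from datetime import datetime, timedelta, timezone


def compute_deadline(reference_ts: datetime, interval: timedelta) -> datetime:
    # e.g. reference_ts = the run's queued_at, interval = allowed runtime
    return reference_ts + interval


queued_at = datetime(2025, 6, 12, 15, 57, tzinfo=timezone.utc)
deadline = compute_deadline(queued_at, timedelta(hours=1))
deadline_iso = deadline.isoformat()  # the value that would be stored in the db
```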