
AIP-86 - Calculate and store the dagrun Deadline #51638


Open
wants to merge 22 commits into main

Conversation

Contributor

@ferruzzi ferruzzi commented Jun 12, 2025

When starting a new dagrun or marking a dagrun as QUEUED, if the DAG has a deadline and that deadline is calculated from one of those dagrun values, calculate the timestamp and add it to the db table.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.
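As a sketch of the calculation this PR describes (the names below are illustrative stand-ins, not Airflow's actual API): the deadline timestamp is derived by adding the configured interval to a dagrun anchor value such as queued_at or logical_date.

```python
from datetime import datetime, timedelta, timezone


def compute_dagrun_deadline(anchor: datetime, interval: timedelta) -> datetime:
    """Deadline = anchor timestamp (e.g. queued_at or logical_date) + interval."""
    return anchor + interval


# Example: a run queued at 15:57 UTC with a two-hour deadline interval.
queued_at = datetime(2025, 6, 12, 15, 57, tzinfo=timezone.utc)
assert compute_dagrun_deadline(queued_at, timedelta(hours=2)) == datetime(
    2025, 6, 12, 17, 57, tzinfo=timezone.utc
)
```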

boring-cyborg bot commented Jun 12, 2025

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide, and consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow the ASF Code of Conduct for all communication, including (but not limited to) comments on Pull Requests, the mailing list, and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: [email protected]
    Slack: https://s.apache.org/airflow-slack

@ferruzzi ferruzzi requested review from o-nikolas and vincbeck June 12, 2025 00:48
@ferruzzi ferruzzi marked this pull request as draft June 12, 2025 00:51
@ferruzzi ferruzzi marked this pull request as ready for review June 12, 2025 07:21
from airflow.models import DagRun

session.flush()
Contributor Author

This could be the wrong place for this flush. Unit tests had a race condition where the queued_at value was not yet written to the db before this tried to query for it, so I added this flush to ensure the value is always written before checking.

Member

It’s likely better to remove provide_session (and always pass in a session object), and flush the session before calling this function instead.
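A toy sketch of the pattern the reviewer is suggesting (these are illustrative stand-ins, not Airflow's real classes): the caller owns the session and flushes it before calling the helper, so the helper never races against unflushed writes like queued_at.

```python
from datetime import datetime, timezone


class ToySession:
    """Toy session: pending writes become queryable only after flush()."""

    def __init__(self):
        self.pending = {}
        self.stored = {}

    def add(self, key, value):
        self.pending[key] = value

    def flush(self):
        self.stored.update(self.pending)
        self.pending.clear()

    def get(self, key):
        return self.stored.get(key)


def set_dagrun_queued_deadline(session: ToySession, run_id: str):
    # The helper only reads; it assumes the caller has already flushed.
    return session.get(run_id)


session = ToySession()
session.add("run-1", datetime(2025, 6, 12, tzinfo=timezone.utc))
assert set_dagrun_queued_deadline(session, "run-1") is None  # not flushed yet
session.flush()  # caller flushes before invoking the helper
assert set_dagrun_queued_deadline(session, "run-1") is not None
```

This keeps session lifecycle decisions in one place instead of hiding a flush inside the helper.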

Contributor Author

The issue I ran into with that: if I provide a session to _set_dagrun_queued_deadline, even if I don't use it for anything and just add it to the signature, it breaks three of the existing tests, and I'm not sure why. Do you perhaps see the cause? Either way, it's 1 AM and I need sleep; I'll poke at it more "in the morning".

(screenshot of the test failures attached)

Contributor

Can you paste the error? The image isn't clear and doesn't open for me when clicked.

Contributor Author

Sure, sorry about that. If I change the signature to

    @provide_session
    def _set_dagrun_queued_deadline(self, session: Session = NEW_SESSION):

Then three tests start to fail:

[2025-06-12T08:57:29.667-0700] {dagbag.py:585} INFO - Filling up the DagBag from /dev/null
FAILED [  2%][2025-06-12T08:57:29.672-0700] {collection.py:82} INFO - Creating ORM DAG for test_clear_task_instances_for_backfill_finished_dagrun

airflow-core/tests/unit/models/test_dagrun.py:159 (TestDagRun.test_clear_task_instances_for_backfill_finished_dagrun[success])
0 != 1

Expected :1
Actual   :0

unit/models/test_dagrun.py:173: in test_clear_task_instances_for_backfill_finished_dagrun
    assert dr0.clear_number == 1
E   assert 0 == 1
E    +  where 0 = <DagRun test_clear_task_instances_for_backfill_finished_dagrun @ 2025-06-12 15:57:29.668728+00:00: backfill__2025-06-12T15:57:29.668728+00:00, state:queued, queued_at: 2025-06-12 15:57:29.686536+00:00. run_type: backfill>.clear_number
[2025-06-12T08:57:29.827-0700] {dagbag.py:585} INFO - Filling up the DagBag from /dev/null
FAILED [  3%][2025-06-12T08:57:29.833-0700] {collection.py:82} INFO - Creating ORM DAG for test_clear_task_instances_for_backfill_finished_dagrun

airflow-core/tests/unit/models/test_dagrun.py:159 (TestDagRun.test_clear_task_instances_for_backfill_finished_dagrun[failed])
0 != 1

Expected :1
Actual   :0

unit/models/test_dagrun.py:173: in test_clear_task_instances_for_backfill_finished_dagrun
    assert dr0.clear_number == 1
E   assert 0 == 1
E    +  where 0 = <DagRun test_clear_task_instances_for_backfill_finished_dagrun @ 2025-06-12 15:57:29.828525+00:00: backfill__2025-06-12T15:57:29.828525+00:00, state:queued, queued_at: 2025-06-12 15:57:29.853305+00:00. run_type: backfill>.clear_number

and

FAILED [ 40%]
airflow-core/tests/unit/models/test_dagrun.py:1029 (TestDagRun.test_next_dagruns_to_examine_only_unpaused[queued])
[<DagRun test_dags @ 2016-01-01 00:00:00+00:00: scheduled__2016-01-01T00:00:00+00:00, state:queued, queued_at: 2025-06-12 15:57:34.377862+00:00. run_type: scheduled>] != []

Expected :[]
Actual   :[<DagRun test_dags @ 2016-01-01 00:00:00+00:00: scheduled__2016-01-01T00:00:00+00:00, state:queued, queued_at: 2025-06-12 15:57:34.377862+00:00. run_type: scheduled>]

unit/models/test_dagrun.py:1077: in test_next_dagruns_to_examine_only_unpaused
    assert runs == []
E   assert equals failed
E     - [<DagRun test_dags @ 2016-01-01 00:00:00+00:00: scheduled__2016-01-01T00:00:00+00:00, state:queued, queued_at: 2025-06-12 15:57:34.377862+00:00. run_type: scheduled>]
E     + []
PASSED [ 41%]

Contributor Author

I think I got it sorted, ready for a re-review.

Comment on lines 424 to 438

    if dag_deadline := dag.get_dagrun_deadline():
        Deadline.add_deadline(
            Deadline(
                deadline=dag_deadline.reference.evaluate_with(
                    interval=dag_deadline.interval,
                    dag_id=dag.dag_id,
                ),
                callback=dag_deadline.callback,
                callback_kwargs=dag_deadline.callback_kwargs or {},
                dag_id=dag.dag_id,
                dagrun_id=dag_run.run_id,
            )
        )

Member

There are other places where we 'create' a dagrun. Should all those places also handle the associated deadline creation, or is that specific to this endpoint?
(For example, trigger_dag, used by the command line and the execution API, and some other places call dag.create_dagrun, which will not create the deadline like we are doing here.)

Contributor Author

I think you are right, I was conflating API and SDK and thought all dagrun creations fed through here. Maybe this should be in models/dag/create_dagrun if they all funnel through there?

Member

Maybe this should be in models/dag/create_dagrun if they all funnel through there?

Yes that could be it indeed. We'd need to verify that they all go through there.

Contributor

I looked through every place in src where a DagRun is created without going through the models/dag/create_dagrun method:

  • This doesn't save to the database so I don't think we need to create a deadline

    if create_if_necessary == "memory":
        dag_run = DagRun(
            dag_id=dag.dag_id,
            run_id=logical_date_or_run_id,
            run_type=DagRunType.MANUAL,
            logical_date=dag_run_logical_date,
            data_interval=data_interval,
            run_after=run_after,
            triggered_by=DagRunTriggeredByType.CLI,
            state=DagRunState.RUNNING,
        )
        return dag_run, True

  • Also not saved to the database:

    dag_run = DagRun(
        state=self.dag_run.state,
        dag_id=self.dag_run.dag_id,
        run_id=self.dag_run.run_id,
        run_type=self.dag_run.run_type,
    )

  • In this case, the operator is creating a DagRun in an odd code path that seems like it would only be reached after a much bigger problem has occurred, and the comment says it is mostly only used in tests. So I don't think adding a deadline would be very helpful here, but if you think we should still consider this case, maybe we can add a TODO for now? Also, @ashb, I noticed you added this block of exception handling, so I was hoping you could chime in on whether this needs a deadline to be scheduled:

    except NoResultFound:
        # This is _mostly_ only used in tests
        dr = DagRun(
            dag_id=self.dag_id,
            run_id=DagRun.generate_run_id(
                run_type=DagRunType.MANUAL,
                logical_date=info.logical_date,
                run_after=info.run_after,
            ),
            run_type=DagRunType.MANUAL,
            logical_date=info.logical_date,
            data_interval=info.data_interval,
            run_after=info.run_after,
            triggered_by=DagRunTriggeredByType.TEST,
            state=DagRunState.RUNNING,
        )
        ti = TaskInstance(self, run_id=dr.run_id)
        ti.dag_run = dr
        session.add(dr)
        session.flush()

Member

@ashb ashb left a comment

Nits/style points

@@ -143,7 +137,7 @@ class BaseDeadlineReference(LoggingMixin, ABC):
def reference_name(cls: Any) -> str:
return cls.__name__

def evaluate_with(self, **kwargs: Any) -> datetime:
def evaluate_with(self, session: Session, interval: timedelta, **kwargs: Any) -> datetime:
Member

Suggested change
def evaluate_with(self, session: Session, interval: timedelta, **kwargs: Any) -> datetime:
def evaluate_with(self, *, session: Session, interval: timedelta, **kwargs: Any) -> datetime:

This applies to all evaluate_with and _evaluate_with. Make these all keyword-only arguments since they are only used as such, and this avoids potential future headaches.
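A small sketch of why the `*` helps (toy function, not the actual Airflow code): with keyword-only parameters, positional calls are rejected, so a later reordering of the signature cannot silently swap arguments at existing call sites.

```python
from datetime import datetime, timedelta, timezone


def evaluate_with(*, interval: timedelta, base: datetime) -> datetime:
    """Toy version: everything after * must be passed by keyword."""
    return base + interval


base = datetime(2025, 6, 12, tzinfo=timezone.utc)

# A positional call raises TypeError instead of silently binding wrong:
try:
    evaluate_with(timedelta(hours=1), base)  # type: ignore[misc]
except TypeError:
    pass  # expected: keyword-only parameters refuse positional arguments

assert evaluate_with(interval=timedelta(hours=1), base=base) == base + timedelta(hours=1)
```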

Comment on lines -94 to +95
Deadline.add_deadline(deadline_orm)
session.add(deadline_orm)
session.flush()
Member

This test is probably not needed anymore.

Contributor

Is it because there are other tests (test_dagrun_queued_at_deadline, test_dagrun_success_deadline, etc.) that would test writing deadlines to the database? If so, I think we should still test that the values were saved correctly and match the expected result.

Comment on lines 183 to 188
DAGRUN_CREATED = tuple(
    {
        ReferenceModels.DagRunLogicalDateDeadline,
        ReferenceModels.FixedDatetimeDeadline,
    }
)
Member

Why not just

Suggested change
DAGRUN_CREATED = tuple(
    {
        ReferenceModels.DagRunLogicalDateDeadline,
        ReferenceModels.FixedDatetimeDeadline,
    }
)
DAGRUN_CREATED = (
    ReferenceModels.DagRunLogicalDateDeadline,
    ReferenceModels.FixedDatetimeDeadline,
)

Same below
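To illustrate the suggestion (using hypothetical stand-ins for the ReferenceModels classes): routing a tuple literal through a set only deduplicates members that are already distinct, and it discards ordering along the way, while the plain tuple literal is simpler and deterministic.

```python
# Hypothetical stand-ins for the ReferenceModels classes discussed above.
class DagRunLogicalDateDeadline: ...
class FixedDatetimeDeadline: ...


# Round-tripping through a set: extra work, and member order is unspecified.
via_set = tuple({DagRunLogicalDateDeadline, FixedDatetimeDeadline})

# The plain tuple literal keeps a deterministic, readable order.
direct = (DagRunLogicalDateDeadline, FixedDatetimeDeadline)

assert set(via_set) == set(direct)
assert direct[0] is DagRunLogicalDateDeadline
```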

@ramitkataria
Contributor

@ashb @uranusjr @pierrejeambrun I've addressed all the unresolved comments so I'm looking for a re-review

Labels
area:API Airflow's REST/HTTP API area:task-sdk

6 participants