Skip to content

Feature(call-back): Enhance DAG-level callback context with enriched metadata and test coverage #51949

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

kiran2706
Copy link
Contributor

@kiran2706 kiran2706 commented Jun 20, 2025

Summary

This PR enriches the context passed to DAG-level callbacks (on_success_callback and on_failure_callback) to better align with the structure and metadata typically available in task-level callbacks. While this logic is currently triggered via execute_callbacks=True during tests, it is structured to reflect what should be expected in production use cases as well.

Related
Closes: [#51402]

Copy link
Contributor

@shahar1 shahar1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix static checks :)

@kiran2706
Copy link
Contributor Author

Working on it.

@kiran2706 kiran2706 force-pushed the feature/improve-dag-callback-context branch from c1185a1 to 9493126 Compare June 23, 2025 17:03
@kiran2706
Copy link
Contributor Author

Hey @uranusjr,

We can't use context.update(task_instance=ti) since TypedDict.update() doesn't support keyword arguments — mypy throws an error:

error: Unexpected keyword argument "task_instance" for "update" of "TypedDict"

So I've updated it to use a standard dict instead.

Please re-review the changes. Thanks in advance.

@kiran2706 kiran2706 requested a review from uranusjr June 23, 2025 17:12
@kiran2706 kiran2706 marked this pull request as draft June 23, 2025 17:51
Comment on lines +86 to +90
``{{ mark_success_url }}`` str | None |URL to mark the DAG run as successful in the Airflow UI.
``{{ log_url }}`` str | None |URL to the log for the current DAG run or task instance.
``{{ dag_run_url }}`` str | None |URL to the DAG run details page in the Airflow UI.
``{{ end_date }}`` DateTime | None |The end date/time of the DAG run.
``{{ max_tries }}`` int | None |The maximum number of tries for the task instance.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be removed now they are not being added

Comment on lines +1365 to +1366
success_tis.sort(key=lambda x: x.end_date, reverse=True)
last_relevant_ti = success_tis[0] if success_tis else None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be something like

last_relevant_ti = max(success_tis, ...) if success_tis else None

instead.

I kind of wonder if we can even avoid building the list at all.

Also is simply sorting by end_date correct? Especially with trigger_rule, the last success/failed ti might not necessarily be the ti that causes the dag run to be marked as success/failed. Can you check the logic in 2.x to see how the ti is selected?

Comment on lines +2072 to +2073

return prefix + ">"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accidental?

Comment on lines +89 to +93
"dag_run_url",
"end_date",
"log_url",
"mark_success_url",
"max_tries",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, should be removed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants