-
Notifications
You must be signed in to change notification settings - Fork 15.2k
Add back dag parsing pre-import optimization #50371
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, just double checked with #30495 and left some nits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! Thanks for the change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also we might need to add a test for it
I’ll try adding a unit test for it. |
Hi, I attempted to refactor the logic into a function. This refactoring provides the following benefits:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the change.
Only a small nit on the test case naming: using test__pre_import_airflow_modules
as a prefix would make it easier to identify the corresponding unit tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a few nits, but overall looks good
@jedcunningham @ephraimbuddy not sure whether it's something missed or intentionally removed. But it looks good to me. I'm planning on merging it early tomorrow |
I just revoke my approval. |
for module in iter_airflow_imports(file_path): | ||
try: | ||
importlib.import_module(module) | ||
except ModuleNotFoundError as e: | ||
log.warning("Error when trying to pre-import module '%s' found in %s: %s", module, file_path, e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct me if I am wrong but if import fails it means that the dag proccessor will never try to re-import it till the process is restarted. I think we need to change this.
I think this also explains why in some cases (in 2.x) after dag processor restart I see many dags as broken always with import errors on my own modules.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct me if I am wrong but if import fails it means that the dag proccessor will never try to re-import it till the process is restarted. I think we need to change this.
I think this also explains why in some cases (in 2.x) after dag processor restart I see many dags as broken always with import errors on my own modules.
Can we verify this?
@Lzzz666 would you be able to take a look at the review comments on this PR? |
I found that with the current pre_import implementation (#30495), the performance gains are negligible in airflow 3 —probably because it only pre-imports the Airflow modules actually used in each DAG. In theory this should still speed things up, but my benchmarks didn’t show any improvement. However, when I modified the pre_import function to preload only the “heavier” third-party libraries (NumPy, pandas, the Kubernetes, and Celery), the speed-up became very noticeable in my test dag. so, I'm thinking that
All of my tests involved parsing the same DAG ten times and measuring |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. Thanks for looking into it. Looks good
I’m not entirely sure, but if we want to pre-load all the core Airflow modules—instead of, as before, pre-loading only the modules required by the current dag file before forking—should we place the pre-import function before the parse loop? This way, we can avoid redundant imports. |
Probably will need to benchmark again to see whether we want it. I feel it probably wouldn't affect much? Not sure |
I ran a benchmark comparing three loading strategies:
Setup
process_creation_time = time.monotonic() - process_start_time immediately after Results “Before parsing loop” vs “before fork”
Baseline vs “before parsing loop”
Conclusion If my experimental setup is correct, and pre-importing before the parse loop doesn’t introduce any unintended side effects, it might offer an opportunity to improve efficiency. |
Looks good! @Lzzz666 could you please take a look at the CI failure? Thanks! |
Fixed CI problems! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mostly good, would be great if we could address these nitpicks
fe58ac9
to
fe29db1
Compare
Airflow 2.6.0 introduced the `AIRFLOW__SCHEDULER__PARSING_PRE_IMPORT_MODULES` setting (apache#30495) to pre-import commonly used modules in the DAG File Processor parent process before forking, which mitigated this performance issue. This optimization logic and the corresponding setting were not carried over to Airflow 3 and the setting is currently ignored (as noted in apache#49839). While apache#49839 proposed removing the setting as unused, this commit re-implements the underlying pre-import optimization functionality to restore this performance benefit in Airflow 3. This helps to reduce the cumulative time spent on imports during serial DAG file parsing. Refs apache#50348 Addresses discussion in apache#49839 Refs apache#30495
Hey @Lzzz666, is there anything still unresolved or that needs further discussion? Please help us resolve comments you thing have already been addressed or reach consensus 🙂 If there's none, I'm planning on merging it later today. @ephraimbuddy please let me know if you want to take another look 🙂 |
About this problem: If this is right, it would mean that after we fix an import error in a dag file, it should be retried in the next cycle without having to restart the dag processor. But if this issue still occurs, should we open a new PR to address it? |
Assuming my experimental setup is correct, should we consider whether to run pre-import prior to the parsing loop? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
for module in iter_airflow_imports(file_path): | ||
try: | ||
importlib.import_module(module) | ||
except ModuleNotFoundError as e: | ||
log.warning("Error when trying to pre-import module '%s' found in %s: %s", module, file_path, e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct me if I am wrong but if import fails it means that the dag proccessor will never try to re-import it till the process is restarted. I think we need to change this.
I think this also explains why in some cases (in 2.x) after dag processor restart I see many dags as broken always with import errors on my own modules.
Can we verify this?
@@ -78,6 +79,11 @@ def inprocess_client(): | |||
return client | |||
|
|||
|
|||
@pytest.fixture | |||
def mock_logger(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fixture doesn't add anything over using mock directly
@@ -158,6 +169,7 @@ def dag_in_a_fn(): | |||
path=path, | |||
bundle_path=tmp_path, | |||
callbacks=[], | |||
logger=mock_logger, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this change? I thought there was already a logger attribute on the class?
I tried to make an import error, then repaired the module, and the dag was retried and loaded successfully. 2025-06-27.7.48.40.mov |
Related Issue
closes: #50348
This PR reintroduces the pre-import optimization for DAG parsing originally added in Airflow 2.6.0 (#30495) but missing in Airflow 3.0. It reduces DAG parsing time by importing common modules in the parent process before forking.
I referred to #30495 and made modifications to 'processor.py' based on its implementation. I understand the issue is currently assigned to @kevinhongzl, and I’m happy to collaborate or adjust direction depending on their progress.
Looking forward to feedback!
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in airflow-core/newsfragments.