Add back dag parsing pre-import optimization #50371

Open
wants to merge 2 commits into base: main

Lzzz666
Contributor

@Lzzz666 Lzzz666 commented May 8, 2025

Related Issue

closes: #50348


This PR reintroduces the pre-import optimization for DAG parsing originally added in Airflow 2.6.0 (#30495) but missing in Airflow 3.0. It reduces DAG parsing time by importing common modules in the parent process before forking.
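
For illustration, here is a minimal sketch of the general idea rather than the code in this PR: a module imported in the parent before a fork is already present in the child's `sys.modules`, so the child does not pay the import cost again. The helper names `pre_import` and `child_sees_module` are hypothetical, and the sketch assumes the `fork` start method is available (POSIX only).

```python
import importlib
import multiprocessing as mp
import sys


def pre_import(module_names):
    """Import each module in the parent; return the names that were not found."""
    failed = []
    for name in module_names:
        try:
            importlib.import_module(name)
        except ModuleNotFoundError:
            failed.append(name)
    return failed


def _child(name, conn):
    # Runs in the forked child: report whether the module was inherited already loaded.
    conn.send(name in sys.modules)
    conn.close()


def child_sees_module(name):
    """Fork a child and ask it whether `name` is already in its sys.modules."""
    ctx = mp.get_context("fork")  # assumption: fork start method (POSIX only)
    parent_conn, child_conn = ctx.Pipe()
    proc = ctx.Process(target=_child, args=(name, child_conn))
    proc.start()
    result = parent_conn.recv()
    proc.join()
    return result
```

Calling `pre_import(["json"])` in the parent and then `child_sees_module("json")` should report that the child inherited the loaded module.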

I referred to #30495 and made modifications to 'processor.py' based on its implementation. I understand the issue is currently assigned to @kevinhongzl, and I’m happy to collaborate or adjust direction depending on their progress.

Looking forward to feedback!


Member

@jason810496 jason810496 left a comment

Thanks for the PR, just double checked with #30495 and left some nits.

@Lzzz666 Lzzz666 marked this pull request as draft May 10, 2025 04:09
@Lzzz666 Lzzz666 marked this pull request as ready for review May 10, 2025 07:01
Member

@jason810496 jason810496 left a comment

Nice! Thanks for the change.

Member

@Lee-W Lee-W left a comment

Also we might need to add a test for it

@eladkal eladkal added this to the Airflow 3.0.2 milestone May 12, 2025
@eladkal eladkal added the backport-to-v3-0-test Mark PR with this label to backport to v3-0-test branch label May 12, 2025
@Lzzz666
Contributor Author

Lzzz666 commented May 13, 2025

Also we might need to add a test for it

I’ll try adding a unit test for it.

@Lzzz666 Lzzz666 marked this pull request as draft May 14, 2025 14:05
@Lzzz666
Contributor Author

Lzzz666 commented May 15, 2025

Hi, I attempted to refactor the logic into a function.

This refactoring provides the following benefits:

  1. Modularity: encapsulates the pre-import logic in a dedicated function
  2. Easier testing: allows the _pre_import_airflow_modules function to be unit-tested in isolation
  3. Improved readability: the function name clearly indicates its purpose

@Lzzz666 Lzzz666 marked this pull request as ready for review May 15, 2025 15:07
Member

@jason810496 jason810496 left a comment

Thanks for the change.

Only a small nit on the test case naming: using test__pre_import_airflow_modules as a prefix would make it easier to identify the corresponding unit tests.

Member

@Lee-W Lee-W left a comment

Left a few nits, but overall looks good

@Lee-W
Member

Lee-W commented May 19, 2025

@jedcunningham @ephraimbuddy not sure whether it's something missed or intentionally removed. But it looks good to me. I'm planning on merging it early tomorrow

@Lee-W Lee-W self-requested a review May 19, 2025 14:47
@Lee-W
Member

Lee-W commented May 19, 2025

I just revoked my approval.

Comment on lines +116 to +122
for module in iter_airflow_imports(file_path):
    try:
        importlib.import_module(module)
    except ModuleNotFoundError as e:
        log.warning("Error when trying to pre-import module '%s' found in %s: %s", module, file_path, e)
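
As context for the loop above, here is a rough sketch of how a helper in the spirit of `iter_airflow_imports` could collect module names from a DAG file without executing it. This is an assumption for illustration only: the name `iter_prefixed_imports` and its signature are hypothetical, and the real helper may differ in detail.

```python
import ast


def iter_prefixed_imports(source, prefix="airflow."):
    """Yield module names imported in `source` that start with `prefix`."""
    try:
        tree = ast.parse(source)
    except SyntaxError:
        return  # unparsable file: nothing to pre-import
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            # Absolute "from X import Y": only the module X is considered.
            names = [node.module]
        else:
            continue
        for name in names:
            if name.startswith(prefix):
                yield name
```

Because the file is only parsed, not run, a broken or slow DAG file cannot stall the parent process at this step; only the subsequent `importlib.import_module` calls execute code.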
Contributor

Correct me if I am wrong, but if the import fails, it means that the dag processor will never try to re-import it until the process is restarted. I think we need to change this.
I think this also explains why in some cases (in 2.x), after a dag processor restart, I see many dags as broken, always with import errors on my own modules.

Contributor

@ephraimbuddy ephraimbuddy Jun 25, 2025

Correct me if I am wrong, but if the import fails, it means that the dag processor will never try to re-import it until the process is restarted. I think we need to change this.
I think this also explains why in some cases (in 2.x), after a dag processor restart, I see many dags as broken, always with import errors on my own modules.

Can we verify this?

@amoghrajesh
Contributor

@Lzzz666 would you be able to take a look at the review comments on this PR?

@Lzzz666 Lzzz666 marked this pull request as draft June 15, 2025 18:13
@Lzzz666
Contributor Author

Lzzz666 commented Jun 16, 2025

I found that with the current pre_import implementation (#30495), the performance gains are negligible in Airflow 3, probably because it only pre-imports the Airflow modules actually used in each DAG. In theory this should still speed things up, but my benchmarks didn't show any improvement. However, when I modified the pre_import function to preload only the "heavier" third-party libraries (NumPy, pandas, Kubernetes, and Celery), the speed-up became very noticeable in my test dag.

So I'm wondering:

  1. Why doesn't pre-importing just the current dag's Airflow modules reduce its parse time?
  2. If we want to pre-import all of Airflow's core modules, which ones should we include?

All of my tests involved parsing the same DAG ten times and measuring run_duration (defined as run_duration = time.monotonic() - proc.start_time), excluding the first startup cost.

  1. First test with the original pre-import method
    image

  2. Second test with the original pre-import method
    image

  3. Modified pre-import to include only NumPy, pandas, Kubernetes, and Celery
    image

Contributor

@ephraimbuddy ephraimbuddy left a comment

Ok. Thanks for looking into it. Looks good

@Lzzz666
Contributor Author

Lzzz666 commented Jun 16, 2025

I’m not entirely sure, but if we want to pre-load all the core Airflow modules—instead of, as before, pre-loading only the modules required by the current dag file before forking—should we place the pre-import function before the parse loop? This way, we can avoid redundant imports.

@Lee-W
Member

Lee-W commented Jun 17, 2025

I’m not entirely sure, but if we want to pre-load all the core Airflow modules—instead of, as before, pre-loading only the modules required by the current dag file before forking—should we place the pre-import function before the parse loop? This way, we can avoid redundant imports.

Probably will need to benchmark again to see whether we want it. I feel it probably wouldn't affect much? Not sure

@Lzzz666
Contributor Author

Lzzz666 commented Jun 17, 2025

I ran a benchmark comparing three loading strategies:

  1. Pre-import before parsing loop
    Call _pre_import_airflow_modules() once before entering the while loop in _run_parsing_loop().
  2. Pre-import before fork
    Call it right before forking each process (the original place).
  3. No pre-import

Setup

  • 1,000 parse iterations per experiment
  • Loaded almost all core Airflow modules plus numpy, pandas, celery, and k8s (no providers, api, www, cli)
  • Averaged over 1,000 parses (first run excluded)
  • Measured process_creation_time = time.monotonic() - process_start_time immediately after self._start_new_processes()

Results

“Before parsing loop” vs “before fork”

  • Moving the pre-import outside the parsing loop reduced process_creation_time by 92.8% compared to doing it before each fork.

Baseline vs “before parsing loop”

  • Running without any pre-import was about 29% faster than pre-importing before the loop, but the absolute time saved was tiny, probably just normal fork timing noise.

image

Conclusion

If my experimental setup is correct, and pre-importing before the parse loop doesn’t introduce any unintended side effects, it might offer an opportunity to improve efficiency.
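
A rough sketch of how such a comparison could be timed, under stated assumptions: the function `mean_process_creation_time` and its `once` / `per_fork` hooks are hypothetical stand-ins for "pre-import before the loop" and "pre-import before each fork", the child does no real parsing, and the `fork` start method is assumed. It is not the harness used for the numbers above.

```python
import multiprocessing as mp
import statistics
import time


def _noop():
    # Stand-in for the child's parsing work.
    pass


def mean_process_creation_time(iterations, once=None, per_fork=None):
    """Average time to start a forked child, excluding the first (warm-up) run."""
    ctx = mp.get_context("fork")  # assumption: fork start method (POSIX only)
    if once is not None:
        once()  # e.g. pre-import once, before the loop
    samples = []
    for _ in range(iterations + 1):
        start = time.monotonic()
        if per_fork is not None:
            per_fork()  # e.g. pre-import right before each fork
        proc = ctx.Process(target=_noop)
        proc.start()
        samples.append(time.monotonic() - start)
        proc.join()
    return statistics.mean(samples[1:])  # drop the first sample
```

Passing the same pre-import callable as `once` in one run and as `per_fork` in another gives the two variants; passing neither gives the baseline.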

@Lzzz666 Lzzz666 marked this pull request as ready for review June 17, 2025 16:50
@Lee-W
Member

Lee-W commented Jun 19, 2025

Looks good! @Lzzz666 could you please take a look at the CI failure? Thanks!

@Lzzz666
Contributor Author

Lzzz666 commented Jun 19, 2025

Fixed CI problems!

Member

@Lee-W Lee-W left a comment

mostly good, would be great if we could address these nitpicks

@Lzzz666 Lzzz666 force-pushed the main branch 3 times, most recently from fe58ac9 to fe29db1 Compare June 24, 2025 06:52
Airflow 2.6.0 introduced the `AIRFLOW__SCHEDULER__PARSING_PRE_IMPORT_MODULES` setting (apache#30495) to pre-import commonly used modules in the DAG File Processor parent process before forking, which mitigated this performance issue.

This optimization logic and the corresponding setting were not carried over to Airflow 3 and the setting is currently ignored (as noted in apache#49839). While apache#49839 proposed removing the setting as unused, this commit re-implements the underlying pre-import optimization functionality to restore this performance benefit in Airflow 3.

This helps to reduce the cumulative time spent on imports during serial DAG file parsing.

Refs apache#50348
Addresses discussion in apache#49839
Refs apache#30495
@Lee-W Lee-W changed the title [SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348) Add back dag parsing pre-import optimization Jun 24, 2025
@Lee-W
Member

Lee-W commented Jun 24, 2025

Hey @Lzzz666, is there anything still unresolved or that needs further discussion? Please help us resolve the comments you think have already been addressed, or reach consensus 🙂 If there's none, I'm planning on merging it later today. @ephraimbuddy please let me know if you want to take another look 🙂

@Lzzz666
Contributor Author

Lzzz666 commented Jun 24, 2025

eladkal on May 22
Correct me if I am wrong, but if the import fails, it means that the dag processor will never try to re-import it until the process is restarted. I think we need to change this.
I think this also explains why in some cases (in 2.x), after a dag processor restart, I see many dags as broken, always with import errors on my own modules.

About this problem:
I'm not sure, but I tried adding an import of a non-existent module (causing a pre_import error) and then removed it. After waiting past the interval and letting the next scan run, the dag was retried and loaded successfully, with no need to restart the dag processor.

If this is right, it would mean that after we fix an import error in a dag file, it should be retried in the next cycle without having to restart the dag processor.

But if this issue still occurs, should we open a new PR to address it?
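
As a side note on the Python behaviour involved, here is a small sketch (not dag processor code; `import_before_and_after_fix` and the module name are hypothetical) showing that a failed `importlib.import_module` call does not leave an entry in `sys.modules`, so a later attempt in the same process can succeed once the module exists. It says nothing about how the dag processor schedules re-parsing.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def import_before_and_after_fix(module_name="hypothetical_helper_mod"):
    """Try importing a missing module, create it, then import again in the same process."""
    with tempfile.TemporaryDirectory() as tmp:
        sys.path.insert(0, tmp)
        try:
            try:
                importlib.import_module(module_name)
                first_attempt_failed = False
            except ModuleNotFoundError:
                first_attempt_failed = True
            cached_after_failure = module_name in sys.modules

            # "Fix" the problem by creating the module on disk.
            Path(tmp, f"{module_name}.py").write_text("VALUE = 42\n")
            importlib.invalidate_caches()  # let the path finders see the new file
            module = importlib.import_module(module_name)
            return first_attempt_failed, cached_after_failure, module.VALUE
        finally:
            # Clean up so the sketch leaves no trace in the interpreter state.
            sys.path.remove(tmp)
            sys.modules.pop(module_name, None)
```

The call to `importlib.invalidate_caches()` matters here because the path finders may cache directory listings; without it, a file created after the first attempt might not be noticed right away.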

@Lzzz666
Contributor Author

Lzzz666 commented Jun 24, 2025

Assuming my experimental setup is correct, should we consider whether to run pre-import prior to the parsing loop?

Contributor

@ephraimbuddy ephraimbuddy left a comment

LGTM

@@ -78,6 +79,11 @@ def inprocess_client():
    return client


@pytest.fixture
def mock_logger():
Member

This fixture doesn't add anything over using mock directly

@@ -158,6 +169,7 @@ def dag_in_a_fn():
path=path,
bundle_path=tmp_path,
callbacks=[],
logger=mock_logger,
Member

Do we need this change? I thought there was already a logger attribute on the class?

@Lzzz666 Lzzz666 requested a review from ashb June 27, 2025 02:02
@Lzzz666
Contributor Author

Lzzz666 commented Jun 27, 2025

I tried to make an import error, then repaired the module, and the dag was retried and loaded successfully.

2025-06-27.7.48.40.mov

@Lzzz666 Lzzz666 requested a review from eladkal June 27, 2025 11:58
Labels
area:DAG-processing backport-to-v3-0-test Mark PR with this label to backport to v3-0-test branch
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Respect parsing_pre_import_modules in Airflow 3
8 participants