Skip to content

AIP-88: Lazy expandable task mapping #51391

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 47 commits into
base: main
Choose a base branch
from

Conversation

dabla
Copy link
Contributor

@dabla dabla commented Jun 4, 2025

This PR introduces a first implementation of AIP-88, which proposes support for lazy-expandable task mapping in Airflow.

Currently, task mapping requires knowing the length of an XCom at expansion time, which assumes that the XCom is a fully materialized list, set, or dict. This limitation makes it impossible to support scenarios where the XCom is a generator or other lazy iterable (e.g. streaming).

This PR changes the behavior so that instead of eagerly determining the number of mapped task instances based on the length of the XCom, the trigger will unmap the expandable task instances dynamically as it iterates over the XCom. This allows XComs to be lazy iterables, such as generators or streamable paginated responses. For that I've created a new class called TaskExpansionJobRunner which will be run in the triggerer, as there we have XCom access which we don't have in the scheduler. Also the DeferredIterable will execute a trigger when needed to fetch additional paging, as the iterable is lazy, it will only fetch new data when it's exhausted, thus it has also to be run in the triggerer.

With this change, we introduce the concept of deferred XCom iterables, which enables Airflow to fetch additional pages or items only when needed during expansion. This avoids the need to prefetch all data upfront, improving both memory efficiency and scheduling performance, and aligns with a producer-consumer pattern.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

…w ExpandInput's and scheduler XCom's to resolve values on server side (like it was in the past)
@dabla dabla requested review from uranusjr, XD-DENG and ashb as code owners June 4, 2025 07:27
@dabla dabla marked this pull request as draft June 5, 2025 12:36
dabla and others added 15 commits June 6, 2025 09:51
…updated within the expand_tasks method instead of ExpandInput
…ethod to try avoiding circular import TaskInstance
…to avoid CommitProhibitorGuard as it has to run in a separate process outside the scheduler
davidblain-infrabel and others added 22 commits June 19, 2025 09:43
…able type anymore (e.g. iteable), as we don't need to calculate length in advance anymore
… used in generic way in task expansion job runner
…nal implementation as we probably won't need it anymore once AIP-88 is fully implemented
…operator property on SchedulerMapXCom so operators can be accessed in generic way identically of which type it is
… triggerer instead of the scheduler as the scheduler cannot resolve XCom's
…s aren't executed by the scheduler anymore and also added feature bit to allow enabling AIP-88
…o be done by the executor when running the expanded task
…sionJobRunner so the scheduler can pick those up while expansion is still running
… so we don't need to fetch those again when we re-iterate over it
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants