-
Notifications
You must be signed in to change notification settings - Fork 15.3k
AIP-88: Lazy expandable task mapping #51391
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
dabla
wants to merge
47
commits into
apache:main
Choose a base branch
from
dabla:feature/aip-88-lazy-expandable-tasks
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
…w ExpandInput's and scheduler XCom's to resolve values on server side (like it was in the past)
…updated within the expand_tasks method instead of ExpandInput
…r others in DeferredIterable
…ESSION in TaskExpansionJobRunner
…type checking block
…ethod to try avoiding circular import TaskInstance
…to avoid CommitProhibitorGuard as it has to run in a separate process outside the scheduler
…able type anymore (e.g. iteable), as we don't need to calculate length in advance anymore
… used in generic way in task expansion job runner
…nal implementation as we probably won't need it anymore once AIP-88 is fully implemented
…operator property on SchedulerMapXCom so operators can be accessed in generic way identically of which type it is
… triggerer instead of the scheduler as the scheduler cannot resolve XCom's
…s aren't executed by the scheduler anymore and also added feature bit to allow enabling AIP-88
…o be done by the executor when running the expanded task
…sionJobRunner so the scheduler can pick those up while expansion is still running
… so we don't need to fetch those again when we re-iterate over it
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This PR introduces a first implementation of AIP-88, which proposes support for lazy-expandable task mapping in Airflow.
Currently, task mapping requires knowing the length of an XCom at expansion time, which assumes that the XCom is a fully materialized list, set, or dict. This limitation makes it impossible to support scenarios where the XCom is a generator or other lazy iterable (e.g. streaming).
This PR changes the behavior so that instead of eagerly determining the number of mapped task instances based on the length of the XCom, the trigger will unmap the expandable task instances dynamically as it iterates over the XCom. This allows XComs to be lazy iterables, such as generators or streamable paginated responses. For that I've created a new class called TaskExpansionJobRunner which will be run in the triggerer, as there we have XCom access which we don't have in the scheduler. Also the DeferredIterable will execute a trigger when needed to fetch additional paging, as the iterable is lazy, it will only fetch new data when it's exhausted, thus it has also to be run in the triggerer.
With this change, we introduce the concept of deferred XCom iterables, which enables Airflow to fetch additional pages or items only when needed during expansion. This avoids the need to prefetch all data upfront, improving both memory efficiency and scheduling performance, and aligns with a producer-consumer pattern.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in airflow-core/newsfragments.