-
Notifications
You must be signed in to change notification settings - Fork 15.3k
feat(task_sdk): add support for inlet_events in Task Context #45960
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
3cb8fb4
to
6ac30c8
Compare
0c8da84
to
9460573
Compare
a267114
to
f613905
Compare
Hey @ashb and @amoghrajesh , could you please take a quick look at this PR? in case I'm doing something wrong regarding task_sdk. The tests are not fixed as of now. I'll work on that if the overall logic looks ok. Thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
General direction looking good, will take a better look with tests.
Thanks @amoghrajesh ! |
b5fe1d2
to
d07c722
Compare
looks like |
it's green after rerun 🙌 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost there. Preemptively approving provided my comments are handled.
cd49204
to
1cf88c4
Compare
@amoghrajesh I'll merge it this afternoon. Please let me know if you want to take a look again 🙂 |
…45960) * feat(task_sdk): add support for inlet_events in Task Context * feat(task_sdk): add AssetEventCollectionResponse * refactor(task_sdk): combine asset event uris * refactor(api_fastapi): extract asset_event datamodels from asset * fix(task_sdk): revert unrelated datamodels change * fix(task_sdk/context): add _get_asset_events_from_db for fixing tests * test(task_sdk): add test cases for execution_time context inlet access * test(task_sdk): extend test_handle_requests to include asset event calls * test(execution_api): add tests to asset event apis * fix(execution_api): remove unnecessary redact * feat(task_sdk): extract asset response from asset event response * feat(task_sdk): add missing http exception * feat(task_sdk): extract asset response from asset event response * feat(task_sdk): remove duplicate inlet logic * feat(task_sdk): remove AssetEvent form definitions * test(task_sdk): add test case test_run_with_asset_inlets * docs(newsfragments): add description of how inlet_events access has been changed
…45960) * feat(task_sdk): add support for inlet_events in Task Context * feat(task_sdk): add AssetEventCollectionResponse * refactor(task_sdk): combine asset event uris * refactor(api_fastapi): extract asset_event datamodels from asset * fix(task_sdk): revert unrelated datamodels change * fix(task_sdk/context): add _get_asset_events_from_db for fixing tests * test(task_sdk): add test cases for execution_time context inlet access * test(task_sdk): extend test_handle_requests to include asset event calls * test(execution_api): add tests to asset event apis * fix(execution_api): remove unnecessary redact * feat(task_sdk): extract asset response from asset event response * feat(task_sdk): add missing http exception * feat(task_sdk): extract asset response from asset event response * feat(task_sdk): remove duplicate inlet logic * feat(task_sdk): remove AssetEvent form definitions * test(task_sdk): add test case test_run_with_asset_inlets * docs(newsfragments): add description of how inlet_events access has been changed
Why
We need to add support for inlet-events in task_sdk context as well.
What
InletEventsAccessors
fromairflow/utils/context.py
totask_sdk/src/airflow/sdk/execution_time/context.py
SUPERVISOR_COMMS
to retrieve asset eventsAssetEventResponse
,DagRunAssetReference
data models/asset-events
route with/by-asset-name-uri
/by-asset-uri
/by-asset-name
/by-alias-name
AssetEventOperations
to handle the routingtask_sdk/src/airflow/sdk/execution_time/supervisor.py
closes: #45717
closes: #46852
closes: #46852
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.