AIP-81 Make list operations to return all results #50132
@bugraoz93 whenever you have some free time, could you review this PR?
Thanks for the PR @Prab-27! We don't need this change with .all(). The paginated select method already handles that for us; we need to use those parameters and handle this in the airflowctl operations.
Let's take the first change as an example,
airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
Lines 116 to 136 in 3b7f392
@assets_router.get(
    "/assets",
    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
    dependencies=[
        Depends(requires_access_asset(method="GET")),
        Depends(requires_access_asset_alias(method="GET")),
    ],
)
def get_assets(
    limit: QueryLimit,
    offset: QueryOffset,
    name_pattern: QueryAssetNamePatternSearch,
    uri_pattern: QueryUriPatternSearch,
    dag_ids: QueryAssetDagIdPatternSearch,
    only_active: Annotated[OnlyActiveFilter, Depends(OnlyActiveFilter.depends)],
    order_by: Annotated[
        SortParam,
        Depends(SortParam(["id", "name", "uri", "created_at", "updated_at"], AssetModel).dynamic_depends()),
    ],
    session: SessionDep,
) -> AssetCollectionResponse:
That is already covered by this method. See below: it does a paginated select using those parameters. We need to use the offset and limit parameters for the paginated select in the airflowctl operations.
airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
Lines 170 to 177 in 3b7f392
assets_select, total_entries = paginated_select(
    statement=assets_select_statement,
    filters=[only_active, name_pattern, uri_pattern, dag_ids],
    order_by=order_by,
    offset=offset,
    limit=limit,
    session=session,
)
cc: @bugraoz93
Hey @Prab-27, thanks for the changes!
As we discussed in Slack (message linked below) and in my previous comment, we shouldn't update the API to return everything. We should instead leverage the paginated selects the endpoint already provides and adjust the offset and limit in a loop. This will make the system more resilient.
Please also see the Slack thread where this was brought up again; we have not gone in that direction in any solution.
https://apache-airflow.slack.com/archives/C07813CNKA8/p1747494203440959?thread_ts=1747494203.440959&cid=C07813CNKA8
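A minimal sketch of the client-side loop described above, assuming a paginated endpoint that returns a page of items plus `total_entries`. The names `fetch_all`, `fake_endpoint`, and the `"items"` key are hypothetical stand-ins, not the airflowctl API:

```python
from typing import Callable


def fetch_all(fetch_page: Callable[[int, int], dict], limit: int = 50) -> list:
    """Collect every item by requesting successive pages with offset/limit."""
    items: list = []
    offset = 0
    while True:
        page = fetch_page(offset, limit)
        items.extend(page["items"])
        offset += limit
        # Stop once the offset has passed the reported total,
        # or if the server unexpectedly returns an empty page.
        if offset >= page["total_entries"] or not page["items"]:
            break
    return items


# Stand-in for a paginated endpoint backed by 120 rows (hypothetical data).
ROWS = list(range(120))


def fake_endpoint(offset: int, limit: int) -> dict:
    return {"items": ROWS[offset : offset + limit], "total_entries": len(ROWS)}


all_rows = fetch_all(fake_endpoint, limit=50)
```

The server keeps its existing limits; only the client decides how many pages to walk.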
Yeah, I’m sorry. I will update it based on the discussion. Somehow, I had a misconception.
All good. Thanks for your contributions!
> We shouldn't update the API and return everything. We should already leverage the paginated selects from the endpoint and tweak the offset and limit with a loop. This will make the system more resilient
+1 for that. Also I don't know what the client code looks like, but you can leverage httpx or any other async http lib to be able to fetch multiple pages "at a time". For instance 10 pages, each page with a limit of 100. That's 1000 items for each loop basically.
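A rough sketch of fetching several pages "at a time", using `asyncio.gather` over a stand-in coroutine. With a real client the coroutine would wrap an async HTTP call (for example through `httpx.AsyncClient`); `fetch_page`, `fetch_all_concurrently`, and the response shape here are assumptions for illustration only:

```python
import asyncio

ROWS = list(range(2350))  # hypothetical data set on the server side
calls: list[int] = []     # records the offsets requested, for inspection


async def fetch_page(offset: int, limit: int) -> dict:
    """Stand-in for one async HTTP request to a paginated endpoint."""
    calls.append(offset)
    await asyncio.sleep(0)  # yield control, as a real network call would
    return {"items": ROWS[offset : offset + limit], "total_entries": len(ROWS)}


async def fetch_all_concurrently(limit: int = 100, pages_per_batch: int = 10) -> list:
    # The first request supplies both the first page and the total count.
    first = await fetch_page(0, limit)
    items = list(first["items"])
    offsets = list(range(limit, first["total_entries"], limit))
    # Request the remaining pages in batches of `pages_per_batch`.
    for start in range(0, len(offsets), pages_per_batch):
        batch = offsets[start : start + pages_per_batch]
        # gather() returns results in the order the awaitables were passed.
        pages = await asyncio.gather(*(fetch_page(o, limit) for o in batch))
        for page in pages:
            items.extend(page["items"])
    return items


result = asyncio.run(fetch_all_concurrently())
```

With 10 pages per batch and a limit of 100, each loop iteration covers up to 1000 items, matching the figures in the comment above.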
Got it! Only a few changes are left; I'll update them tomorrow, as soon as possible.
@bugraoz93 is this okay? If so, I'll fix the static checks and add tests for it.
I think we need a couple of tests for return_all_entries to be sure that the logic is correct, fetching all the pages as expected for different total_entries, offset, and limit values.
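One possible shape for such tests, sketched against a stand-in helper rather than the real `return_all_entries` (whose signature is not shown in this thread). The helper, the fake endpoint, and the case table are all hypothetical:

```python
def make_endpoint(total: int):
    """Build a fake paginated endpoint over `total` rows and a log of its calls."""
    calls = []

    def endpoint(offset: int, limit: int) -> dict:
        calls.append((offset, limit))
        rows = list(range(total))
        return {"items": rows[offset : offset + limit], "total_entries": total}

    return endpoint, calls


def return_all_entries_stub(endpoint, limit: int, offset: int = 0) -> list:
    """Stand-in for the helper under test: page until total_entries is covered."""
    items = []
    while True:
        page = endpoint(offset, limit)
        items.extend(page["items"])
        offset += limit
        if offset >= page["total_entries"]:
            break
    return items


CASES = [
    # (total_entries, limit, starting offset, expected number of requests)
    (0, 50, 0, 1),      # empty collection still costs one request
    (10, 50, 0, 1),     # fewer rows than one page
    (50, 50, 0, 1),     # exactly one full page
    (51, 50, 0, 2),     # one row spills onto a second page
    (120, 50, 0, 3),    # total is not a multiple of the limit
    (120, 50, 100, 1),  # non-zero starting offset
]


def run_cases() -> int:
    for total, limit, offset, expected_calls in CASES:
        endpoint, calls = make_endpoint(total)
        items = return_all_entries_stub(endpoint, limit, offset)
        assert items == list(range(total))[offset:], (total, limit, offset)
        assert len(calls) == expected_calls, (total, limit, offset)
    return len(CASES)
```

In a real test suite the case table would map naturally onto `pytest.mark.parametrize`.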
Most parts are already covered by Pierre, thanks a lot! I have a single additional comment to optimise the process for all list operations.
"""List all pools."""
try:
    self.response = self.client.get("pools")
    return PoolCollectionResponse.model_validate_json(self.response.content)
    total_entries = PoolCollectionResponse.model_validate_json(self.response.content).total_entries
We are making an additional call here. This request returns not only total_entries but also the first page of records from the database (50 by default), so it would be great to include that first page in the results. In return_all_entries, we can start from offset 50 and merge the results, or simply extend the list already returned by the first response. This saves at least one call per list operation. We can even pass the first batch to the method.
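A small sketch of that optimisation, assuming the first response carries both the first page and `total_entries`. `list_all` and `counting_endpoint` are hypothetical names used only to count requests:

```python
ROWS = list(range(120))  # hypothetical rows on the server
request_count = 0


def counting_endpoint(offset: int, limit: int) -> dict:
    """Fake paginated endpoint that counts how often it is called."""
    global request_count
    request_count += 1
    return {"items": ROWS[offset : offset + limit], "total_entries": len(ROWS)}


def list_all(endpoint, limit: int = 50) -> list:
    # Keep the first page instead of discarding it after reading total_entries.
    first = endpoint(0, limit)
    items = list(first["items"])
    # Continue from the second page; nothing is fetched twice.
    for offset in range(limit, first["total_entries"], limit):
        items.extend(endpoint(offset, limit)["items"])
    return items


everything = list_all(counting_endpoint)
```

For 120 rows and a limit of 50 this makes three requests, where a separate probe for `total_entries` followed by a full loop would make four.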
Some tests need updating. I’ll do my best to make the changes by tomorrow.
…n.py and restore endpoints - pools.py, variables.py, job.py
if params is None:
    params = {}
params.update({"limit": limit}, **kwargs)
Probably not consequential, but updating params in-place is somewhat surprising. I’d do something like
shared_params = {"limit": limit, **(params or {}), **kwargs}
while ...:
    loop_params = {**shared_params, "offset": offset}
Also, the kwargs feature seems somewhat unnecessary. I would just get rid of it and require everything passed inside params.
except ServerResponseError as e:
    raise e
This is useless. Do you plan to handle the exception in the future?
We need to handle the exceptions closer to the CLI layer to prevent custom logging on the API end. So we generally re-raise here, and whoever calls this in ctl/cli catches it and reports a proper error. Do you think we should handle them here?
Do I need to remove the try-except block from the return_all_list method, since all list methods (like pools list) already cover this exception?
I think we should move it to a generic method rather than adding it to individual methods; abstracting the entire call into that method should easily eliminate duplicate handling of the same error. Both work for now, and you can also do it elsewhere. In the future we need to refactor how we throw exceptions a bit, since there are lots of duplicate calls that can be generalised.
closes: #49019