Add endpoint to watch dag run until finish #51920
base: main
Conversation
FYI. Not sure if it helps, but OpenAPI 3.2.0 will have support for …
Force-pushed 9079d18 to 231597b.
Force-pushed 231597b to 007a051.
I added some tests for the endpoint, but couldn’t figure out how to test the looping part. Hopefully this is good enough…
Nice, just a few suggestions / nits.
Indeed, an additional test case for the running / success states would be great.
@@ -163,3 +163,10 @@ class DAGRunsBatchBody(StrictBaseModel):
    start_date_lte: AwareDatetime | None = None
    end_date_gte: AwareDatetime | None = None
    end_date_lte: AwareDatetime | None = None


class DAGRunWatchResult(StrictBaseModel):
Do we really need a new serializer? I'm not a fan of having each endpoint return a custom schema, unless that would really impact / slow down performance. Can't we just use the default DAGRunResponse here, which holds the state and duration? It will be much easier for clients to understand the API if we avoid a multiplication of partial schemas.
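A minimal sketch of that reuse, assuming the Airflow 3 datamodel layout; the helper name is illustrative, not part of this PR:

```python
# Sketch: serialize with the existing DAGRunResponse instead of a new partial schema.
from airflow.api_fastapi.core_api.datamodels.dag_runs import DAGRunResponse


def serialize_dag_run(dag_run) -> str:
    # DAGRunResponse already exposes state and duration (per the comment above),
    # so clients need no bespoke watch schema.
    return DAGRunResponse.model_validate(dag_run, from_attributes=True).model_dump_json()
```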
async with create_session_async() as session:
    dag_run = await session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=run_id))
    yield DAGRunWatchResult.model_validate(dag_run, from_attributes=True).model_dump_json()
    yield "\n"
This is duplicated from above; maybe it should be extracted into a small function.
Also, we tend not to put service/utility code directly in the route (endpoint) files. It can live in services/public/dag_runs.py.
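A rough sketch of that extraction, reusing the create_session_async helper from the snippet above; the function name, import paths, and module location are illustrative:

```python
# Hypothetical airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_runs.py
from sqlalchemy import select

from airflow.models.dagrun import DagRun
from airflow.utils.session import create_session_async  # assumed import path


async def get_dag_run(dag_id: str, run_id: str) -> DagRun | None:
    """Fetch one DagRun so the streaming route can serialize it on each tick."""
    async with create_session_async() as session:
        return await session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=run_id))
```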
@pytest.fixture(autouse=True)
def reconfigure_async_db_engine(self):
    from airflow.settings import _configure_async_session

    _configure_async_session()
Is that still necessary? Tests are green for me even when removing this fixture.
The test_should_respond_200_immediately_for_finished_run tests fail for me without this (there are two of these tests; the first would pass but the second would fail).
@dag_run_router.get(
    "/{dag_run_id}/watch",
    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
    dependencies=[Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.RUN))],
)
How about adding a visible OpenAPI doc? Something like we do with the TaskInstance Log route:
airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py, lines 63 to 70 in c3f9da4:
responses={
    **create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
    status.HTTP_200_OK: {
        "description": "Successful Response",
        "content": ndjson_example_response_for_get_log,
    },
},
dependencies=[Depends(requires_access_dag("GET", DagAccessEntity.TASK_LOGS))],
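Adapted to the watch route, that could look roughly like the following; ndjson_example_response_for_watch and its example payload are illustrative names, not existing constants:

```python
# Hypothetical OpenAPI doc for the watch route, mirroring the log route above.
ndjson_example_response_for_watch = {
    "application/x-ndjson": {
        "schema": {"type": "string"},
        "example": '{"state": "running"}\n{"state": "success"}\n',
    }
}


@dag_run_router.get(
    "/{dag_run_id}/watch",
    responses={
        **create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
        status.HTTP_200_OK: {
            "description": "Successful Response",
            "content": ndjson_example_response_for_watch,
        },
    },
    dependencies=[Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.RUN))],
)
async def watch_dag_run(dag_run_id: str): ...
```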
Closes #51711.
I initially wanted to just enhance the trigger endpoint to optionally stream until the run finishes, but FastAPI does not play well with this optional-streaming idea. You can do it, of course, but you would lose a lot of the automatic annotation/reflection features. So I opted for a separate streaming endpoint instead.
This endpoint repeatedly emits a JSON object at the specified interval, until the dag run reaches a finished state.
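A condensed sketch of that loop, under the assumptions from the review snippets above; the interval parameter, terminal-state check, import paths, and media type are assumptions, not the exact implementation (and it serializes with DAGRunResponse, per the review suggestion):

```python
import asyncio

from fastapi.responses import StreamingResponse
from sqlalchemy import select

from airflow.api_fastapi.core_api.datamodels.dag_runs import DAGRunResponse
from airflow.models.dagrun import DagRun
from airflow.utils.session import create_session_async  # assumed import path
from airflow.utils.state import DagRunState


async def watch_dag_run(dag_id: str, run_id: str, interval: float = 5.0) -> StreamingResponse:
    """Stream the dag run's serialized state as NDJSON until it finishes."""

    async def stream():
        while True:
            async with create_session_async() as session:
                dag_run = await session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=run_id))
            # One JSON object per line, as in the endpoint snippet above.
            yield DAGRunResponse.model_validate(dag_run, from_attributes=True).model_dump_json()
            yield "\n"
            if dag_run.state in (DagRunState.SUCCESS, DagRunState.FAILED):
                break  # terminal state reached; stop polling
            await asyncio.sleep(interval)

    return StreamingResponse(stream(), media_type="application/x-ndjson")
```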
Tests to come.