Skip to content

Add endpoint to watch dag run until finish #51920

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

uranusjr
Copy link
Member

@uranusjr uranusjr commented Jun 19, 2025

Close #51711.

I initially wanted to just enhance the trigger endpoint to optionally stream until the run finishes, but it seems that FastAPI does not like this optionally stream idea. You can do it of course, but would loose a lot of the auto annotation reflection feature. So I opted to have a separate streaming endpoint instead.

This endpoint repeatedly emits a JSON object at the specified interval, until the dag reaches a finished state.

Tests to come.

@boring-cyborg boring-cyborg bot added area:API Airflow's REST/HTTP API area:UI Related to UI/UX. For Frontend Developers. labels Jun 19, 2025
@potiuk
Copy link
Member

potiuk commented Jun 19, 2025

FYI. Not sure if it helps but OpenAPI 3.2.0 will have support for application/json-seq OAI/OpenAPI-Specification#3730 (planned for August)

@uranusjr uranusjr added this to the Airflow 3.1.0 milestone Jun 24, 2025
@uranusjr uranusjr force-pushed the stream-until-run-complete branch from 9079d18 to 231597b Compare June 25, 2025 04:11
@uranusjr uranusjr force-pushed the stream-until-run-complete branch from 231597b to 007a051 Compare June 25, 2025 04:53
@uranusjr
Copy link
Member Author

I added some tests for the endpoint, but couldn’t figure out how to test the looping part. Hopefully this is good enough…

Copy link
Member

@pierrejeambrun pierrejeambrun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, just a few suggestions / nits.

Indeed an additional test case for running / success state would be great.

@@ -163,3 +163,10 @@ class DAGRunsBatchBody(StrictBaseModel):
start_date_lte: AwareDatetime | None = None
end_date_gte: AwareDatetime | None = None
end_date_lte: AwareDatetime | None = None


class DAGRunWatchResult(StrictBaseModel):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need a new Serializer? I'm not fan of having each endpoint returning custom schema, unless that would really impact / slow down performances. Here can't we just use the default DAGRunResponse, which holds the state and duration. This will be much easier for client to understand the API if we avoid multiplication of partial schemas.

Comment on lines +447 to +450
async with create_session_async() as session:
dag_run = await session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=run_id))
yield DAGRunWatchResult.model_validate(dag_run, from_attributes=True).model_dump_json()
yield "\n"
Copy link
Member

@pierrejeambrun pierrejeambrun Jun 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is duplicated from above, maybe should be extracted into a small fn.

Also we tend to not put services/utility code in the routes (endpoint) files directly. Those can live in services/public/dag_runs.py

Comment on lines +1614 to +1618
@pytest.fixture(autouse=True)
def reconfigure_async_db_engine(self):
from airflow.settings import _configure_async_session

_configure_async_session()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that still necessary? Tests are green for me even when removing this fixture.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test_should_respond_200_immediately_for_finished_run tests fail for me without this (there are two fixtures; the first would pass but the second would fail)

Comment on lines +453 to +457
@dag_run_router.get(
"/{dag_run_id}/watch",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
dependencies=[Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.RUN))],
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding visible OpenAPI doc? Somehow like we do with the TaskInatance Log route.

responses={
**create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
status.HTTP_200_OK: {
"description": "Successful Response",
"content": ndjson_example_response_for_get_log,
},
},
dependencies=[Depends(requires_access_dag("GET", DagAccessEntity.TASK_LOGS))],

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:API Airflow's REST/HTTP API area:UI Related to UI/UX. For Frontend Developers.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Enable Inference Execution / Synchronous DAG Execution
4 participants