
Add MwaaTaskSensor to Amazon Provider Package #51719

Open: wants to merge 3 commits into base: main

seanghaeli
Contributor

The MwaaTaskSensor waits for the completion of a DAG task instance in an MWAA environment. This PR includes an implementation with unit tests, system tests, and docs. It is similar to MwaaDagRunSensor.

This PR also modifies the system test so that MwaaTriggerDagRunOperator is set to deferrable=True. This tests the MwaaTaskSensor and MwaaDagRunSensor sensors during execution of DAG Run rather than only afterwards.

failure_states: Collection[str] | None = None,
waiter_delay: int = 60,
waiter_max_attempts: int = 720,
aws_conn_id: str | None = None,
Contributor

This isn't consistent with what was discussed on #51196, is it?

Comment on lines +141 to +147
self.success_states = set(success_states) if success_states else {TaskInstanceState.SUCCESS.value}
self.failure_states = set(failure_states) if failure_states else {TaskInstanceState.FAILED.value}

if len(self.success_states & self.failure_states):
    raise ValueError("success_states and failure_states must not have any values in common")

in_progress_states = {s.value for s in TaskInstanceState} - self.success_states - self.failure_states
Contributor

Is this logic right? If we fall back to the defaults, it will treat skipped and removed as in-progress states.
Also, there is no protection here against the possible future addition of a new task instance state. For example, we are discussing #12199.

I suggest adding a defensive test around new states so we'll know to modify the code here, or maybe we can consider adding more classes to categorize states, similar to:

class TerminalTIState(str, Enum):
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    REMOVED = "removed"

Contributor

There are also upstream_failed, for example, and other terminal states that will cause this to wait forever, right?
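
To make the concern in this thread concrete, here is a minimal sketch of how the subtraction-based logic classifies states when the defaults are used, plus one possible shape for the defensive test suggested above. `TIState` is a local stand-in for Airflow's `TaskInstanceState`; its member list is an assumption and may not match a given Airflow version. `TERMINAL_STATES`, `NON_TERMINAL_STATES`, and both function names are hypothetical and not part of the PR.

```python
from enum import Enum


class TIState(str, Enum):
    """Local stand-in for Airflow's TaskInstanceState (member list is an assumption)."""

    REMOVED = "removed"
    SCHEDULED = "scheduled"
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    RESTARTING = "restarting"
    FAILED = "failed"
    UP_FOR_RETRY = "up_for_retry"
    UP_FOR_RESCHEDULE = "up_for_reschedule"
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"
    DEFERRED = "deferred"


def in_progress_by_subtraction(success_states=None, failure_states=None):
    """Mirror the logic under review: anything not success/failure counts as in progress."""
    success = set(success_states) if success_states else {TIState.SUCCESS.value}
    failure = set(failure_states) if failure_states else {TIState.FAILED.value}
    if success & failure:
        raise ValueError("success_states and failure_states must not have any values in common")
    return {s.value for s in TIState} - success - failure


# Hypothetical explicit categorization: states a task instance cannot leave on its own.
TERMINAL_STATES = {"success", "failed", "skipped", "removed", "upstream_failed"}
NON_TERMINAL_STATES = {
    "scheduled",
    "queued",
    "running",
    "restarting",
    "up_for_retry",
    "up_for_reschedule",
    "deferred",
}


def check_all_states_categorized():
    """Defensive check: fail loudly if the enum gains a state nobody has categorized."""
    uncategorized = {s.value for s in TIState} - TERMINAL_STATES - NON_TERMINAL_STATES
    if uncategorized:
        raise AssertionError(f"Uncategorized task instance states: {sorted(uncategorized)}")


# Terminal states that the default configuration would keep polling on.
stuck_states = in_progress_by_subtraction() & TERMINAL_STATES
print(sorted(stuck_states))
```

With the defaults, the terminal states that are neither success nor failed end up in the in-progress set, which is the "wait forever" case raised here. The check function is one way to notice a newly added state; the real test would run against the actual enum rather than a stand-in.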

@ramitkataria
Contributor

This tests the MwaaTaskSensor and MwaaDagRunSensor sensors during execution of DAG Run rather than only afterwards

Wouldn't it still wait for the dag run to complete? I think in this case the waiting would always happen in deferrable mode instead of using the config value for operators.default_deferrable. Using the config value would probably be the preferred method, because we could then test both cases just by changing the config value, without having to modify the code.

If we want to test the sensor during execution, we could run the dag again in another task before the sensor task, but I'm not sure we want to be that exhaustive in system tests.

@seanghaeli
Contributor Author

seanghaeli commented Jun 18, 2025

Wouldn't it still wait for the dag run to complete?

@ramitkataria With MwaaTriggerDagRunOperator's deferrable=True, wouldn't it proceed to the task sensor without waiting for the dag run to be done?

@ramitkataria
Contributor

Wouldn't it still wait for the dag run to complete?

@ramitkataria With MwaaTriggerDagRunOperator's deferrable=True, wouldn't it proceed to the task sensor without waiting for the dag run to be done?

We also discussed this offline, but in short: the sensor task would still wait for this task, because the two are in a chain and the sensor task is set to depend on it.

… adjust the default value of in base class to .

- Add defensive test around adding more task instance states to keep  of the MwaaTaskCompletedTrigger up to date.
- Fix issue where  of the MwaaTaskSensor derives to  instead of  type.
- Modify documentation to clearly indicate that the MwaaTaskSensor is meant to sense tasks across different MWAA environments.
- Make  an optional parameter, where it defaults to the latest dag run.
- Externally fetch the task ID variable.
- Test the sensor while a DAG Run is still in progress.
@seanghaeli
Contributor Author

I see that the commit message is rendering weird above so I'll rewrite it here for clarity:

  • Comply with PR #51196 (Rds Operator pass custom conn_id to superclass): explicitly pass aws_conn_id to its superclass, and adjust the default value of aws_conn_id in the base class to aws_default.
  • Add defensive test around adding more task instance states to keep in_progress_states of the MwaaTaskCompletedTrigger up to date.
  • Fix issue where waiter_delay of the MwaaTaskSensor derives to float instead of int type.
  • Modify documentation to clearly indicate that the MwaaTaskSensor is meant to sense tasks across different MWAA environments.
  • Make external_dag_run_id an optional parameter, where it defaults to the latest dag run.
  • Externally fetch the task ID variable.
  • Test the sensor while a DAG Run is still in progress.

@@ -132,7 +132,7 @@ def poke(self, context: Context) -> bool:

 if state in self.failure_states:
     raise AirflowException(
-        f"The DAG run {self.external_dag_run_id} of DAG {self.external_dag_id} in MWAA environment {self.external_env_name} "
+        f"The DAG run {self.external_dag_run_id} of DAG {self.external_dag_id} in MWAA environment {self.external_env_name}"
Contributor

Why did you delete the space here? The env name and the word "failed" no longer have a space between them. Or does the env name already include a trailing space?
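
A short illustration of why the trailing space matters: Python joins adjacent string literals with nothing in between, so any separator has to live inside one of the literals. The variable names and values below are made up for the example and are not taken from the PR.

```python
# Hypothetical values, used only to show how adjacent f-string literals are joined.
env_name = "my-env"
state = "failed"

# No trailing space in the first literal: the two pieces run together.
without_space = (
    f"in MWAA environment {env_name}"
    f"failed with state: {state}"
)

# Trailing space kept: the message reads as intended.
with_space = (
    f"in MWAA environment {env_name} "
    f"failed with state: {state}"
)

print(without_space)
print(with_space)
```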

For more information on how to use this sensor, take a look at the guide:
:ref:`howto/sensor:MwaaTaskSensor`

:param external_env_name: The external MWAA environment name that contains the DAG Run you want to wait for
Contributor

Suggested change
- :param external_env_name: The external MWAA environment name that contains the DAG Run you want to wait for
+ :param external_env_name: The external MWAA environment name that contains the Task Instance you want to wait for

Contributor

Here and below as well. This Operator just waits for a single task, not the whole Dag Run. I'm assuming this is just copy/paste from the above operator.

if state in self.failure_states:
raise AirflowException(
f"The task {self.external_task_id} of DAG run {self.external_dag_run_id} of DAG {self.external_dag_id} in MWAA environment {self.external_env_name}"
f"failed with state: {state}"
Contributor

Same as comment above, no space?

@@ -80,7 +80,7 @@ def __init__(
waiter_delay: int,
waiter_max_attempts: int,
waiter_config_overrides: dict[str, Any] | None = None,
- aws_conn_id: str | None,
+ aws_conn_id: str | None = "aws_default",
Contributor

I think this is probably a good change, but this is the base trigger and will affect all AWS triggers. So I'm curious what caused you to modify this one?
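
As a rough sketch of why a default on the base class reaches further than one trigger: a subclass that omits `aws_conn_id` when calling `super().__init__` silently picks up the new base default, while a subclass that passes its own value through explicitly does not. The three classes below are hypothetical stand-ins written for this example. They are not the provider's actual trigger classes and say nothing about how the existing AWS triggers call their superclass.

```python
from __future__ import annotations


class BaseWaiterTrigger:
    """Hypothetical stand-in for a shared base trigger with a defaulted aws_conn_id."""

    def __init__(
        self,
        *,
        waiter_delay: int,
        waiter_max_attempts: int,
        aws_conn_id: str | None = "aws_default",
    ):
        self.waiter_delay = waiter_delay
        self.waiter_max_attempts = waiter_max_attempts
        self.aws_conn_id = aws_conn_id


class ExplicitTrigger(BaseWaiterTrigger):
    """Passes aws_conn_id through explicitly, so the base default never applies."""

    def __init__(
        self,
        *,
        waiter_delay: int = 60,
        waiter_max_attempts: int = 720,
        aws_conn_id: str | None = None,
    ):
        super().__init__(
            waiter_delay=waiter_delay,
            waiter_max_attempts=waiter_max_attempts,
            aws_conn_id=aws_conn_id,
        )


class OmittingTrigger(BaseWaiterTrigger):
    """Never forwards aws_conn_id, so it inherits whatever the base class defaults to."""

    def __init__(self, *, waiter_delay: int = 30, waiter_max_attempts: int = 60):
        super().__init__(waiter_delay=waiter_delay, waiter_max_attempts=waiter_max_attempts)


print(ExplicitTrigger(aws_conn_id="custom").aws_conn_id)
print(ExplicitTrigger().aws_conn_id)
print(OmittingTrigger().aws_conn_id)
```

In this sketch only the subclass that omits the argument is affected by the base default, which is the kind of cross-trigger effect the question above is probing.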

"""
Trigger when an MWAA Task is complete.

:param external_env_name: The external MWAA environment name that contains the DAG Run you want to wait for
Contributor

Suggested change
- :param external_env_name: The external MWAA environment name that contains the DAG Run you want to wait for
+ :param external_env_name: The external MWAA environment name that contains the Task Instance you want to wait for

Same as the Operator class, these param descriptions need slight updates for task waiting not dag run waiting.


4 participants