-
Notifications
You must be signed in to change notification settings - Fork 15.3k
Add MwaaTaskSensor to Amazon Provider Package #51719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
failure_states: Collection[str] | None = None, | ||
waiter_delay: int = 60, | ||
waiter_max_attempts: int = 720, | ||
aws_conn_id: str | None = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't consistent with what was discussed on #51196 is it?
self.success_states = set(success_states) if success_states else {TaskInstanceState.SUCCESS.value} | ||
self.failure_states = set(failure_states) if failure_states else {TaskInstanceState.FAILED.value} | ||
|
||
if len(self.success_states & self.failure_states): | ||
raise ValueError("success_states and failure_states must not have any values in common") | ||
|
||
in_progress_states = {s.value for s in TaskInstanceState} - self.success_states - self.failure_states |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this logic right? If we fall back to the defaults it will consider skipped
and removed
as in progress state.
Also, there is no protection here against possible future addition of new state to task instance. For example we are discussing #12199
I suggest to add defensive test around adding more states so we'll know to modify code here or maybe we can consider adding more classes to categorized states similar to
airflow/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
Lines 415 to 419 in 083e03a
class TerminalTIState(str, Enum): | |
SUCCESS = "success" | |
FAILED = "failed" | |
SKIPPED = "skipped" | |
REMOVED = "removed" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is also upstream_failed
for example and others that are terminal that will cause this to wait forever right?
providers/amazon/src/airflow/providers/amazon/aws/sensors/mwaa.py
Outdated
Show resolved
Hide resolved
providers/amazon/src/airflow/providers/amazon/aws/sensors/mwaa.py
Outdated
Show resolved
Hide resolved
providers/amazon/src/airflow/providers/amazon/aws/triggers/mwaa.py
Outdated
Show resolved
Hide resolved
Wouldn't it still wait for the dag run to complete? I think in this case the waiting would always be in deferrable mode instead of using the config value for If we want to test the sensor during execution, we could run the dag again in another task before the sensor task but I'm not sure if we want to be that exhaustive in system tests |
@ramitkataria With |
Also discussed offline but in short, the sensor task would still wait for this task because the sensor task is set to depend on this task since they're in a chain |
… adjust the default value of in base class to . - Add defensive test around adding more task instance states to keep of the MwaaTaskCompletedTrigger up to date. - Fix issue where of the MwaaTaskSensor derives to instead of type. - Modify documentation to clearly indicate that the MwaaTaskSensor is meant to sense tasks across different MWAA environments. - Make an optional parameter, where it defaults to the latest dag run. - Externally fetch the task ID variable. - Test the sensor while a DAG Run is still in progress.
I see that the commit message is rendering weird above so I'll rewrite it here for clarity:
|
@@ -132,7 +132,7 @@ def poke(self, context: Context) -> bool: | |||
|
|||
if state in self.failure_states: | |||
raise AirflowException( | |||
f"The DAG run {self.external_dag_run_id} of DAG {self.external_dag_id} in MWAA environment {self.external_env_name} " | |||
f"The DAG run {self.external_dag_run_id} of DAG {self.external_dag_id} in MWAA environment {self.external_env_name}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you delete the space here? The env name and the word failed
no longer have a space between them now? Or does the env name have a space included at the end of it already?
For more information on how to use this sensor, take a look at the guide: | ||
:ref:`howto/sensor:MwaaTaskSensor` | ||
|
||
:param external_env_name: The external MWAA environment name that contains the DAG Run you want to wait for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:param external_env_name: The external MWAA environment name that contains the DAG Run you want to wait for | |
:param external_env_name: The external MWAA environment name that contains the Task Instance you want to wait for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here and below as well. This Operator just waits for a single task, not the whole Dag Run. I'm assuming this is just copy/paste from the above operator.
if state in self.failure_states: | ||
raise AirflowException( | ||
f"The task {self.external_task_id} of DAG run {self.external_dag_run_id} of DAG {self.external_dag_id} in MWAA environment {self.external_env_name}" | ||
f"failed with state: {state}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as comment above, no space?
@@ -80,7 +80,7 @@ def __init__( | |||
waiter_delay: int, | |||
waiter_max_attempts: int, | |||
waiter_config_overrides: dict[str, Any] | None = None, | |||
aws_conn_id: str | None, | |||
aws_conn_id: str | None = "aws_default", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is probably a good change, but this is the base trigger and will affect all AWS triggers. So I'm curious what caused you to modify this one?
""" | ||
Trigger when an MWAA Task is complete. | ||
|
||
:param external_env_name: The external MWAA environment name that contains the DAG Run you want to wait for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:param external_env_name: The external MWAA environment name that contains the DAG Run you want to wait for | |
:param external_env_name: The external MWAA environment name that contains the Task Instance you want to wait for |
Same as the Operator class, these param descriptions need slight updates for task waiting not dag run waiting.
self.success_states = set(success_states) if success_states else {TaskInstanceState.SUCCESS.value} | ||
self.failure_states = set(failure_states) if failure_states else {TaskInstanceState.FAILED.value} | ||
|
||
if len(self.success_states & self.failure_states): | ||
raise ValueError("success_states and failure_states must not have any values in common") | ||
|
||
in_progress_states = {s.value for s in TaskInstanceState} - self.success_states - self.failure_states |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is also upstream_failed
for example and others that are terminal that will cause this to wait forever right?
The
MwaaTaskSensor
waits for the completion of a DAG task instance in an MWAA environment. This PR includes an implementation with unit tests, system tests, and docs. Similar to MwaaDagRunSensorAlso modified system test to have
MwaaTriggerDagRunOperator
set todeferrable=True
. This tests theMwaaTaskSensor
andMwaaDagRunSensor
sensors during execution of DAG Run rather than only afterwards.