Skip to content

Use base AWS classes in Glue Trigger / Sensor and implement custom waiter #52243

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 26, 2025

Conversation

dominikhei
Copy link
Contributor


Part of #35278

I have implemented a custom waiter and ported the sensor and trigger to the base AWS classes. For the naming of parameters and what to include in the docstrings, I looked at the already existing Glue ones.

On that note: I noticed that all operators use the max_attempts and poll_interval parameters (although often named differently), that are later passed to the trigger. Wouldn't it make sense to move them to the AWSBaseOperator too?

@boring-cyborg boring-cyborg bot added area:providers provider:amazon AWS/Amazon - related issues labels Jun 25, 2025
@dominikhei dominikhei marked this pull request as ready for review June 25, 2025 12:34
@eladkal eladkal requested a review from vincbeck June 25, 2025 13:50
@vincbeck vincbeck merged commit f641ef3 into apache:main Jun 26, 2025
138 checks passed
@@ -254,7 +265,7 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None

if validated_event["status"] != "success":
raise AirflowException(f"Error in glue job: {validated_event}")
return validated_event["value"]
return validated_event["run_id"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we make this change? What are the implications of changing the return value? Is this not a breaking change?

Copy link
Contributor Author

@dominikhei dominikhei Jun 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed the key to make it more explicit, see here. That's similar as it is in other services, e.g with the GlueDataQualityRuleRecommendationRunOperator.

I'm happy to revert it to value if preferred, but as long as the change is consistently applied where it's used, the renaming shouldn't introduce any issues?

@@ -231,7 +241,8 @@ def execute(self, context: Context):
run_id=self._job_run_id,
verbose=self.verbose,
aws_conn_id=self.aws_conn_id,
job_poll_interval=self.job_poll_interval,
waiter_delay=int(self.job_poll_interval),
waiter_max_attempts=self.retry_limit,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.retry_limit is zero by default which means we don't ever attempt again and we'll fail immediately. This is causing our system tests to fail when run in deferrable mode:

Screenshot from 2025-06-26 15-40-16

Comment on lines +55 to +56
:param poke_interval: Polling period in seconds to check for the status of the job. (default: 120)
:param max_retries: Number of times before returning the current state. (default: 60)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means we'll wait for 2 hours. Is that a sane default?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The defaults you added in the Trigger are 60 - 75, any reason to not match that here?

Copy link
Contributor Author

@dominikhei dominikhei Jun 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The defaults you added in the Trigger are 60 - 75, any reason to not match that here?

I followed the pattern used in other Glue sensors to stay consistent, as I was a bit uncertain what to use myself, but as you said probably better to stay with what's there. I can adjust that.

o-nikolas added a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Jun 26, 2025
In apache#52243 the waiting was moved from custom code within the glue hook to
using the aws base waiters when deferring Glue jobs. The Trigger was
given inappropriate inputs which caused it to wait for zero attempts,
which causes our tests to fail. This change moves to using the common
parameters we use for other operators in deferrable with the same
defaults as the Trigger has.
Note: previously this Operator used to wait indefinitely for the job to
either complete or fail. The default now waits for 75 minutes. The aws
base waiter has no ability to wait indefinitely, nor do I think it
should, that feels like a bug to me. So I'm considering this slight
behaviour change a bug fix of a bug fix.
eladkal pushed a commit that referenced this pull request Jun 27, 2025
In #52243 the waiting was moved from custom code within the glue hook to
using the aws base waiters when deferring Glue jobs. The Trigger was
given inappropriate inputs which caused it to wait for zero attempts,
which causes our tests to fail. This change moves to using the common
parameters we use for other operators in deferrable with the same
defaults as the Trigger has.
Note: previously this Operator used to wait indefinitely for the job to
either complete or fail. The default now waits for 75 minutes. The aws
base waiter has no ability to wait indefinitely, nor do I think it
should, that feels like a bug to me. So I'm considering this slight
behaviour change a bug fix of a bug fix.
kyungjunleeme pushed a commit to kyungjunleeme/airflow that referenced this pull request Jun 28, 2025
In apache#52243 the waiting was moved from custom code within the glue hook to
using the aws base waiters when deferring Glue jobs. The Trigger was
given inappropriate inputs which caused it to wait for zero attempts,
which causes our tests to fail. This change moves to using the common
parameters we use for other operators in deferrable with the same
defaults as the Trigger has.
Note: previously this Operator used to wait indefinitely for the job to
either complete or fail. The default now waits for 75 minutes. The aws
base waiter has no ability to wait indefinitely, nor do I think it
should, that feels like a bug to me. So I'm considering this slight
behaviour change a bug fix of a bug fix.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:providers provider:amazon AWS/Amazon - related issues
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants