Skip to content

Optimize deferrable mode execution for DataprocSubmitJobOperator #31317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 4, 2023

Conversation

phanikumv
Copy link
Contributor

This PR verifies if a dataproc job has already completed (or) errored out (or) cancelled before deferring the task to prevent unnecessary deferring. This way, we can skip deferring the task if it has already been finished.


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added area:providers provider:google Google (including GCP) related issues labels May 16, 2023
@phanikumv phanikumv requested a review from josh-fell May 16, 2023 12:46
Copy link
Member

@hussein-awala hussein-awala left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what is the probability to have a quick end of the job like that regardless the reason, we are talking about a few microseconds after the submission 🤔

Is there any issue/use case which can be fixed/covered by this PR?

Comment on lines 2033 to 2041
job = self.hook.get_job(project_id=self.project_id, region=self.region, job_id=self.job_id)
state = job.status.state
if state == JobStatus.State.DONE:
return self.job_id
elif state == JobStatus.State.ERROR:
raise AirflowException(f"Job failed:\n{job}")
elif state == JobStatus.State.CANCELLED:
raise AirflowException(f"Job was cancelled:\n{job}")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this only be in the defer path? Otherwise it would seem it is duplicative.

And then might be nice to shove this code into a private method too.

e.g.

        if self.deferrable:
            job_id = self._check_before_defer(...)
            if job_id:
                return job_id
            self.defer(
                trigger=DataprocSubmitTrigger(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved the code under self.deferrable condition

@potiuk potiuk merged commit 495ae23 into apache:main Jun 4, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:providers provider:google Google (including GCP) related issues
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants