-
Notifications
You must be signed in to change notification settings - Fork 15.3k
Enhance Variable set method to use upsert instead of delsert #48547
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enhance Variable set method to use upsert instead of delsert #48547
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
|
4d1b53e
to
05569bd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here’s another related PR for Variable.set: #48177.
If we agree on the Upsert behavior, the Execution API should also follow the same approach, right?
In that case, we should either wait for #48177 to be merged ( or I can create a separate issue to ensure the Execution API aligns with the Upsert behavior, then this PR won’t depend on #48177. )
cc @ashb, @amoghrajesh
Thank you for bringing up the idea that the Execution API should also align with the Upsert behavior. :) When you mentioned the Execution API, were you referring to the following path? |
Oh yes, you are right! I misremembered — the Execution API indeed uses |
afd8ed6
to
5095ea9
Compare
Thank you for the suggestion! I've reviewed the behavior of Variable.set and confirmed that the existing test cases already cover the functionality for the key, val, and description parameters. In addition, I've added a new test case, test_variable_set_update_existing, to explicitly verify the transition from the delsert to the upsert approach. |
I wonder if it's better to use an actual upsert at the DB level when available https://docs.sqlalchemy.org/en/20/dialects/postgresql.html#insert-on-conflict-upsert (SQLIte has something similar in SQLA too. I don't know if mysql does) Edit: Yup, it does https://docs.sqlalchemy.org/en/20/dialects/postgresql.html#insert-on-conflict-upsert |
Thank you very much for your feedback. :) However, since SQLAlchemy ORM does not offer a DB-agnostic way to express upserts using these features, I believe the current approach—checking whether the variable exists and then choosing between update or add—is the most universal and reliable solution at the ORM level. In summary, while your suggestion to leverage a DB-level upsert is certainly valid, given the lack of a DB-agnostic upsert mechanism in SQLAlchemy ORM, I believe the current implementation is the most appropriate choice. |
Additionally, it's technically possible to check the session's dialect first and then branch accordingly to use INSERT ... ON CONFLICT DO UPDATE (or the equivalent) with SQLAlchemy Core, depending on the specific database. Would it be alright if I write it the way I explained above? I’d appreciate your thoughts on this. Thank you! |
It is already a well-established pattern in Airflow to have |
Thanks for your advice! As you mentioned, I confirmed that dialect-specific branching pattern is already used in Airflow, and I’ve applied the same pattern to Tested on PostgreSQL, MySQL, and SQLite. I'd appreciate it if you could review the changes when you have a moment. |
Thanks for your advice ! @jason810496 As you suggested, the code has been minimized:
Also tested on PostgreSQL, MySQL, and SQLite. I'd appreciate it if you could provide any additional feedback ! cc. @ashb |
b06f5f5
to
c35a678
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work! I’ve left a small suggestion to simplify the if
/else
block for importing the dialect-specific insert
function.
Thanks, that was useful! |
ef1eccf
to
15a72ce
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! Thanks @ksh24865, looks good to me!
Thanks ! @jason810496 I'd appreciate it if you could provide any additional feedback. @ashb |
15107ef
to
89aeee6
Compare
15df12d
to
bc3656c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One small comment, and then we'll merge this after 3.0.0 is out (it's likely fine, but this is not 100% required, and we are being very risk adverse about not merging anything that might risk the release.
650f175
to
d6ebdc7
Compare
Thank you! I agree with all of your suggestions and have applied them accordingly. Looking forward to the next steps for this PR after the 3.0.0 release. |
39101d0
to
edab4ef
Compare
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merge if CI is green.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM +1
@ksh24865 the static checks are failing, could you take a look? Best way is to install precommit https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst#pre-commit-hooks
71fa254
to
f77d272
Compare
f77d272
to
70f9e5d
Compare
It seems that the issue occurred due to differences with the main branch since the last update in April. I've resolved it by merging the latest changes from the main branch. |
Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions. |
Refactor Variable.set Method to Use Upsert Instead of Delsert
This pull request refactors the
Variable.set
method to replace the current delsert pattern (delete followed by insert) with an upsert approach, addressing key reliability and efficiency issues.Key Changes:
1. Preventing Unique Constraint Errors
When
set
is called repeatedly within a single process, the existing delsert pattern can triggerUNIQUE constraint
violations. This occurs when aDELETE
operation has not been flushed or committed before a subsequentINSERT
is attempted — even within the same session — leading to conflicts.Example error message:
2. Reducing Unnecessary Transactions
The previous delsert pattern always executed both a DELETE and an INSERT, even when the variable already existed. This introduced unnecessary write operations and transactional overhead.
By switching to an update-or-insert approach, the common case — where a variable already exists — now results in a single UPDATE instead of a full row replacement. This reduces the number of write operations and improves overall efficiency without changing behavior.
Please review the changes and let me know if there are any questions or further improvements needed.
Feel free to adjust the wording to better fit your style, or let me know if you'd like me to make any changes.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in airflow-core/newsfragments.