Skip to content

Enhance Variable set method to use upsert instead of delsert #48547

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 26, 2025

Conversation

ksh24865
Copy link
Contributor

Refactor Variable.set Method to Use Upsert Instead of Delsert

This pull request refactors the Variable.set method to replace the current delsert pattern (delete followed by insert) with an upsert approach, addressing key reliability and efficiency issues.

Key Changes:

1. Preventing Unique Constraint Errors

When set is called repeatedly within a single process, the existing delsert pattern can trigger UNIQUE constraint violations. This occurs when a DELETE operation has not been flushed or committed before a subsequent INSERT is attempted — even within the same session — leading to conflicts.

Example error message:

 sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "variable_key_key"
DETAIL:  Key (key)=(dummy_variable_key_for_test) already exists.
[SQL: INSERT INTO variable (key, val, description, is_encrypted) VALUES (%(key)s, %(val)s, %(description)s, %(is_encrypted)s) RETURNING variable.id]
[parameters: {'key': 'dummy_variable_key_for_test', 'val': 'dummy_variable_value_for_test', 'description': None, 'is_encrypted': True}]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
image

2. Reducing Unnecessary Transactions

The previous delsert pattern always executed both a DELETE and an INSERT, even when the variable already existed. This introduced unnecessary write operations and transactional overhead.
By switching to an update-or-insert approach, the common case — where a variable already exists — now results in a single UPDATE instead of a full row replacement. This reduces the number of write operations and improves overall efficiency without changing behavior.

Please review the changes and let me know if there are any questions or further improvements needed.
Feel free to adjust the wording to better fit your style, or let me know if you'd like me to make any changes.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@ksh24865 ksh24865 requested review from XD-DENG and ashb as code owners March 30, 2025 08:44
Copy link

boring-cyborg bot commented Mar 30, 2025

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: [email protected]
    Slack: https://s.apache.org/airflow-slack

@ksh24865 ksh24865 force-pushed the improvement/variable-set-method branch from 4d1b53e to 05569bd Compare March 30, 2025 11:09
Copy link
Member

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here’s another related PR for Variable.set: #48177.

If we agree on the Upsert behavior, the Execution API should also follow the same approach, right?

In that case, we should either wait for #48177 to be merged ( or I can create a separate issue to ensure the Execution API aligns with the Upsert behavior, then this PR won’t depend on #48177. )

cc @ashb, @amoghrajesh

@ksh24865
Copy link
Contributor Author

ksh24865 commented Mar 31, 2025

@jason810496

Thank you for bringing up the idea that the Execution API should also align with the Upsert behavior. :)
I agree with that, and I’ll update this PR to ensure the Execution API follows the same approach.

When you mentioned the Execution API, were you referring to the following path?
/airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py
It looks like this path is already using the same Variable.set.
If not, I’d appreciate it if you could point me to the correct path.

@ksh24865 ksh24865 requested a review from jason810496 March 31, 2025 08:38
@jason810496
Copy link
Member

It looks like this path is already using the same Variable.set.

Oh yes, you are right! I misremembered — the Execution API indeed uses Variable.set from the model instead of TaskSDK.
We'll still need to wait for @ashb or @amoghrajesh to confirm whether the Upsert behavior is acceptable.

@ksh24865 ksh24865 force-pushed the improvement/variable-set-method branch from afd8ed6 to 5095ea9 Compare April 1, 2025 04:39
@ksh24865
Copy link
Contributor Author

ksh24865 commented Apr 1, 2025

@jason810496

Thank you for the suggestion!

I've reviewed the behavior of Variable.set and confirmed that the existing test cases already cover the functionality for the key, val, and description parameters.

In addition, I've added a new test case, test_variable_set_update_existing, to explicitly verify the transition from the delsert to the upsert approach.

@ashb
Copy link
Member

ashb commented Apr 1, 2025

I wonder if it's better to use an actual upsert at the DB level when available https://docs.sqlalchemy.org/en/20/dialects/postgresql.html#insert-on-conflict-upsert (SQLIte has something similar in SQLA too. I don't know if mysql does)

Edit: Yup, it does https://docs.sqlalchemy.org/en/20/dialects/postgresql.html#insert-on-conflict-upsert

@ksh24865
Copy link
Contributor Author

ksh24865 commented Apr 1, 2025

@ashb

Thank you very much for your feedback. :)
I'm already aware of the INSERT ... ON CONFLICT DO UPDATE (or the equivalent) syntax supported by PostgreSQL, MySQL, and SQLite.

However, since SQLAlchemy ORM does not offer a DB-agnostic way to express upserts using these features, I believe the current approach—checking whether the variable exists and then choosing between update or add—is the most universal and reliable solution at the ORM level.

In summary, while your suggestion to leverage a DB-level upsert is certainly valid, given the lack of a DB-agnostic upsert mechanism in SQLAlchemy ORM, I believe the current implementation is the most appropriate choice.

@ksh24865
Copy link
Contributor Author

ksh24865 commented Apr 1, 2025

Additionally, it's technically possible to check the session's dialect first and then branch accordingly to use INSERT ... ON CONFLICT DO UPDATE (or the equivalent) with SQLAlchemy Core, depending on the specific database.

Would it be alright if I write it the way I explained above?

I’d appreciate your thoughts on this. Thank you!
@ashb

@ashb
Copy link
Member

ashb commented Apr 4, 2025

However, since SQLAlchemy ORM does not offer a DB-agnostic way to express upserts using these features, I believe the current approach—checking whether the variable exists and then choosing between update or add—is the most universal and reliable solution at the ORM level.

It is already a well-established pattern in Airflow to have if mysql/if postgres branches. -- espeically as all three supported database have support for ON CONFLICT UPDATE

@ksh24865
Copy link
Contributor Author

ksh24865 commented Apr 5, 2025

@ashb

Thanks for your advice!

As you mentioned, I confirmed that dialect-specific branching pattern is already used in Airflow, and I’ve applied the same pattern to Variable.set.

Tested on PostgreSQL, MySQL, and SQLite.

I'd appreciate it if you could review the changes when you have a moment.

@ksh24865
Copy link
Contributor Author

ksh24865 commented Apr 5, 2025

Thanks for your advice ! @jason810496

As you suggested, the code has been minimized:

  • Applied dialect-specific insert functions and upsert clauses
  • Consolidated the code wherever possible

Also tested on PostgreSQL, MySQL, and SQLite.

I'd appreciate it if you could provide any additional feedback !

cc. @ashb

@ksh24865 ksh24865 requested a review from jason810496 April 6, 2025 06:57
@ksh24865 ksh24865 force-pushed the improvement/variable-set-method branch from b06f5f5 to c35a678 Compare April 10, 2025 14:39
Copy link
Member

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work! I’ve left a small suggestion to simplify the if/else block for importing the dialect-specific insert function.

@ksh24865
Copy link
Contributor Author

Thanks, that was useful!
I simplified it using import_string and a mapping.

@jason810496

@ksh24865 ksh24865 force-pushed the improvement/variable-set-method branch from ef1eccf to 15a72ce Compare April 11, 2025 08:01
Copy link
Member

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Thanks @ksh24865, looks good to me!

@ksh24865
Copy link
Contributor Author

Thanks ! @jason810496

I'd appreciate it if you could provide any additional feedback. @ashb

@ksh24865 ksh24865 force-pushed the improvement/variable-set-method branch 5 times, most recently from 15107ef to 89aeee6 Compare April 14, 2025 11:08
@ksh24865 ksh24865 force-pushed the improvement/variable-set-method branch from 15df12d to bc3656c Compare April 14, 2025 13:25
Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One small comment, and then we'll merge this after 3.0.0 is out (it's likely fine, but this is not 100% required, and we are being very risk adverse about not merging anything that might risk the release.

@ksh24865 ksh24865 force-pushed the improvement/variable-set-method branch 2 times, most recently from 650f175 to d6ebdc7 Compare April 15, 2025 13:22
@ksh24865
Copy link
Contributor Author

Thank you!

I agree with all of your suggestions and have applied them accordingly.

Looking forward to the next steps for this PR after the 3.0.0 release.

@ashb

@ashb ashb added this to the Airflow 3.1+ milestone Apr 22, 2025
@ksh24865 ksh24865 force-pushed the improvement/variable-set-method branch 2 times, most recently from 39101d0 to edab4ef Compare April 23, 2025 13:48
Copy link

github-actions bot commented Jun 8, 2025

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added stale Stale PRs per the .github/workflows/stale.yml policy file and removed stale Stale PRs per the .github/workflows/stale.yml policy file labels Jun 8, 2025
Copy link
Member

@uranusjr uranusjr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merge if CI is green.

Copy link
Contributor

@amoghrajesh amoghrajesh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM +1

@ksh24865 the static checks are failing, could you take a look? Best way is to install precommit https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst#pre-commit-hooks

@ksh24865 ksh24865 force-pushed the improvement/variable-set-method branch from 71fa254 to f77d272 Compare June 25, 2025 15:33
@ksh24865 ksh24865 force-pushed the improvement/variable-set-method branch from f77d272 to 70f9e5d Compare June 25, 2025 15:33
@ksh24865
Copy link
Contributor Author

It seems that the issue occurred due to differences with the main branch since the last update in April. I've resolved it by merging the latest changes from the main branch.
Thank you for the advice.
@amoghrajesh, @uranusjr

@jason810496 jason810496 merged commit 7f694ad into apache:main Jun 26, 2025
3 checks passed
Copy link

boring-cyborg bot commented Jun 26, 2025

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants