-
Notifications
You must be signed in to change notification settings - Fork 15.2k
Provider Migration: Update beam for Airflow 3.0 compatibility #52607
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
mock_ti.xcom_push.assert_called_once_with(key="dataflow_job_id", value=sample_df_job_id) | ||
mock_ti.xcom_push.reset_mock() | ||
op.dataflow_job_id = "sample_df_job_same_value_id" | ||
mock_ti.xcom_push.assert_not_called() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we rewrite this using a more pytest-native style instead of unittest.mock assertions like assert_not_called or assert_called_once_with?
It would help improve consistency and readability across the test suite.
mock_ti = MagicMock() | ||
op._execute_context = {"ti": mock_ti} | ||
|
||
assert op.dataflow_job_id is None | ||
|
||
op.dataflow_job_id = sample_df_job_id | ||
mock_ti.xcom_push.assert_called_once_with(key="dataflow_job_id", value=sample_df_job_id) | ||
mock_ti.xcom_push.reset_mock() | ||
op.dataflow_job_id = "sample_df_job_same_value_id" | ||
mock_ti.xcom_push.assert_not_called() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mock_ti = MagicMock() | |
op._execute_context = {"ti": mock_ti} | |
assert op.dataflow_job_id is None | |
op.dataflow_job_id = sample_df_job_id | |
mock_ti.xcom_push.assert_called_once_with(key="dataflow_job_id", value=sample_df_job_id) | |
mock_ti.xcom_push.reset_mock() | |
op.dataflow_job_id = "sample_df_job_same_value_id" | |
mock_ti.xcom_push.assert_not_called() | |
# Mock TaskInstance and inject into execution context | |
mock_ti = MagicMock() | |
op._execute_context = {"ti": mock_ti} | |
assert op.dataflow_job_id is None | |
op.dataflow_job_id = sample_df_job_id | |
assert any( | |
kwargs.get("key") == "dataflow_job_id" and kwargs.get("value") == sample_df_job_id | |
for _, kwargs in mock_ti.xcom_push.call_args_list | |
) | |
# If the same value is set again, it should not push to XCom again | |
mock_ti.reset_mock() | |
op.dataflow_job_id = sample_df_job_id | |
assert mock_ti.xcom_push.call_count == 0 |
I have tested this code.
__all__ = [ | ||
"AIRFLOW_V_3_1_PLUS", | ||
"BaseOperator", | ||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about could add new line at end of file?
Follow-up of #52292. Part of #52378
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in airflow-core/newsfragments.