-
Notifications
You must be signed in to change notification settings - Fork 15.3k
fix(provider): progress_callback was repeatedly called with the first log line #52164
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
fix(provider): progress_callback was repeatedly called with the first log line #52164
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
|
@@ -524,10 +524,10 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None | |||
message_timestamp = line_timestamp | |||
progress_callback_lines.append(line) | |||
else: # previous log line is complete | |||
for line in progress_callback_lines: | |||
for progress_line in progress_callback_lines: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That does not seem like an eror. It's just a variable name. Are you sure that if you come back to the original code your test is failing ? It should not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just a variable name
That is correct. And I intend for this PR to do nothing but to change this variable name (and maybe add the unit test).
The reason this variable name needs to be changed and why I'm creating this MR is that in line 519, the variable line
is assigned to raw_line.decode("utf-8", errors="backslashreplace")
and in line 540 this variable is used: progress_callback_lines = [line]
. But in Python, for
-loops are not scoped. This means that the variable line
is overwritten with the last value of the loop iterator. Leading to, in my opinion, undesired behaviour for using the progress_callback
.
To illustrate this behaviour, I have created this Python fiddle: Link
Or you can execute this in a Python interpreter of your choice:
line = "Outer line"
for line in ["Log 1", "Log 2", "Log 3"]:
x = line
print(f"Line after Loop: {line}")
def test():
f_line = "Outer Line Function"
for f_line in ["FLog 1", "FLog 2", "FLog 3"]:
y = f_line
return f_line
ret = test()
print(f"Result of Function line: {ret}")
And lastly, if I revert commit e1d67dd and execute the unit test added for this PR, this is the result:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does not look like the change changes anything beside variable name
First of all, I want to thank you for taking your time and looking into my PR. I appreciate that. I think, I made the reason for this PR clear in my comment. But I want to add a bit of context. We're using the SparkKubernetesOperator and I wanted to use the
I added a
using the logging functionality of Airflow. Normally, I would expect all log lines to be duplicated, yet this is what happened:
My callback was repeatedly called with the first line ever logged. I am aware that our deployment is a few versions behind, but this is why I created the unit test. Best regards |
I realized that there was a bug in the way that the PodManager calls the
progress_callback
with its log lines. Due to the log line variable being calledline
and the loop variable that loops over theprogress_callback_lines
also being calledline
led, in the case that all log lines have a timestamp, to theprogress_callback
always being called with the first log line received.I added a test case and verified that it fails with the code as it was on the main branch. I also applied a minimal fix that fixes the problem and the test case.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in airflow-core/newsfragments.