fix(provider): progress_callback was repeatedly called with the first log line #52164

Open · wants to merge 3 commits into main

Conversation

msimmoteit-neozo commented:

I realized that there is a bug in the way the PodManager calls the progress_callback with its log lines. The decoded log line variable is called line, and the loop variable that iterates over progress_callback_lines is also called line. Because the inner loop overwrites the outer variable, the progress_callback ends up being called with the first log line received over and over again whenever all log lines carry a timestamp.

I added a test case and verified that it fails with the code as it is on the main branch. I also applied a minimal fix that resolves the problem and makes the test case pass.
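
To make the failure mode concrete, here is a minimal, self-contained sketch of the buffering pattern and the rename. This is not the actual pod_manager code, just a simplified model that assumes every raw line carries a timestamp:

from typing import Callable


def consume_logs_buggy(raw_lines: list[str], progress_callback: Callable[[str], None]) -> None:
    """Simplified model of the buffering loop; every line is assumed to carry a timestamp."""
    buffered: list[str] = []
    for raw_line in raw_lines:
        line = raw_line  # in the real code this is raw_line.decode("utf-8", ...)
        if buffered:
            # BUG: the loop variable shadows `line`, so after this loop `line`
            # no longer holds the freshly read log line ...
            for line in buffered:
                progress_callback(line)
            # ... and the stale value is re-buffered here on every iteration.
            buffered = [line]
        else:
            buffered = [line]


def consume_logs_fixed(raw_lines: list[str], progress_callback: Callable[[str], None]) -> None:
    """The same loop with the inner loop variable renamed, as done in this PR."""
    buffered: list[str] = []
    for raw_line in raw_lines:
        line = raw_line
        if buffered:
            for progress_line in buffered:
                progress_callback(progress_line)
            buffered = [line]  # `line` still refers to the current log line
        else:
            buffered = [line]


logs = [
    "2025-06-24T10:10:05Z ++ id -u",
    "2025-06-24T10:10:06Z + myuid=185",
    "2025-06-24T10:10:07Z + set +e",
]
consume_logs_buggy(logs, lambda s: print("buggy:", s))  # prints the first line twice
consume_logs_fixed(logs, lambda s: print("fixed:", s))  # prints line 1, then line 2; the last line stays buffered in this sketch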


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.


boring-cyborg bot commented Jun 24, 2025

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide and consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: [email protected]
    Slack: https://s.apache.org/airflow-slack

boring-cyborg bot added labels: area:providers, provider:cncf-kubernetes (Kubernetes (k8s) provider related issues) on Jun 24, 2025
@@ -524,10 +524,10 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None
                         message_timestamp = line_timestamp
                         progress_callback_lines.append(line)
                     else:  # previous log line is complete
-                        for line in progress_callback_lines:
+                        for progress_line in progress_callback_lines:

Member commented:

That does not seem like an error. It's just a variable name. Are you sure that if you go back to the original code your test fails? It should not.

msimmoteit-neozo (Author) replied on Jun 29, 2025:

> It's just a variable name

That is correct, and I intend for this PR to do nothing but change this variable name (and maybe add the unit test).

The reason this variable name needs to be changed, and why I'm creating this PR, is that in line 519 the variable line is assigned raw_line.decode("utf-8", errors="backslashreplace"), and in line 540 that same variable is reused: progress_callback_lines = [line]. But Python for-loops do not create their own scope, so after the inner loop the name line is bound to the last value of the loop iterator rather than to the freshly decoded log line. In my opinion this leads to undesired behaviour for users of the progress_callback.

To illustrate this behaviour, I have created this Python fiddle: Link
Or you can execute this in a Python interpreter of your choice:

line = "Outer line"

for line in ["Log 1", "Log 2", "Log 3"]:
    x = line
    
print(f"Line after Loop: {line}")

def test():
    f_line = "Outer Line Function"
    
    for f_line in ["FLog 1", "FLog 2", "FLog 3"]:
        y = f_line
        
    return f_line
    
ret = test()

print(f"Result of Function line: {ret}")

And lastly, if I revert commit e1d67dd and execute the unit test added for this PR, this is the result:
[Screenshot: output of the failing unit test after reverting commit e1d67dd]
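
For reference, here is a pytest-style sketch of the kind of regression check such a unit test performs. It exercises the simplified loop from the sketch in the description, not the real PodManager, and it is not the test that ships with this PR:

def deliver_buffered(raw_lines, progress_callback):
    # Compact version of the fixed buffering loop from the sketch in the description.
    buffered = []
    for raw_line in raw_lines:
        line = raw_line
        if buffered:
            for progress_line in buffered:
                progress_callback(progress_line)
        buffered = [line]
    for progress_line in buffered:  # flush whatever is still buffered at end of stream
        progress_callback(progress_line)


def test_progress_callback_receives_each_line_once():
    received = []
    logs = [f"2025-06-24T10:10:0{i}Z log line {i}" for i in range(5)]
    deliver_buffered(logs, received.append)
    # With the shadowed inner variable, `received` would contain only copies of the first line.
    assert received == logs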

potiuk (Member) left a comment:

Does not look like the change changes anything besides the variable name.

msimmoteit-neozo (Author) commented:

First of all, I want to thank you for taking the time to look into my PR. I appreciate it.

I think I made the reason for this PR clear in my comment above, but I want to add a bit of context. We're using the SparkKubernetesOperator and I wanted to use the progress_callback. Usually the startup of the Pod looks like this in the Airflow logs:

[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] ++ id -u
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + myuid=185
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] ++ id -g
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + mygid=0
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + set +e
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] ++ getent passwd 185
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + uidentry=
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + set -e
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' -z '' ']'
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' -w /etc/passwd ']'
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + echo '185:x:185:0:anonymous uid:/opt/spark:/bin/false'
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' -z /opt/java/openjdk ']'
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + SPARK_CLASSPATH=':/opt/spark/jars/*'
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + env
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + grep SPARK_JAVA_OPT_
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + sort -t_ -k4 -n
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + sed 's/[^=]*=\(.*\)/\1/g'
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] ++ command -v readarray
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' readarray ']'
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + readarray -t SPARK_EXECUTOR_JAVA_OPTS
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' -n '' ']'
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' -z ']'
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' -n '' ']'
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' -z ']'
[2025-06-24, 09:54:44 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' -z x ']'

I added a callbacks argument with a progress_callback equivalent to this:

@staticmethod
def progress_callback(*, line: str, client, mode: str, **kwargs):
    # logger is defined elsewhere in our code
    logger.log.warning(f"The log line: {line}")

It uses the logging functionality of Airflow. Normally I would expect all log lines to be duplicated (once by the pod manager, once by my callback), yet this is what happened:

[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] ++ id -u
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + myuid=185
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] ++ id -g
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + mygid=0
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + set +e
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] ++ getent passwd 185
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + uidentry=
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + set -e
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' -z '' ']'
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' -w /etc/passwd ']'
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + echo '185:x:185:0:anonymous uid:/opt/spark:/bin/false'
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' -z /opt/java/openjdk ']'
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + SPARK_CLASSPATH=':/opt/spark/jars/*'
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + env
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + grep SPARK_JAVA_OPT_
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + sort -t_ -k4 -n
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + sed 's/[^=]*=\(.*\)/\1/g'
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] ++ command -v readarray
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' readarray ']'
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + readarray -t SPARK_EXECUTOR_JAVA_OPTS
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' -n '' ']'
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' -z ']'
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' -z ']'
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' -n '' ']'
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' -z ']'
[2025-06-24, 10:10:16 UTC] {operator.py:36} WARNING - The log line: 2025-06-24T10:10:05.975999206Z ++ id -u
[2025-06-24, 10:10:16 UTC] {pod_manager.py:454} INFO - [spark-kubernetes-driver] + '[' -z x ']'

My callback was repeatedly called with the first line ever logged. I am aware that our deployment is a few versions behind, but this is why I created the unit test.
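
For completeness, here is a simplified sketch of how such a callback can be wired up. The operator arguments are placeholders, and depending on the provider version the callbacks parameter accepts either a single callback class or a list of them:

import logging

from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

log = logging.getLogger(__name__)


class LogEchoCallback(KubernetesPodOperatorCallback):
    """Echo every container log line through Airflow's logging."""

    @staticmethod
    def progress_callback(*, line: str, client, mode: str, **kwargs) -> None:
        log.warning("The log line: %s", line)


# Placeholder task definition; application_file and namespace are illustrative values.
submit_spark_job = SparkKubernetesOperator(
    task_id="submit_spark_job",
    application_file="spark_job.yaml",
    namespace="spark",
    callbacks=LogEchoCallback,
)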

Best regards
