Skip to content

Feature/fix xcom push in spark kubernetes operator #52051

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

kesem0811
Copy link
Contributor

@kesem0811 kesem0811 commented Jun 23, 2025

Fixes: #39184 where SparkKubernetesOperator tasks hang indefinitely because no sidecar is injected to read /airflow/xcom/return.json.

@eladkal eladkal requested review from romsharon98 and Lee-W June 24, 2025 04:36
@@ -293,10 +293,24 @@ def client(self) -> CoreV1Api:
def custom_obj_api(self) -> CustomObjectsApi:
return CustomObjectsApi()

def update_pod_spec_add_xcom_sidecar(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def update_pod_spec_add_xcom_sidecar(self):
def update_pod_spec_add_xcom_sidecar(self) -> None:

)
self.template_body["spark"]["spec"]= driver_with_xcom_template
except KeyError as e:
raise AirflowException("Spec missing in SparkApplication template") from e
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's not use AirflowException, we can create a customized exception or just use KeyError

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?
In all the file, when something doesn't work as expected, an AirflowException is raised

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AirflowException is broad and not infomative

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets switch to KeyError

@@ -46,7 +50,7 @@ class PodDefaults:


def add_xcom_sidecar(
pod: k8s.V1Pod,
pod: Union[k8s.V1Pod, dict],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pod: Union[k8s.V1Pod, dict],
pod: k8s.V1Pod | dict,

)
self.template_body["spark"]["spec"]= driver_with_xcom_template
except KeyError as e:
raise AirflowException("Spec missing in SparkApplication template") from e
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AirflowException is broad and not infomative

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Problem with pushing Xcom when using SparkKubernetesOperator
3 participants