-
Notifications
You must be signed in to change notification settings - Fork 15.3k
Feature/fix xcom push in spark kubernetes operator #52051
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Feature/fix xcom push in spark kubernetes operator #52051
Conversation
@@ -293,10 +293,24 @@ def client(self) -> CoreV1Api: | |||
def custom_obj_api(self) -> CustomObjectsApi: | |||
return CustomObjectsApi() | |||
|
|||
def update_pod_spec_add_xcom_sidecar(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def update_pod_spec_add_xcom_sidecar(self): | |
def update_pod_spec_add_xcom_sidecar(self) -> None: |
) | ||
self.template_body["spark"]["spec"]= driver_with_xcom_template | ||
except KeyError as e: | ||
raise AirflowException("Spec missing in SparkApplication template") from e |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's not use AirflowException
, we can create a customized exception or just use KeyError
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why?
In all the file, when something doesn't work as expected, an AirflowException is raised
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AirflowException
is broad and not infomative
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets switch to KeyError
@@ -46,7 +50,7 @@ class PodDefaults: | |||
|
|||
|
|||
def add_xcom_sidecar( | |||
pod: k8s.V1Pod, | |||
pod: Union[k8s.V1Pod, dict], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pod: Union[k8s.V1Pod, dict], | |
pod: k8s.V1Pod | dict, |
) | ||
self.template_body["spark"]["spec"]= driver_with_xcom_template | ||
except KeyError as e: | ||
raise AirflowException("Spec missing in SparkApplication template") from e |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AirflowException
is broad and not infomative
Fixes: #39184 where SparkKubernetesOperator tasks hang indefinitely because no sidecar is injected to read /airflow/xcom/return.json.