Commit bfbe2cb

General cleanup of things around KubernetesExecutor (#29148)
The biggest change here is using `State` or `TaskInstanceState` where appropriate. Even though their strings are the same, it provides useful context, so we should use the correct enum. One could argue `State` isn't the right thing to use, but at least it's consistently wrong now :)
1 parent a35ec95 commit bfbe2cb
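
Why the swap is low-risk: in `airflow.utils.state`, `TaskInstanceState` is a `str`-backed enum and the `State` class attributes reuse the same values, so comparisons against the raw strings behave identically either way. A minimal sketch of that equivalence (the `is_finished` helper below is hypothetical, for illustration only, and not part of the commit):

    # Minimal sketch: TaskInstanceState is a str-backed enum, so its members
    # compare equal to their raw string values and to the matching State attributes.
    from airflow.utils.state import State, TaskInstanceState

    assert TaskInstanceState.QUEUED == "queued"
    assert State.QUEUED == TaskInstanceState.QUEUED  # same underlying string

    # The enum still adds context over a bare string, e.g. in type hints
    # (hypothetical helper, not from the commit):
    def is_finished(state: TaskInstanceState) -> bool:
        """Return True for the terminal outcomes checked in this commit."""
        return state in (TaskInstanceState.SUCCESS, TaskInstanceState.FAILED)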

File tree (3 files changed, +12 / -12 lines):
- airflow/executors/kubernetes_executor.py
- airflow/jobs/scheduler_job.py
- airflow/kubernetes/pod_generator.py

airflow/executors/kubernetes_executor.py

Lines changed: 9 additions & 9 deletions
@@ -51,18 +51,18 @@
 from airflow.utils.event_scheduler import EventScheduler
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import provide_session
-from airflow.utils.state import State
+from airflow.utils.state import State, TaskInstanceState

 ALL_NAMESPACES = "ALL_NAMESPACES"
 POD_EXECUTOR_DONE_KEY = "airflow_executor_done"

 # TaskInstance key, command, configuration, pod_template_file
 KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]]

-# key, state, pod_id, namespace, resource_version
+# key, pod state, pod_id, namespace, resource_version
 KubernetesResultsType = Tuple[TaskInstanceKey, Optional[str], str, str, str]

-# pod_id, namespace, state, annotations, resource_version
+# pod_id, namespace, pod state, annotations, resource_version
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]

@@ -498,8 +498,8 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
         """
         Clear tasks that were not yet launched, but were previously queued.

-        Tasks can end up in a "Queued" state through when a rescheduled/deferred
-        operator comes back up for execution (with the same try_number) before the
+        Tasks can end up in a "Queued" state when a rescheduled/deferred operator
+        comes back up for execution (with the same try_number) before the
         pod of its previous incarnation has been fully removed (we think).

         It's also possible when an executor abruptly shuts down (leaving a non-empty
@@ -514,7 +514,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:

         self.log.debug("Clearing tasks that have not been launched")
         query = session.query(TaskInstance).filter(
-            TaskInstance.state == State.QUEUED, TaskInstance.queued_by_job_id == self.job_id
+            TaskInstance.state == TaskInstanceState.QUEUED, TaskInstance.queued_by_job_id == self.job_id
         )
         if self.kubernetes_queue:
             query = query.filter(TaskInstance.queue == self.kubernetes_queue)
@@ -566,7 +566,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
                     TaskInstance.task_id == ti.task_id,
                     TaskInstance.run_id == ti.run_id,
                     TaskInstance.map_index == ti.map_index,
-                ).update({TaskInstance.state: State.SCHEDULED})
+                ).update({TaskInstance.state: TaskInstanceState.SCHEDULED})

     def start(self) -> None:
         """Starts the executor."""
@@ -621,7 +621,7 @@ def execute_async(
             pod_template_file = executor_config.get("pod_template_file", None)
         else:
             pod_template_file = None
-        self.event_buffer[key] = (State.QUEUED, self.scheduler_job_id)
+        self.event_buffer[key] = (TaskInstanceState.QUEUED, self.scheduler_job_id)
         self.task_queue.put((key, command, kube_executor_config, pod_template_file))
         # We keep a temporary local record that we've handled this so we don't
         # try and remove it from the QUEUED state while we process it
@@ -691,7 +691,7 @@ def sync(self) -> None:
                 if e.status in (400, 422):
                     self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
                     key, _, _, _ = task
-                    self.change_state(key, State.FAILED, e)
+                    self.change_state(key, TaskInstanceState.FAILED, e)
                 else:
                     self.log.warning(
                         "ApiException when attempting to run task, re-queueing. Reason: %r. Message: %s",

airflow/jobs/scheduler_job.py

Lines changed: 2 additions & 2 deletions
@@ -614,7 +614,7 @@ def _process_executor_events(self, session: Session) -> int:
                 state,
                 ti_key.try_number,
             )
-            if state in (TaskInstanceState.FAILED, TaskInstanceState.SUCCESS, TaskInstanceState.QUEUED):
+            if state in (State.FAILED, State.SUCCESS, State.QUEUED):
                 tis_with_right_state.append(ti_key)

         # Return if no finished tasks
@@ -637,7 +637,7 @@ def _process_executor_events(self, session: Session) -> int:
             buffer_key = ti.key.with_try_number(try_number)
             state, info = event_buffer.pop(buffer_key)

-            if state == TaskInstanceState.QUEUED:
+            if state == State.QUEUED:
                 ti.external_executor_id = info
                 self.log.info("Setting external_id for %s to %s", ti, info)
                 continue

airflow/kubernetes/pod_generator.py

Lines changed: 1 addition & 1 deletion
@@ -110,7 +110,7 @@ class PodGenerator:
     Any configuration that is container specific gets applied to
     the first container in the list of containers.

-    :param pod: The fully specified pod. Mutually exclusive with `path_or_string`
+    :param pod: The fully specified pod. Mutually exclusive with `pod_template_file`
     :param pod_template_file: Path to YAML file. Mutually exclusive with `pod`
     :param extract_xcom: Whether to bring up a container for xcom
     """
