@@ -51,18 +51,18 @@
 from airflow.utils.event_scheduler import EventScheduler
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import provide_session
-from airflow.utils.state import State
+from airflow.utils.state import State, TaskInstanceState

 ALL_NAMESPACES = "ALL_NAMESPACES"
 POD_EXECUTOR_DONE_KEY = "airflow_executor_done"

 # TaskInstance key, command, configuration, pod_template_file
 KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]]

-# key, state, pod_id, namespace, resource_version
+# key, pod state, pod_id, namespace, resource_version
 KubernetesResultsType = Tuple[TaskInstanceKey, Optional[str], str, str, str]

-# pod_id, namespace, state, annotations, resource_version
+# pod_id, namespace, pod state, annotations, resource_version
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]

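Context on the import change above: `TaskInstanceState` is the enum scoped specifically to task-instance states, while `State` aggregates both DAG-run and task-instance states. A minimal sketch (not part of the diff, hedged on the current `airflow.utils.state` layout) of why the swap is behavior-preserving:

```python
from airflow.utils.state import State, TaskInstanceState

# TaskInstanceState is a str-backed enum, so comparisons against plain
# strings and against the old State aliases continue to hold.
assert TaskInstanceState.QUEUED == "queued"
assert State.QUEUED == TaskInstanceState.QUEUED
```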
@@ -498,8 +498,8 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
         """
         Clear tasks that were not yet launched, but were previously queued.

-        Tasks can end up in a "Queued" state through when a rescheduled/deferred
-        operator comes back up for execution (with the same try_number) before the
+        Tasks can end up in a "Queued" state when a rescheduled/deferred operator
+        comes back up for execution (with the same try_number) before the
         pod of its previous incarnation has been fully removed (we think).

         It's also possible when an executor abruptly shuts down (leaving a non-empty
@@ -514,7 +514,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:

         self.log.debug("Clearing tasks that have not been launched")
         query = session.query(TaskInstance).filter(
-            TaskInstance.state == State.QUEUED, TaskInstance.queued_by_job_id == self.job_id
+            TaskInstance.state == TaskInstanceState.QUEUED, TaskInstance.queued_by_job_id == self.job_id
         )
         if self.kubernetes_queue:
             query = query.filter(TaskInstance.queue == self.kubernetes_queue)
@@ -566,7 +566,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
                 TaskInstance.task_id == ti.task_id,
                 TaskInstance.run_id == ti.run_id,
                 TaskInstance.map_index == ti.map_index,
-            ).update({TaskInstance.state: State.SCHEDULED})
+            ).update({TaskInstance.state: TaskInstanceState.SCHEDULED})

     def start(self) -> None:
         """Starts the executor."""
@@ -621,7 +621,7 @@ def execute_async(
             pod_template_file = executor_config.get("pod_template_file", None)
         else:
             pod_template_file = None
-        self.event_buffer[key] = (State.QUEUED, self.scheduler_job_id)
+        self.event_buffer[key] = (TaskInstanceState.QUEUED, self.scheduler_job_id)
         self.task_queue.put((key, command, kube_executor_config, pod_template_file))
         # We keep a temporary local record that we've handled this so we don't
         # try and remove it from the QUEUED state while we process it
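Side note on the `event_buffer` write in this hunk: `BaseExecutor` buffers `(state, info)` pairs keyed by `TaskInstanceKey`, which the scheduler later drains via `get_event_buffer()`. A minimal sketch of that shape; the key values below are made up for illustration:

```python
from airflow.models.taskinstance import TaskInstanceKey
from airflow.utils.state import TaskInstanceState

# Hypothetical key; the fields mirror TaskInstanceKey's NamedTuple shape.
key = TaskInstanceKey(dag_id="example_dag", task_id="example_task", run_id="manual__1", try_number=1)

event_buffer = {}
event_buffer[key] = (TaskInstanceState.QUEUED, "scheduler-job-42")  # as in execute_async() above
```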
@@ -691,7 +691,7 @@ def sync(self) -> None:
                     if e.status in (400, 422):
                         self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
                         key, _, _, _ = task
-                        self.change_state(key, State.FAILED, e)
+                        self.change_state(key, TaskInstanceState.FAILED, e)
                     else:
                         self.log.warning(
                             "ApiException when attempting to run task, re-queueing. Reason: %r. Message: %s",