|
| 1 | +# |
1 | 2 | # Licensed to the Apache Software Foundation (ASF) under one
|
2 | 3 | # or more contributor license agreements. See the NOTICE file
|
3 | 4 | # distributed with this work for additional information
|
|
16 | 17 | # under the License.
|
17 | 18 | from __future__ import annotations
|
18 | 19 |
|
19 |
| -import asyncio |
20 |
| -from asyncio import CancelledError |
21 |
| -from datetime import datetime |
22 |
| -from enum import Enum |
23 |
| -from typing import Any, AsyncIterator |
24 |
| - |
25 |
| -import pytz |
26 |
| -from kubernetes_asyncio.client.models import V1Pod |
27 |
| - |
28 |
| -from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook |
29 |
| -from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase |
30 |
| -from airflow.triggers.base import BaseTrigger, TriggerEvent |
31 |
| - |
32 |
| - |
33 |
| -class ContainerState(str, Enum): |
34 |
| - """ |
35 |
| - Possible container states |
36 |
| - See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase. |
37 |
| - """ |
38 |
| - |
39 |
| - WAITING = "waiting" |
40 |
| - RUNNING = "running" |
41 |
| - TERMINATED = "terminated" |
42 |
| - FAILED = "failed" |
43 |
| - UNDEFINED = "undefined" |
44 |
| - |
45 |
| - |
46 |
| -class KubernetesPodTrigger(BaseTrigger): |
47 |
| - """ |
48 |
| - KubernetesPodTrigger run on the trigger worker to check the state of Pod. |
49 |
| -
|
50 |
| - :param pod_name: The name of the pod. |
51 |
| - :param pod_namespace: The namespace of the pod. |
52 |
| - :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>` |
53 |
| - for the Kubernetes cluster. |
54 |
| - :param cluster_context: Context that points to kubernetes cluster. |
55 |
| - :param config_dict: Kubernetes config file content in dict format. If not specified, |
56 |
| - default value is ``~/.kube/config`` |
57 |
| - :param poll_interval: Polling period in seconds to check for the status. |
58 |
| - :param trigger_start_time: time in Datetime format when the trigger was started |
59 |
| - :param in_cluster: run kubernetes client with in_cluster configuration. |
60 |
| - :param should_delete_pod: What to do when the pod reaches its final |
61 |
| - state, or the execution is interrupted. If True (default), delete the |
62 |
| - pod; if False, leave the pod. |
63 |
| - :param get_logs: get the stdout of the container as logs of the tasks. |
64 |
| - :param startup_timeout: timeout in seconds to start up the pod. |
65 |
| - """ |
66 |
| - |
67 |
| - def __init__( |
68 |
| - self, |
69 |
| - pod_name: str, |
70 |
| - pod_namespace: str, |
71 |
| - trigger_start_time: datetime, |
72 |
| - base_container_name: str, |
73 |
| - kubernetes_conn_id: str | None = None, |
74 |
| - poll_interval: float = 2, |
75 |
| - cluster_context: str | None = None, |
76 |
| - config_dict: dict | None = None, |
77 |
| - in_cluster: bool | None = None, |
78 |
| - should_delete_pod: bool = True, |
79 |
| - get_logs: bool = True, |
80 |
| - startup_timeout: int = 120, |
81 |
| - ): |
82 |
| - super().__init__() |
83 |
| - self.pod_name = pod_name |
84 |
| - self.pod_namespace = pod_namespace |
85 |
| - self.trigger_start_time = trigger_start_time |
86 |
| - self.base_container_name = base_container_name |
87 |
| - self.kubernetes_conn_id = kubernetes_conn_id |
88 |
| - self.poll_interval = poll_interval |
89 |
| - self.cluster_context = cluster_context |
90 |
| - self.config_dict = config_dict |
91 |
| - self.in_cluster = in_cluster |
92 |
| - self.should_delete_pod = should_delete_pod |
93 |
| - self.get_logs = get_logs |
94 |
| - self.startup_timeout = startup_timeout |
95 |
| - |
96 |
| - self._hook: AsyncKubernetesHook | None = None |
97 |
| - self._since_time = None |
98 |
| - |
99 |
| - def serialize(self) -> tuple[str, dict[str, Any]]: |
100 |
| - """Serializes KubernetesCreatePodTrigger arguments and classpath.""" |
101 |
| - return ( |
102 |
| - "airflow.providers.cncf.kubernetes.triggers.kubernetes_pod.KubernetesPodTrigger", |
103 |
| - { |
104 |
| - "pod_name": self.pod_name, |
105 |
| - "pod_namespace": self.pod_namespace, |
106 |
| - "base_container_name": self.base_container_name, |
107 |
| - "kubernetes_conn_id": self.kubernetes_conn_id, |
108 |
| - "poll_interval": self.poll_interval, |
109 |
| - "cluster_context": self.cluster_context, |
110 |
| - "config_dict": self.config_dict, |
111 |
| - "in_cluster": self.in_cluster, |
112 |
| - "should_delete_pod": self.should_delete_pod, |
113 |
| - "get_logs": self.get_logs, |
114 |
| - "startup_timeout": self.startup_timeout, |
115 |
| - "trigger_start_time": self.trigger_start_time, |
116 |
| - }, |
117 |
| - ) |
118 |
| - |
119 |
| - async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override] |
120 |
| - """Gets current pod status and yields a TriggerEvent""" |
121 |
| - hook = self._get_async_hook() |
122 |
| - self.log.info("Checking pod %r in namespace %r.", self.pod_name, self.pod_namespace) |
123 |
| - while True: |
124 |
| - try: |
125 |
| - pod = await hook.get_pod( |
126 |
| - name=self.pod_name, |
127 |
| - namespace=self.pod_namespace, |
128 |
| - ) |
129 |
| - |
130 |
| - pod_status = pod.status.phase |
131 |
| - self.log.debug("Pod %s status: %s", self.pod_name, pod_status) |
132 |
| - |
133 |
| - container_state = self.define_container_state(pod) |
134 |
| - self.log.debug("Container %s status: %s", self.base_container_name, container_state) |
135 |
| - |
136 |
| - if container_state == ContainerState.TERMINATED: |
137 |
| - yield TriggerEvent( |
138 |
| - { |
139 |
| - "name": self.pod_name, |
140 |
| - "namespace": self.pod_namespace, |
141 |
| - "status": "success", |
142 |
| - "message": "All containers inside pod have started successfully.", |
143 |
| - } |
144 |
| - ) |
145 |
| - return |
146 |
| - elif self.should_wait(pod_phase=pod_status, container_state=container_state): |
147 |
| - self.log.info("Container is not completed and still working.") |
148 |
| - |
149 |
| - if pod_status == PodPhase.PENDING and container_state == ContainerState.UNDEFINED: |
150 |
| - delta = datetime.now(tz=pytz.UTC) - self.trigger_start_time |
151 |
| - if delta.total_seconds() >= self.startup_timeout: |
152 |
| - message = ( |
153 |
| - f"Pod took longer than {self.startup_timeout} seconds to start. " |
154 |
| - "Check the pod events in kubernetes to determine why." |
155 |
| - ) |
156 |
| - yield TriggerEvent( |
157 |
| - { |
158 |
| - "name": self.pod_name, |
159 |
| - "namespace": self.pod_namespace, |
160 |
| - "status": "timeout", |
161 |
| - "message": message, |
162 |
| - } |
163 |
| - ) |
164 |
| - return |
165 |
| - |
166 |
| - self.log.info("Sleeping for %s seconds.", self.poll_interval) |
167 |
| - await asyncio.sleep(self.poll_interval) |
168 |
| - else: |
169 |
| - yield TriggerEvent( |
170 |
| - { |
171 |
| - "name": self.pod_name, |
172 |
| - "namespace": self.pod_namespace, |
173 |
| - "status": "failed", |
174 |
| - "message": pod.status.message, |
175 |
| - } |
176 |
| - ) |
177 |
| - return |
178 |
| - except CancelledError: |
179 |
| - # That means that task was marked as failed |
180 |
| - if self.get_logs: |
181 |
| - self.log.info("Outputting container logs...") |
182 |
| - await self._get_async_hook().read_logs( |
183 |
| - name=self.pod_name, |
184 |
| - namespace=self.pod_namespace, |
185 |
| - ) |
186 |
| - if self.should_delete_pod: |
187 |
| - self.log.info("Deleting pod...") |
188 |
| - await self._get_async_hook().delete_pod( |
189 |
| - name=self.pod_name, |
190 |
| - namespace=self.pod_namespace, |
191 |
| - ) |
192 |
| - yield TriggerEvent( |
193 |
| - { |
194 |
| - "name": self.pod_name, |
195 |
| - "namespace": self.pod_namespace, |
196 |
| - "status": "cancelled", |
197 |
| - "message": "Pod execution was cancelled", |
198 |
| - } |
199 |
| - ) |
200 |
| - return |
201 |
| - except Exception as e: |
202 |
| - self.log.exception("Exception occurred while checking pod phase:") |
203 |
| - yield TriggerEvent( |
204 |
| - { |
205 |
| - "name": self.pod_name, |
206 |
| - "namespace": self.pod_namespace, |
207 |
| - "status": "error", |
208 |
| - "message": str(e), |
209 |
| - } |
210 |
| - ) |
211 |
| - return |
212 |
| - |
213 |
| - def _get_async_hook(self) -> AsyncKubernetesHook: |
214 |
| - if self._hook is None: |
215 |
| - self._hook = AsyncKubernetesHook( |
216 |
| - conn_id=self.kubernetes_conn_id, |
217 |
| - in_cluster=self.in_cluster, |
218 |
| - config_dict=self.config_dict, |
219 |
| - cluster_context=self.cluster_context, |
220 |
| - ) |
221 |
| - return self._hook |
222 |
| - |
223 |
| - def define_container_state(self, pod: V1Pod) -> ContainerState: |
224 |
| - pod_containers = pod.status.container_statuses |
225 |
| - |
226 |
| - if pod_containers is None: |
227 |
| - return ContainerState.UNDEFINED |
228 |
| - |
229 |
| - container = [c for c in pod_containers if c.name == self.base_container_name][0] |
| 20 | +import warnings |
230 | 21 |
|
231 |
| - for state in (ContainerState.RUNNING, ContainerState.WAITING, ContainerState.TERMINATED): |
232 |
| - state_obj = getattr(container.state, state) |
233 |
| - if state_obj is not None: |
234 |
| - if state != ContainerState.TERMINATED: |
235 |
| - return state |
236 |
| - else: |
237 |
| - return ContainerState.TERMINATED if state_obj.exit_code == 0 else ContainerState.FAILED |
238 |
| - return ContainerState.UNDEFINED |
| 22 | +from airflow.providers.cncf.kubernetes.triggers.pod import * # noqa |
239 | 23 |
|
240 |
| - @staticmethod |
241 |
| - def should_wait(pod_phase: PodPhase, container_state: ContainerState) -> bool: |
242 |
| - return ( |
243 |
| - container_state == ContainerState.WAITING |
244 |
| - or container_state == ContainerState.RUNNING |
245 |
| - or (container_state == ContainerState.UNDEFINED and pod_phase == PodPhase.PENDING) |
246 |
| - ) |
| 24 | +warnings.warn( |
| 25 | + "This module is deprecated. Please use `airflow.providers.cncf.kubernetes.triggers.pod` instead.", |
| 26 | + DeprecationWarning, |
| 27 | + stacklevel=2, |
| 28 | +) |
0 commit comments