Skip to content

Commit 9710394

Browse files
authored
Align cncf provider file names with AIP-21 (#29905)
Changes to import paths: `operators.kubernetes_pod` -> `operators.pod` `triggers.kubernetes_pod` -> `triggers.pod` This PR is backward compatible
1 parent ea0099e commit 9710394

File tree

20 files changed

+1186
-1107
lines changed

20 files changed

+1186
-1107
lines changed

airflow/contrib/operators/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,7 @@
946946
},
947947
'kubernetes_pod_operator': {
948948
'KubernetesPodOperator': (
949-
'airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator'
949+
'airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator'
950950
),
951951
},
952952
'mlengine_operator': {

airflow/providers/amazon/aws/operators/eks.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,12 @@
2626
from airflow import AirflowException
2727
from airflow.models import BaseOperator
2828
from airflow.providers.amazon.aws.hooks.eks import EksHook
29-
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
29+
30+
try:
31+
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
32+
except ImportError:
33+
# preserve backward compatibility for older versions of cncf.kubernetes provider
34+
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
3035

3136
if TYPE_CHECKING:
3237
from airflow.utils.context import Context

airflow/providers/cncf/kubernetes/decorators/kubernetes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from kubernetes.client import models as k8s
3131

3232
from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory
33-
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
33+
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
3434
from airflow.providers.cncf.kubernetes.python_kubernetes_script import (
3535
remove_task_decorator,
3636
write_python_script,

airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py

Lines changed: 6 additions & 853 deletions
Large diffs are not rendered by default.

airflow/providers/cncf/kubernetes/operators/pod.py

Lines changed: 875 additions & 0 deletions
Large diffs are not rendered by default.

airflow/providers/cncf/kubernetes/provider.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ operators:
8989
- integration-name: Kubernetes
9090
python-modules:
9191
- airflow.providers.cncf.kubernetes.operators.kubernetes_pod
92+
- airflow.providers.cncf.kubernetes.operators.pod
9293
- airflow.providers.cncf.kubernetes.operators.spark_kubernetes
9394

9495
sensors:
Lines changed: 8 additions & 226 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#
12
# Licensed to the Apache Software Foundation (ASF) under one
23
# or more contributor license agreements. See the NOTICE file
34
# distributed with this work for additional information
@@ -16,231 +17,12 @@
1617
# under the License.
1718
from __future__ import annotations
1819

19-
import asyncio
20-
from asyncio import CancelledError
21-
from datetime import datetime
22-
from enum import Enum
23-
from typing import Any, AsyncIterator
24-
25-
import pytz
26-
from kubernetes_asyncio.client.models import V1Pod
27-
28-
from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook
29-
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase
30-
from airflow.triggers.base import BaseTrigger, TriggerEvent
31-
32-
33-
class ContainerState(str, Enum):
34-
"""
35-
Possible container states
36-
See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.
37-
"""
38-
39-
WAITING = "waiting"
40-
RUNNING = "running"
41-
TERMINATED = "terminated"
42-
FAILED = "failed"
43-
UNDEFINED = "undefined"
44-
45-
46-
class KubernetesPodTrigger(BaseTrigger):
47-
"""
48-
KubernetesPodTrigger run on the trigger worker to check the state of Pod.
49-
50-
:param pod_name: The name of the pod.
51-
:param pod_namespace: The namespace of the pod.
52-
:param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
53-
for the Kubernetes cluster.
54-
:param cluster_context: Context that points to kubernetes cluster.
55-
:param config_dict: Kubernetes config file content in dict format. If not specified,
56-
default value is ``~/.kube/config``
57-
:param poll_interval: Polling period in seconds to check for the status.
58-
:param trigger_start_time: time in Datetime format when the trigger was started
59-
:param in_cluster: run kubernetes client with in_cluster configuration.
60-
:param should_delete_pod: What to do when the pod reaches its final
61-
state, or the execution is interrupted. If True (default), delete the
62-
pod; if False, leave the pod.
63-
:param get_logs: get the stdout of the container as logs of the tasks.
64-
:param startup_timeout: timeout in seconds to start up the pod.
65-
"""
66-
67-
def __init__(
68-
self,
69-
pod_name: str,
70-
pod_namespace: str,
71-
trigger_start_time: datetime,
72-
base_container_name: str,
73-
kubernetes_conn_id: str | None = None,
74-
poll_interval: float = 2,
75-
cluster_context: str | None = None,
76-
config_dict: dict | None = None,
77-
in_cluster: bool | None = None,
78-
should_delete_pod: bool = True,
79-
get_logs: bool = True,
80-
startup_timeout: int = 120,
81-
):
82-
super().__init__()
83-
self.pod_name = pod_name
84-
self.pod_namespace = pod_namespace
85-
self.trigger_start_time = trigger_start_time
86-
self.base_container_name = base_container_name
87-
self.kubernetes_conn_id = kubernetes_conn_id
88-
self.poll_interval = poll_interval
89-
self.cluster_context = cluster_context
90-
self.config_dict = config_dict
91-
self.in_cluster = in_cluster
92-
self.should_delete_pod = should_delete_pod
93-
self.get_logs = get_logs
94-
self.startup_timeout = startup_timeout
95-
96-
self._hook: AsyncKubernetesHook | None = None
97-
self._since_time = None
98-
99-
def serialize(self) -> tuple[str, dict[str, Any]]:
100-
"""Serializes KubernetesCreatePodTrigger arguments and classpath."""
101-
return (
102-
"airflow.providers.cncf.kubernetes.triggers.kubernetes_pod.KubernetesPodTrigger",
103-
{
104-
"pod_name": self.pod_name,
105-
"pod_namespace": self.pod_namespace,
106-
"base_container_name": self.base_container_name,
107-
"kubernetes_conn_id": self.kubernetes_conn_id,
108-
"poll_interval": self.poll_interval,
109-
"cluster_context": self.cluster_context,
110-
"config_dict": self.config_dict,
111-
"in_cluster": self.in_cluster,
112-
"should_delete_pod": self.should_delete_pod,
113-
"get_logs": self.get_logs,
114-
"startup_timeout": self.startup_timeout,
115-
"trigger_start_time": self.trigger_start_time,
116-
},
117-
)
118-
119-
async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
120-
"""Gets current pod status and yields a TriggerEvent"""
121-
hook = self._get_async_hook()
122-
self.log.info("Checking pod %r in namespace %r.", self.pod_name, self.pod_namespace)
123-
while True:
124-
try:
125-
pod = await hook.get_pod(
126-
name=self.pod_name,
127-
namespace=self.pod_namespace,
128-
)
129-
130-
pod_status = pod.status.phase
131-
self.log.debug("Pod %s status: %s", self.pod_name, pod_status)
132-
133-
container_state = self.define_container_state(pod)
134-
self.log.debug("Container %s status: %s", self.base_container_name, container_state)
135-
136-
if container_state == ContainerState.TERMINATED:
137-
yield TriggerEvent(
138-
{
139-
"name": self.pod_name,
140-
"namespace": self.pod_namespace,
141-
"status": "success",
142-
"message": "All containers inside pod have started successfully.",
143-
}
144-
)
145-
return
146-
elif self.should_wait(pod_phase=pod_status, container_state=container_state):
147-
self.log.info("Container is not completed and still working.")
148-
149-
if pod_status == PodPhase.PENDING and container_state == ContainerState.UNDEFINED:
150-
delta = datetime.now(tz=pytz.UTC) - self.trigger_start_time
151-
if delta.total_seconds() >= self.startup_timeout:
152-
message = (
153-
f"Pod took longer than {self.startup_timeout} seconds to start. "
154-
"Check the pod events in kubernetes to determine why."
155-
)
156-
yield TriggerEvent(
157-
{
158-
"name": self.pod_name,
159-
"namespace": self.pod_namespace,
160-
"status": "timeout",
161-
"message": message,
162-
}
163-
)
164-
return
165-
166-
self.log.info("Sleeping for %s seconds.", self.poll_interval)
167-
await asyncio.sleep(self.poll_interval)
168-
else:
169-
yield TriggerEvent(
170-
{
171-
"name": self.pod_name,
172-
"namespace": self.pod_namespace,
173-
"status": "failed",
174-
"message": pod.status.message,
175-
}
176-
)
177-
return
178-
except CancelledError:
179-
# That means that task was marked as failed
180-
if self.get_logs:
181-
self.log.info("Outputting container logs...")
182-
await self._get_async_hook().read_logs(
183-
name=self.pod_name,
184-
namespace=self.pod_namespace,
185-
)
186-
if self.should_delete_pod:
187-
self.log.info("Deleting pod...")
188-
await self._get_async_hook().delete_pod(
189-
name=self.pod_name,
190-
namespace=self.pod_namespace,
191-
)
192-
yield TriggerEvent(
193-
{
194-
"name": self.pod_name,
195-
"namespace": self.pod_namespace,
196-
"status": "cancelled",
197-
"message": "Pod execution was cancelled",
198-
}
199-
)
200-
return
201-
except Exception as e:
202-
self.log.exception("Exception occurred while checking pod phase:")
203-
yield TriggerEvent(
204-
{
205-
"name": self.pod_name,
206-
"namespace": self.pod_namespace,
207-
"status": "error",
208-
"message": str(e),
209-
}
210-
)
211-
return
212-
213-
def _get_async_hook(self) -> AsyncKubernetesHook:
214-
if self._hook is None:
215-
self._hook = AsyncKubernetesHook(
216-
conn_id=self.kubernetes_conn_id,
217-
in_cluster=self.in_cluster,
218-
config_dict=self.config_dict,
219-
cluster_context=self.cluster_context,
220-
)
221-
return self._hook
222-
223-
def define_container_state(self, pod: V1Pod) -> ContainerState:
224-
pod_containers = pod.status.container_statuses
225-
226-
if pod_containers is None:
227-
return ContainerState.UNDEFINED
228-
229-
container = [c for c in pod_containers if c.name == self.base_container_name][0]
20+
import warnings
23021

231-
for state in (ContainerState.RUNNING, ContainerState.WAITING, ContainerState.TERMINATED):
232-
state_obj = getattr(container.state, state)
233-
if state_obj is not None:
234-
if state != ContainerState.TERMINATED:
235-
return state
236-
else:
237-
return ContainerState.TERMINATED if state_obj.exit_code == 0 else ContainerState.FAILED
238-
return ContainerState.UNDEFINED
22+
from airflow.providers.cncf.kubernetes.triggers.pod import * # noqa
23923

240-
@staticmethod
241-
def should_wait(pod_phase: PodPhase, container_state: ContainerState) -> bool:
242-
return (
243-
container_state == ContainerState.WAITING
244-
or container_state == ContainerState.RUNNING
245-
or (container_state == ContainerState.UNDEFINED and pod_phase == PodPhase.PENDING)
246-
)
24+
warnings.warn(
25+
"This module is deprecated. Please use `airflow.providers.cncf.kubernetes.triggers.pod` instead.",
26+
DeprecationWarning,
27+
stacklevel=2,
28+
)

0 commit comments

Comments
 (0)