
Commit 047d040

Merge pull request #119012 from pohly/dra-batch-node-prepare
kubelet: support batched prepare/unprepare in v1alpha3 DRA plugin API
2 parents 2ec4e14 + d743c50 commit 047d040
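
For orientation: the diff below batches drapb.Claim objects per plugin and reads per-claim results back out of a map keyed by claim UID. The following stand-in Go types are only an illustration of the message shapes implied by the fields the diff touches (Namespace, Uid, Name, ResourceHandle, CDIDevices, Error); the real definitions are the generated v1alpha3 types in k8s.io/kubelet/pkg/apis/dra/v1alpha3, which this sketch does not reproduce verbatim.

package sketch

// Claim identifies one resource claim plus the opaque handle the plugin needs.
type Claim struct {
	Namespace      string
	Uid            string
	Name           string
	ResourceHandle string
}

// NodePrepareResourcesRequest carries every claim destined for one plugin.
type NodePrepareResourcesRequest struct {
	Claims []*Claim
}

// NodePrepareResourceResponse is a per-claim result; Error is a string so one
// failing claim does not turn the whole batched call into a gRPC error.
type NodePrepareResourceResponse struct {
	CDIDevices []string
	Error      string
}

// NodePrepareResourcesResponse maps claim UID to its result, which is why the
// kubelet code below matches results back to requests via lookupClaimRequest.
type NodePrepareResourcesResponse struct {
	Claims map[string]*NodePrepareResourceResponse
}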

File tree: 11 files changed, +2604 -171 lines


pkg/kubelet/cm/dra/manager.go

Lines changed: 112 additions & 53 deletions
@@ -28,6 +28,7 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/dynamic-resource-allocation/resourceclaim"
 	"k8s.io/klog/v2"
+	drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
 	dra "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 )
@@ -62,10 +63,12 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (
 }
 
 // PrepareResources attempts to prepare all of the required resource
-// plugin resources for the input container, issue an NodePrepareResource rpc request
+// plugin resources for the input container, issue NodePrepareResources rpc requests
 // for each new resource requirement, process their responses and update the cached
 // containerResources on success.
 func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
+	batches := make(map[string][]*drapb.Claim)
+	claimInfos := make(map[types.UID]*ClaimInfo)
 	for i := range pod.Spec.ResourceClaims {
 		podClaim := &pod.Spec.ResourceClaims[i]
 		klog.V(3).InfoS("Processing resource", "podClaim", podClaim.Name, "pod", pod.Name)
@@ -139,56 +142,79 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 			sets.New(string(pod.UID)),
 		)
 
-		// Walk through each resourceHandle
+		// Loop through all plugins and prepare for calling NodePrepareResources.
 		for _, resourceHandle := range resourceHandles {
 			// If no DriverName is provided in the resourceHandle, we
 			// use the DriverName from the status
 			pluginName := resourceHandle.DriverName
 			if pluginName == "" {
 				pluginName = resourceClaim.Status.DriverName
 			}
+			claim := &drapb.Claim{
+				Namespace:      resourceClaim.Namespace,
+				Uid:            string(resourceClaim.UID),
+				Name:           resourceClaim.Name,
+				ResourceHandle: resourceHandle.Data,
+			}
+			batches[pluginName] = append(batches[pluginName], claim)
+		}
+		claimInfos[resourceClaim.UID] = claimInfo
+	}
 
-			// Call NodePrepareResource RPC for each resourceHandle
-			client, err := dra.NewDRAPluginClient(pluginName)
-			if err != nil {
-				return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", pluginName, err)
+	// Call NodePrepareResources for all claims in each batch.
+	// If there is any error, processing gets aborted.
+	// We could try to continue, but that would make the code more complex.
+	for pluginName, claims := range batches {
+		// Call NodePrepareResources RPC for all resource handles.
+		client, err := dra.NewDRAPluginClient(pluginName)
+		if err != nil {
+			return fmt.Errorf("failed to get DRA Plugin client for plugin name %s: %v", pluginName, err)
+		}
+		response, err := client.NodePrepareResources(context.Background(), &drapb.NodePrepareResourcesRequest{Claims: claims})
+		if err != nil {
+			// General error unrelated to any particular claim.
+			return fmt.Errorf("NodePrepareResources failed: %v", err)
+		}
+		for claimUID, result := range response.Claims {
+			reqClaim := lookupClaimRequest(claims, claimUID)
+			if reqClaim == nil {
+				return fmt.Errorf("NodePrepareResources returned result for unknown claim UID %s", claimUID)
 			}
-			response, err := client.NodePrepareResource(
-				context.Background(),
-				resourceClaim.Namespace,
-				resourceClaim.UID,
-				resourceClaim.Name,
-				resourceHandle.Data)
-			if err != nil {
-				return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
-					resourceClaim.UID, resourceClaim.Name, resourceHandle.Data, err)
+			if result.Error != "" {
+				return fmt.Errorf("NodePrepareResources failed for claim %s/%s: %s", reqClaim.Namespace, reqClaim.Name, result.Error)
 			}
-			klog.V(3).InfoS("NodePrepareResource succeeded", "pluginName", pluginName, "response", response)
 
-			// Add the CDI Devices returned by NodePrepareResource to
+			claimInfo := claimInfos[types.UID(claimUID)]
+
+			// Add the CDI Devices returned by NodePrepareResources to
 			// the claimInfo object.
-			err = claimInfo.addCDIDevices(pluginName, response.CdiDevices)
+			err = claimInfo.addCDIDevices(pluginName, result.CDIDevices)
 			if err != nil {
 				return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
 			}
 
 			// TODO: We (re)add the claimInfo object to the cache and
 			// sync it to the checkpoint *after* the
-			// NodePrepareResource call has completed. This will cause
+			// NodePrepareResources call has completed. This will cause
 			// issues if the kubelet gets restarted between
-			// NodePrepareResource and syncToCheckpoint. It will result
-			// in not calling NodeUnprepareResource for this claim
+			// NodePrepareResources and syncToCheckpoint. It will result
+			// in not calling NodeUnprepareResources for this claim
 			// because no claimInfo will be synced back to the cache
 			// for it after the restart. We need to resolve this issue
 			// before moving to beta.
 			m.cache.add(claimInfo)
+		}
 
-		// Checkpoint to reduce redundant calls to
-		// NodePrepareResource() after a kubelet restart.
-		err = m.cache.syncToCheckpoint()
-		if err != nil {
-			return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
-		}
+		// Checkpoint to reduce redundant calls to
+		// NodePrepareResources after a kubelet restart.
+		err = m.cache.syncToCheckpoint()
+		if err != nil {
+			return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
		}
+
+		unfinished := len(claims) - len(response.Claims)
+		if unfinished != 0 {
+			return fmt.Errorf("NodePrepareResources left out %d claims", unfinished)
 		}
 	}
 	// Checkpoint to capture all of the previous addPodReference() calls.
@@ -199,6 +225,15 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 	return nil
 }
 
+func lookupClaimRequest(claims []*drapb.Claim, claimUID string) *drapb.Claim {
+	for _, claim := range claims {
+		if claim.Uid == claimUID {
+			return claim
+		}
+	}
+	return nil
+}
+
 func claimIsUsedByPod(podClaim *v1.PodResourceClaim, pod *v1.Pod) bool {
 	if claimIsUsedByContainers(podClaim, pod.Spec.InitContainers) {
 		return true
@@ -274,7 +309,8 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
 // As such, calls to the underlying NodeUnprepareResource API are skipped for claims that have
 // already been successfully unprepared.
 func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
-	// Call NodeUnprepareResource RPC for every resource claim referenced by the pod
+	batches := make(map[string][]*drapb.Claim)
+	claimInfos := make(map[types.UID]*ClaimInfo)
 	for i := range pod.Spec.ResourceClaims {
 		claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
 		if err != nil {
@@ -324,8 +360,7 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
 			resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
 		}
 
-		// Loop through all plugins and call NodeUnprepareResource only for the
-		// last pod that references the claim
+		// Loop through all plugins and prepare for calling NodeUnprepareResources.
 		for _, resourceHandle := range resourceHandles {
 			// If no DriverName is provided in the resourceHandle, we
 			// use the DriverName from the status
@@ -334,38 +369,62 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
 				pluginName = claimInfo.DriverName
 			}
 
-			// Call NodeUnprepareResource RPC for each resourceHandle
-			client, err := dra.NewDRAPluginClient(pluginName)
-			if err != nil {
-				return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", pluginName, err)
-			}
-			response, err := client.NodeUnprepareResource(
-				context.Background(),
-				claimInfo.Namespace,
-				claimInfo.ClaimUID,
-				claimInfo.ClaimName,
-				resourceHandle.Data)
-			if err != nil {
-				return fmt.Errorf(
-					"NodeUnprepareResource failed, pod: %s, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
-					pod.Name, claimInfo.ClaimUID, claimInfo.ClaimName, resourceHandle.Data, err)
+			claim := &drapb.Claim{
+				Namespace:      resourceClaim.Namespace,
+				Uid:            string(resourceClaim.UID),
+				Name:           resourceClaim.Name,
+				ResourceHandle: resourceHandle.Data,
 			}
-			klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response)
+			batches[pluginName] = append(batches[pluginName], claim)
+		}
+		claimInfos[resourceClaim.UID] = claimInfo
+	}
+
+	// Call NodeUnprepareResources for all claims in each batch.
+	// If there is any error, processing gets aborted.
+	// We could try to continue, but that would make the code more complex.
+	for pluginName, claims := range batches {
+		// Call NodeUnprepareResources RPC for all resource handles.
+		client, err := dra.NewDRAPluginClient(pluginName)
+		if err != nil {
+			return fmt.Errorf("failed to get DRA Plugin client for plugin name %s: %v", pluginName, err)
 		}
+		response, err := client.NodeUnprepareResources(context.Background(), &drapb.NodeUnprepareResourcesRequest{Claims: claims})
+		if err != nil {
+			// General error unrelated to any particular claim.
+			return fmt.Errorf("NodeUnprepareResources failed: %v", err)
+		}
+
+		for claimUID, result := range response.Claims {
+			reqClaim := lookupClaimRequest(claims, claimUID)
+			if reqClaim == nil {
+				return fmt.Errorf("NodeUnprepareResources returned result for unknown claim UID %s", claimUID)
+			}
+			if result.Error != "" {
+				return fmt.Errorf("NodeUnprepareResources failed for claim %s/%s: %s", reqClaim.Namespace, reqClaim.Name, err)
+			}
 
-		// Delete last pod UID only if all NodeUnprepareResource calls succeed.
-		// This ensures that the status manager doesn't enter termination status
-		// for the pod. This logic is implemented in
-		// m.PodMightNeedToUnprepareResources and claimInfo.hasPodReference.
-		claimInfo.deletePodReference(pod.UID)
-		m.cache.delete(claimInfo.ClaimName, pod.Namespace)
+			// Delete last pod UID only if unprepare succeeds.
+			// This ensures that the status manager doesn't enter termination status
+			// for the pod. This logic is implemented in
+			// m.PodMightNeedToUnprepareResources and claimInfo.hasPodReference.
+			claimInfo := claimInfos[types.UID(claimUID)]
+			claimInfo.deletePodReference(pod.UID)
+			m.cache.delete(claimInfo.ClaimName, pod.Namespace)
+		}
 
-		// Checkpoint to reduce redundant calls to NodeUnPrepareResource() after a kubelet restart.
+		// Checkpoint to reduce redundant calls to NodeUnprepareResources after a kubelet restart.
 		err = m.cache.syncToCheckpoint()
 		if err != nil {
 			return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
 		}
+
+		unfinished := len(claims) - len(response.Claims)
+		if unfinished != 0 {
+			return fmt.Errorf("NodeUnprepareResources left out %d claims", unfinished)
+		}
 	}
+
 	// Checkpoint to capture all of the previous deletePodReference() calls.
 	err := m.cache.syncToCheckpoint()
 	if err != nil {
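
Taken together, both PrepareResources and UnprepareResources now follow the same pattern: build one batch per plugin, issue a single batched RPC per plugin, walk the per-claim results, and finally check that the plugin did not leave any claim out of its response. A minimal, self-contained sketch of that pattern, using hypothetical local types and a stubbed call function rather than the kubelet's real DRA plugin client:

package main

import "fmt"

// Hypothetical stand-ins, just for this sketch.
type Claim struct{ UID, Plugin, Handle string }
type Result struct{ Error string }

// prepareAll mirrors the batching flow from the diff: group claims by plugin,
// make one call per plugin, check every per-claim result, and fail if the
// plugin silently dropped a claim from its response.
func prepareAll(claims []*Claim, call func(plugin string, batch []*Claim) (map[string]*Result, error)) error {
	batches := make(map[string][]*Claim)
	for _, c := range claims {
		batches[c.Plugin] = append(batches[c.Plugin], c)
	}
	for plugin, batch := range batches {
		results, err := call(plugin, batch)
		if err != nil {
			// General error unrelated to any particular claim.
			return fmt.Errorf("batched call to %s failed: %v", plugin, err)
		}
		for uid, result := range results {
			if result.Error != "" {
				return fmt.Errorf("claim %s failed: %s", uid, result.Error)
			}
		}
		if unfinished := len(batch) - len(results); unfinished != 0 {
			return fmt.Errorf("plugin %s left out %d claims", plugin, unfinished)
		}
	}
	return nil
}

func main() {
	claims := []*Claim{{UID: "uid-1", Plugin: "driver-a"}, {UID: "uid-2", Plugin: "driver-a"}}
	// Stub plugin that reports success for every claim it receives.
	stub := func(plugin string, batch []*Claim) (map[string]*Result, error) {
		out := make(map[string]*Result, len(batch))
		for _, c := range batch {
			out[c.UID] = &Result{}
		}
		return out, nil
	}
	fmt.Println(prepareAll(claims, stub)) // <nil> on success
}

The one-result-per-claim contract is what lets the kubelet abort on the first problem while still detecting claims that a plugin ignored entirely.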
