@@ -28,6 +28,7 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/dynamic-resource-allocation/resourceclaim"
 	"k8s.io/klog/v2"
+	drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
 	dra "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 )
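
The new `drapb` import is the heart of this change: the v1alpha3 kubelet plugin API replaces the per-claim `NodePrepareResource`/`NodeUnprepareResource` RPCs with batched `NodePrepareResources`/`NodeUnprepareResources` calls that take a list of claims and return per-claim results. As a reading aid, here is a rough sketch of the message shapes this patch relies on, inferred from the call sites below rather than copied from the generated proto code:

```go
// Sketch only: shapes inferred from how the patch uses the v1alpha3 API;
// the generated code in k8s.io/kubelet/pkg/apis/dra/v1alpha3 is authoritative.
package drapb

type Claim struct {
	Namespace      string // namespace of the ResourceClaim
	Uid            string // UID of the ResourceClaim
	Name           string // name of the ResourceClaim
	ResourceHandle string // opaque per-driver data for this claim
}

type NodePrepareResourcesRequest struct {
	Claims []*Claim // every claim handled by one plugin, in a single RPC
}

type NodePrepareResourcesResponse struct {
	// Keyed by claim UID; a claim missing from the map was not processed.
	Claims map[string]*NodePrepareResourceResponse
}

type NodePrepareResourceResponse struct {
	CDIDevices []string // CDI device IDs to pass to the runtime
	Error      string   // non-empty if preparing this one claim failed
}
```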
@@ -62,10 +63,12 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (
 }
 
 // PrepareResources attempts to prepare all of the required resource
-// plugin resources for the input container, issue an NodePrepareResource rpc request
+// plugin resources for the input container, issue NodePrepareResources rpc requests
 // for each new resource requirement, process their responses and update the cached
 // containerResources on success.
 func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
+	batches := make(map[string][]*drapb.Claim)
+	claimInfos := make(map[types.UID]*ClaimInfo)
 	for i := range pod.Spec.ResourceClaims {
 		podClaim := &pod.Spec.ResourceClaims[i]
 		klog.V(3).InfoS("Processing resource", "podClaim", podClaim.Name, "pod", pod.Name)
@@ -139,56 +142,79 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 			sets.New(string(pod.UID)),
 		)
 
-		// Walk through each resourceHandle
+		// Loop through all plugins and prepare for calling NodePrepareResources.
 		for _, resourceHandle := range resourceHandles {
 			// If no DriverName is provided in the resourceHandle, we
 			// use the DriverName from the status
 			pluginName := resourceHandle.DriverName
 			if pluginName == "" {
 				pluginName = resourceClaim.Status.DriverName
 			}
+			claim := &drapb.Claim{
+				Namespace:      resourceClaim.Namespace,
+				Uid:            string(resourceClaim.UID),
+				Name:           resourceClaim.Name,
+				ResourceHandle: resourceHandle.Data,
+			}
+			batches[pluginName] = append(batches[pluginName], claim)
+		}
+		claimInfos[resourceClaim.UID] = claimInfo
+	}
 
-			// Call NodePrepareResource RPC for each resourceHandle
-			client, err := dra.NewDRAPluginClient(pluginName)
-			if err != nil {
-				return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", pluginName, err)
+	// Call NodePrepareResources for all claims in each batch.
+	// If there is any error, processing gets aborted.
+	// We could try to continue, but that would make the code more complex.
+	for pluginName, claims := range batches {
+		// Call NodePrepareResources RPC for all resource handles.
+		client, err := dra.NewDRAPluginClient(pluginName)
+		if err != nil {
+			return fmt.Errorf("failed to get DRA Plugin client for plugin name %s: %v", pluginName, err)
+		}
+		response, err := client.NodePrepareResources(context.Background(), &drapb.NodePrepareResourcesRequest{Claims: claims})
+		if err != nil {
+			// General error unrelated to any particular claim.
+			return fmt.Errorf("NodePrepareResources failed: %v", err)
+		}
+		for claimUID, result := range response.Claims {
+			reqClaim := lookupClaimRequest(claims, claimUID)
+			if reqClaim == nil {
+				return fmt.Errorf("NodePrepareResources returned result for unknown claim UID %s", claimUID)
 			}
-			response, err := client.NodePrepareResource(
-				context.Background(),
-				resourceClaim.Namespace,
-				resourceClaim.UID,
-				resourceClaim.Name,
-				resourceHandle.Data)
-			if err != nil {
-				return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
-					resourceClaim.UID, resourceClaim.Name, resourceHandle.Data, err)
+			if result.Error != "" {
+				return fmt.Errorf("NodePrepareResources failed for claim %s/%s: %s", reqClaim.Namespace, reqClaim.Name, result.Error)
 			}
-			klog.V(3).InfoS("NodePrepareResource succeeded", "pluginName", pluginName, "response", response)
 
-			// Add the CDI Devices returned by NodePrepareResource to
+			claimInfo := claimInfos[types.UID(claimUID)]
+
+			// Add the CDI Devices returned by NodePrepareResources to
 			// the claimInfo object.
-			err = claimInfo.addCDIDevices(pluginName, response.CdiDevices)
+			err = claimInfo.addCDIDevices(pluginName, result.CDIDevices)
 			if err != nil {
 				return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
 			}
 
 			// TODO: We (re)add the claimInfo object to the cache and
 			// sync it to the checkpoint *after* the
-			// NodePrepareResource call has completed. This will cause
+			// NodePrepareResources call has completed. This will cause
 			// issues if the kubelet gets restarted between
-			// NodePrepareResource and syncToCheckpoint. It will result
-			// in not calling NodeUnprepareResource for this claim
+			// NodePrepareResources and syncToCheckpoint. It will result
+			// in not calling NodeUnprepareResources for this claim
 			// because no claimInfo will be synced back to the cache
 			// for it after the restart. We need to resolve this issue
 			// before moving to beta.
 			m.cache.add(claimInfo)
+		}
 
-		// Checkpoint to reduce redundant calls to
-		// NodePrepareResource() after a kubelet restart.
-		err = m.cache.syncToCheckpoint()
-		if err != nil {
-			return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
-		}
+		// Checkpoint to reduce redundant calls to
+		// NodePrepareResources after a kubelet restart.
+		err = m.cache.syncToCheckpoint()
+		if err != nil {
+			return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+		}
+
+		unfinished := len(claims) - len(response.Claims)
+		if unfinished != 0 {
+			return fmt.Errorf("NodePrepareResources left out %d claims", unfinished)
		}
 	}
 	// Checkpoint to capture all of the previous addPodReference() calls.
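
The restructuring above is the same two-phase pattern used in both `PrepareResources` and `UnprepareResources` (below): first group every claim by the plugin that owns it, then issue one RPC per plugin and confirm that every claim is accounted for, either as a per-claim error or a missing result. A minimal, self-contained sketch of that pattern, with hypothetical `item`/`call` names standing in for `drapb.Claim` and the gRPC client:

```go
package main

import "fmt"

// Hypothetical stand-in for drapb.Claim; only the grouping and
// verification pattern from the patch is illustrated here.
type item struct{ plugin, uid string }

func process(items []item, call func(plugin string, batch []item) (map[string]error, error)) error {
	// Phase 1: group items by plugin so each plugin gets exactly one RPC.
	batches := make(map[string][]item)
	for _, it := range items {
		batches[it.plugin] = append(batches[it.plugin], it)
	}
	// Phase 2: one call per plugin; abort on the first failure.
	for plugin, batch := range batches {
		results, err := call(plugin, batch)
		if err != nil {
			return fmt.Errorf("call to %s failed: %v", plugin, err)
		}
		for uid, itemErr := range results {
			if itemErr != nil {
				return fmt.Errorf("item %s failed: %v", uid, itemErr)
			}
		}
		// Verify the plugin returned a result for every item it was sent.
		if len(results) != len(batch) {
			return fmt.Errorf("%s left out %d items", plugin, len(batch)-len(results))
		}
	}
	return nil
}

func main() {
	items := []item{{"driverA", "u1"}, {"driverA", "u2"}, {"driverB", "u3"}}
	err := process(items, func(plugin string, batch []item) (map[string]error, error) {
		results := make(map[string]error, len(batch))
		for _, it := range batch {
			results[it.uid] = nil // pretend every item succeeds
		}
		return results, nil
	})
	fmt.Println(err) // <nil>
}
```

The final length check mirrors the patch's `unfinished` test: it is what lets the kubelet detect a plugin that silently drops a claim, something the old one-RPC-per-claim flow could not miss by construction.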
@@ -199,6 +225,15 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 	return nil
 }
 
+func lookupClaimRequest(claims []*drapb.Claim, claimUID string) *drapb.Claim {
+	for _, claim := range claims {
+		if claim.Uid == claimUID {
+			return claim
+		}
+	}
+	return nil
+}
+
 func claimIsUsedByPod(podClaim *v1.PodResourceClaim, pod *v1.Pod) bool {
 	if claimIsUsedByContainers(podClaim, pod.Spec.InitContainers) {
 		return true
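
`lookupClaimRequest` is a linear scan per result, so verifying a batch of n claims costs O(n²) overall. For the handful of claims a pod typically references that is simpler and plenty fast, which is presumably why the patch keeps it; if batches ever grew large, the request could be indexed once instead. A hypothetical alternative, not part of this patch:

```go
// Hypothetical helper, not in the patch: build a UID index once so each
// per-result lookup is O(1) instead of a scan over the request slice.
func indexClaims(claims []*drapb.Claim) map[string]*drapb.Claim {
	byUID := make(map[string]*drapb.Claim, len(claims))
	for _, claim := range claims {
		byUID[claim.Uid] = claim
	}
	return byUID
}
```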
@@ -274,7 +309,8 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
 // As such, calls to the underlying NodeUnprepareResource API are skipped for claims that have
 // already been successfully unprepared.
 func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
-	// Call NodeUnprepareResource RPC for every resource claim referenced by the pod
+	batches := make(map[string][]*drapb.Claim)
+	claimInfos := make(map[types.UID]*ClaimInfo)
 	for i := range pod.Spec.ResourceClaims {
 		claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
 		if err != nil {
@@ -324,8 +360,7 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
 			resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
 		}
 
-		// Loop through all plugins and call NodeUnprepareResource only for the
-		// last pod that references the claim
+		// Loop through all plugins and prepare for calling NodeUnprepareResources.
 		for _, resourceHandle := range resourceHandles {
 			// If no DriverName is provided in the resourceHandle, we
 			// use the DriverName from the status
@@ -334,38 +369,62 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
 				pluginName = claimInfo.DriverName
 			}
 
-			// Call NodeUnprepareResource RPC for each resourceHandle
-			client, err := dra.NewDRAPluginClient(pluginName)
-			if err != nil {
-				return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", pluginName, err)
-			}
-			response, err := client.NodeUnprepareResource(
-				context.Background(),
-				claimInfo.Namespace,
-				claimInfo.ClaimUID,
-				claimInfo.ClaimName,
-				resourceHandle.Data)
-			if err != nil {
-				return fmt.Errorf(
-					"NodeUnprepareResource failed, pod: %s, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
-					pod.Name, claimInfo.ClaimUID, claimInfo.ClaimName, resourceHandle.Data, err)
+			claim := &drapb.Claim{
+				Namespace:      resourceClaim.Namespace,
+				Uid:            string(resourceClaim.UID),
+				Name:           resourceClaim.Name,
+				ResourceHandle: resourceHandle.Data,
 			}
-			klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response)
+			batches[pluginName] = append(batches[pluginName], claim)
+		}
+		claimInfos[resourceClaim.UID] = claimInfo
+	}
+
+	// Call NodeUnprepareResources for all claims in each batch.
+	// If there is any error, processing gets aborted.
+	// We could try to continue, but that would make the code more complex.
+	for pluginName, claims := range batches {
+		// Call NodeUnprepareResources RPC for all resource handles.
+		client, err := dra.NewDRAPluginClient(pluginName)
+		if err != nil {
+			return fmt.Errorf("failed to get DRA Plugin client for plugin name %s: %v", pluginName, err)
 		}
+		response, err := client.NodeUnprepareResources(context.Background(), &drapb.NodeUnprepareResourcesRequest{Claims: claims})
+		if err != nil {
+			// General error unrelated to any particular claim.
+			return fmt.Errorf("NodeUnprepareResources failed: %v", err)
+		}
+
+		for claimUID, result := range response.Claims {
+			reqClaim := lookupClaimRequest(claims, claimUID)
+			if reqClaim == nil {
+				return fmt.Errorf("NodeUnprepareResources returned result for unknown claim UID %s", claimUID)
+			}
+			if result.Error != "" {
+				return fmt.Errorf("NodeUnprepareResources failed for claim %s/%s: %s", reqClaim.Namespace, reqClaim.Name, result.Error)
+			}
 
-		// Delete last pod UID only if all NodeUnprepareResource calls succeed.
-		// This ensures that the status manager doesn't enter termination status
-		// for the pod. This logic is implemented in
-		// m.PodMightNeedToUnprepareResources and claimInfo.hasPodReference.
-		claimInfo.deletePodReference(pod.UID)
-		m.cache.delete(claimInfo.ClaimName, pod.Namespace)
+			// Delete last pod UID only if unprepare succeeds.
+			// This ensures that the status manager doesn't enter termination status
+			// for the pod. This logic is implemented in
+			// m.PodMightNeedToUnprepareResources and claimInfo.hasPodReference.
+			claimInfo := claimInfos[types.UID(claimUID)]
+			claimInfo.deletePodReference(pod.UID)
+			m.cache.delete(claimInfo.ClaimName, pod.Namespace)
+		}
 
-		// Checkpoint to reduce redundant calls to NodeUnPrepareResource() after a kubelet restart.
+		// Checkpoint to reduce redundant calls to NodeUnprepareResources after a kubelet restart.
 		err = m.cache.syncToCheckpoint()
 		if err != nil {
 			return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
 		}
+
+		unfinished := len(claims) - len(response.Claims)
+		if unfinished != 0 {
+			return fmt.Errorf("NodeUnprepareResources left out %d claims", unfinished)
+		}
 	}
+
 	// Checkpoint to capture all of the previous deletePodReference() calls.
 	err := m.cache.syncToCheckpoint()
 	if err != nil {