kubelet: support batched prepare/unprepare in v1alpha3 DRA plugin API #119012

Merged: 1 commit, Jul 12, 2023
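
This PR switches the kubelet's DRA manager from one NodePrepareResource/NodeUnprepareResource RPC per resource handle to one batched NodePrepareResources/NodeUnprepareResources RPC per plugin. As a reading aid, here is a minimal sketch of the batched v1alpha3 message shapes as they are used in the diff below; the field names are inferred from the kubelet code in this PR, and the actual generated gRPC types in k8s.io/kubelet/pkg/apis/dra/v1alpha3 may differ in detail.

// Sketch of the batched v1alpha3 DRA messages, inferred from how the diff
// below uses them. Not the generated protobuf code.
package drasketch

// Claim identifies one ResourceClaim plus the resource handle to prepare or unprepare.
type Claim struct {
	Namespace      string
	Uid            string // ResourceClaim UID; also the key of the per-claim result maps
	Name           string
	ResourceHandle string
}

// NodePrepareResourcesRequest carries one batch of claims for a single plugin.
type NodePrepareResourcesRequest struct {
	Claims []*Claim
}

// NodePrepareResourceResponse is the per-claim result; a non-empty Error
// marks a failure for that claim only.
type NodePrepareResourceResponse struct {
	CDIDevices []string
	Error      string
}

// NodePrepareResourcesResponse maps claim UID to its per-claim result.
type NodePrepareResourcesResponse struct {
	Claims map[string]*NodePrepareResourceResponse
}

// NodeUnprepareResourcesRequest/Response follow the same batched pattern,
// with a per-claim Error string keyed by claim UID.
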
165 changes: 112 additions & 53 deletions pkg/kubelet/cm/dra/manager.go
@@ -28,6 +28,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/dynamic-resource-allocation/resourceclaim"
"k8s.io/klog/v2"
drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
dra "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
@@ -62,10 +63,12 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (
}

// PrepareResources attempts to prepare all of the required resource
// plugin resources for the input container, issue an NodePrepareResource rpc request
// plugin resources for the input container, issue NodePrepareResources rpc requests
// for each new resource requirement, process their responses and update the cached
// containerResources on success.
func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
batches := make(map[string][]*drapb.Claim)
claimInfos := make(map[types.UID]*ClaimInfo)
for i := range pod.Spec.ResourceClaims {
podClaim := &pod.Spec.ResourceClaims[i]
claimName := resourceclaim.Name(pod, podClaim)
@@ -126,56 +129,79 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
sets.New(string(pod.UID)),
)

// Walk through each resourceHandle
// Loop through all plugins and prepare for calling NodePrepareResources.
for _, resourceHandle := range resourceHandles {
// If no DriverName is provided in the resourceHandle, we
// use the DriverName from the status
pluginName := resourceHandle.DriverName
if pluginName == "" {
pluginName = resourceClaim.Status.DriverName
}
claim := &drapb.Claim{
Namespace: resourceClaim.Namespace,
Uid: string(resourceClaim.UID),
Name: resourceClaim.Name,
ResourceHandle: resourceHandle.Data,
}
batches[pluginName] = append(batches[pluginName], claim)
}
claimInfos[resourceClaim.UID] = claimInfo
}

// Call NodePrepareResource RPC for each resourceHandle
client, err := dra.NewDRAPluginClient(pluginName)
if err != nil {
return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", pluginName, err)
// Call NodePrepareResources for all claims in each batch.
// If there is any error, processing gets aborted.
// We could try to continue, but that would make the code more complex.
for pluginName, claims := range batches {
// Call NodePrepareResources RPC for all resource handles.
client, err := dra.NewDRAPluginClient(pluginName)
if err != nil {
return fmt.Errorf("failed to get DRA Plugin client for plugin name %s: %v", pluginName, err)
}
response, err := client.NodePrepareResources(context.Background(), &drapb.NodePrepareResourcesRequest{Claims: claims})
if err != nil {
// General error unrelated to any particular claim.
return fmt.Errorf("NodePrepareResources failed: %v", err)
}
for claimUID, result := range response.Claims {
reqClaim := lookupClaimRequest(claims, claimUID)
if reqClaim == nil {
return fmt.Errorf("NodePrepareResources returned result for unknown claim UID %s", claimUID)
}
response, err := client.NodePrepareResource(
context.Background(),
resourceClaim.Namespace,
resourceClaim.UID,
resourceClaim.Name,
resourceHandle.Data)
if err != nil {
return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
resourceClaim.UID, resourceClaim.Name, resourceHandle.Data, err)
if result.Error != "" {
return fmt.Errorf("NodePrepareResources failed for claim %s/%s: %s", reqClaim.Namespace, reqClaim.Name, result.Error)
}
klog.V(3).InfoS("NodePrepareResource succeeded", "pluginName", pluginName, "response", response)

// Add the CDI Devices returned by NodePrepareResource to
claimInfo := claimInfos[types.UID(claimUID)]

// Add the CDI Devices returned by NodePrepareResources to
// the claimInfo object.
err = claimInfo.addCDIDevices(pluginName, response.CdiDevices)
err = claimInfo.addCDIDevices(pluginName, result.CDIDevices)
if err != nil {
return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
}

// TODO: We (re)add the claimInfo object to the cache and
// sync it to the checkpoint *after* the
// NodePrepareResource call has completed. This will cause
// NodePrepareResources call has completed. This will cause
// issues if the kubelet gets restarted between
// NodePrepareResource and syncToCheckpoint. It will result
// in not calling NodeUnprepareResource for this claim
// NodePrepareResources and syncToCheckpoint. It will result
// in not calling NodeUnprepareResources for this claim
// because no claimInfo will be synced back to the cache
// for it after the restart. We need to resolve this issue
// before moving to beta.
m.cache.add(claimInfo)
}

// Checkpoint to reduce redundant calls to
// NodePrepareResource() after a kubelet restart.
err = m.cache.syncToCheckpoint()
if err != nil {
return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
}
// Checkpoint to reduce redundant calls to
// NodePrepareResources after a kubelet restart.
err = m.cache.syncToCheckpoint()
if err != nil {
return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
}

unfinished := len(claims) - len(response.Claims)
if unfinished != 0 {
return fmt.Errorf("NodePrepareResources left out %d claims", unfinished)
}
}
// Checkpoint to capture all of the previous addPodReference() calls.
@@ -186,6 +212,15 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
return nil
}

func lookupClaimRequest(claims []*drapb.Claim, claimUID string) *drapb.Claim {
for _, claim := range claims {
if claim.Uid == claimUID {
return claim
}
}
return nil
}
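
Aside from the diff itself: the helper above exists because the batched responses are keyed by claim UID rather than returned in request order. Below is a standalone sketch of the batch-and-match pattern that PrepareResources and UnprepareResources now follow, using simplified types and a hypothetical client interface (Claim, Result, client, newClient, prepareBatches, and lookup are illustrative names, not part of the PR or the real API).

// Standalone sketch of the batch-and-match pattern used in this PR:
// group claims per plugin, issue one RPC per plugin, then match each
// per-claim result back to its request by UID, aborting on the first error.
package batchsketch

import (
	"context"
	"fmt"
)

type Claim struct {
	Namespace, Uid, Name, ResourceHandle string
}

type Result struct {
	CDIDevices []string
	Error      string
}

// client stands in for the real per-plugin DRA gRPC client.
type client interface {
	NodePrepareResources(ctx context.Context, claims []*Claim) (map[string]*Result, error)
}

// lookup mirrors lookupClaimRequest: find the request claim for a result UID.
func lookup(claims []*Claim, uid string) *Claim {
	for _, c := range claims {
		if c.Uid == uid {
			return c
		}
	}
	return nil
}

// prepareBatches issues one batched RPC per plugin and validates the results.
func prepareBatches(ctx context.Context, batches map[string][]*Claim, newClient func(pluginName string) (client, error)) error {
	for pluginName, claims := range batches {
		c, err := newClient(pluginName)
		if err != nil {
			return fmt.Errorf("get DRA plugin client for %s: %v", pluginName, err)
		}
		results, err := c.NodePrepareResources(ctx, claims)
		if err != nil {
			// General error unrelated to any particular claim.
			return fmt.Errorf("NodePrepareResources failed: %v", err)
		}
		for uid, result := range results {
			if lookup(claims, uid) == nil {
				return fmt.Errorf("result for unknown claim UID %s", uid)
			}
			if result.Error != "" {
				return fmt.Errorf("claim %s failed: %s", uid, result.Error)
			}
		}
		if unfinished := len(claims) - len(results); unfinished != 0 {
			return fmt.Errorf("NodePrepareResources left out %d claims", unfinished)
		}
	}
	return nil
}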

func claimIsUsedByPod(podClaim *v1.PodResourceClaim, pod *v1.Pod) bool {
if claimIsUsedByContainers(podClaim, pod.Spec.InitContainers) {
return true
@@ -253,7 +288,8 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
// As such, calls to the underlying NodeUnprepareResource API are skipped for claims that have
// already been successfully unprepared.
func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
// Call NodeUnprepareResource RPC for every resource claim referenced by the pod
batches := make(map[string][]*drapb.Claim)
claimInfos := make(map[types.UID]*ClaimInfo)
for i := range pod.Spec.ResourceClaims {
claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
claimInfo := m.cache.get(claimName, pod.Namespace)
@@ -292,8 +328,7 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
}

// Loop through all plugins and call NodeUnprepareResource only for the
// last pod that references the claim
// Loop through all plugins and prepare for calling NodeUnprepareResources.
for _, resourceHandle := range resourceHandles {
// If no DriverName is provided in the resourceHandle, we
// use the DriverName from the status
@@ -302,38 +337,62 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
pluginName = claimInfo.DriverName
}

// Call NodeUnprepareResource RPC for each resourceHandle
client, err := dra.NewDRAPluginClient(pluginName)
if err != nil {
return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", pluginName, err)
}
response, err := client.NodeUnprepareResource(
context.Background(),
claimInfo.Namespace,
claimInfo.ClaimUID,
claimInfo.ClaimName,
resourceHandle.Data)
if err != nil {
return fmt.Errorf(
"NodeUnprepareResource failed, pod: %s, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
pod.Name, claimInfo.ClaimUID, claimInfo.ClaimName, resourceHandle.Data, err)
claim := &drapb.Claim{
Namespace: resourceClaim.Namespace,
Uid: string(resourceClaim.UID),
Name: resourceClaim.Name,
ResourceHandle: resourceHandle.Data,
}
klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response)
batches[pluginName] = append(batches[pluginName], claim)
}
claimInfos[resourceClaim.UID] = claimInfo
}
Contributor:

The comment that begins this loop is now incorrect, since we no longer call NodeUnprepareResource() from within this loop. That comment should be moved down to the new loop:

for pluginName, claims := range batches {

Contributor:

Please double-check against the PrepareResources() call and try to make the comments consistent.

Contributor (author):

The "only for the last pod that references the claim" part seemed redundant to me, so I dropped it in the updated comments.

// Call NodeUnprepareResources for all claims in each batch.
// If there is any error, processing gets aborted.
// We could try to continue, but that would make the code more complex.
for pluginName, claims := range batches {
// Call NodeUnprepareResources RPC for all resource handles.
client, err := dra.NewDRAPluginClient(pluginName)
if err != nil {
return fmt.Errorf("failed to get DRA Plugin client for plugin name %s: %v", pluginName, err)
}
response, err := client.NodeUnprepareResources(context.Background(), &drapb.NodeUnprepareResourcesRequest{Claims: claims})
if err != nil {
// General error unrelated to any particular claim.
return fmt.Errorf("NodeUnprepareResources failed: %v", err)
}

for claimUID, result := range response.Claims {
reqClaim := lookupClaimRequest(claims, claimUID)
if reqClaim == nil {
return fmt.Errorf("NodeUnprepareResources returned result for unknown claim UID %s", claimUID)
}
if result.Error != "" {
return fmt.Errorf("NodeUnprepareResources failed for claim %s/%s: %s", reqClaim.Namespace, reqClaim.Name, err)
}

// Delete last pod UID only if all NodeUnprepareResource calls succeed.
// This ensures that the status manager doesn't enter termination status
// for the pod. This logic is implemented in
// m.PodMightNeedToUnprepareResources and claimInfo.hasPodReference.
claimInfo.deletePodReference(pod.UID)
m.cache.delete(claimInfo.ClaimName, pod.Namespace)
// Delete last pod UID only if unprepare succeeds.
// This ensures that the status manager doesn't enter termination status
// for the pod. This logic is implemented in
// m.PodMightNeedToUnprepareResources and claimInfo.hasPodReference.
claimInfo := claimInfos[types.UID(claimUID)]
claimInfo.deletePodReference(pod.UID)
m.cache.delete(claimInfo.ClaimName, pod.Namespace)
}

// Checkpoint to reduce redundant calls to NodeUnPrepareResource() after a kubelet restart.
// Checkpoint to reduce redundant calls to NodeUnprepareResources after a kubelet restart.
err = m.cache.syncToCheckpoint()
if err != nil {
return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
}

unfinished := len(claims) - len(response.Claims)
if unfinished != 0 {
return fmt.Errorf("NodeUnprepareResources left out %d claims", unfinished)
}
}

// Checkpoint to capture all of the previous deletePodReference() calls.
err := m.cache.syncToCheckpoint()
if err != nil {