Commit fec2578

dra: store generated ResourceClaims in cache
This addresses the following bad sequence of events:

- controller creates ResourceClaim
- updating pod status fails
- pod gets retried before the informer receives the created ResourceClaim
- another ResourceClaim gets created

Storing the generated ResourceClaim in a MutationCache ensures that the controller knows about it during the retry.

A positive side effect is that ResourceClaims now get indexed by pod owner and thus iterating over existing ones becomes a bit more efficient.
1 parent ba81087 commit fec2578
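The read-after-create pattern described in the commit message can be sketched as a small standalone helper. Everything below is illustrative and not part of the commit: ensureClaim, the claims client parameter, and the wiring are hypothetical, while cache.MutationCache, Mutation(), and ByIndex() are the client-go APIs the controller actually uses.

    package resourceclaimsketch

    import (
        "context"

        resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
        resourceclient "k8s.io/client-go/kubernetes/typed/resource/v1alpha2"
        "k8s.io/client-go/tools/cache"
    )

    const claimPodOwnerIndex = "claim-pod-owner-index"

    // ensureClaim is a hypothetical helper mirroring the retry-safe flow from
    // this commit: consult the mutation cache (informer contents plus locally
    // recorded creates) before creating, and record every successful create so
    // that a retry finds the claim even if the informer has not caught up yet.
    func ensureClaim(ctx context.Context, claims resourceclient.ResourceClaimInterface,
        claimCache cache.MutationCache, podUID types.UID,
        newClaim *resourcev1alpha2.ResourceClaim) (*resourcev1alpha2.ResourceClaim, error) {
        // Look for an existing claim owned by the pod, including claims that
        // are so far only known to the mutation cache.
        objs, err := claimCache.ByIndex(claimPodOwnerIndex, string(podUID))
        if err != nil {
            return nil, err
        }
        if len(objs) > 0 {
            return objs[0].(*resourcev1alpha2.ResourceClaim), nil
        }
        // Nothing found anywhere: create the claim and remember it locally so
        // that a retry does not create a duplicate.
        created, err := claims.Create(ctx, newClaim, metav1.CreateOptions{})
        if err != nil {
            return nil, err
        }
        claimCache.Mutation(created)
        return created, nil
    }

Recording the create with Mutation() is what lets a retry see the claim through ByIndex() before the informer has received the corresponding watch event.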

2 files changed (+71, -5 lines)

pkg/controller/resourceclaim/controller.go

Lines changed: 53 additions & 5 deletions
@@ -27,7 +27,6 @@ import (
 	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -59,6 +58,10 @@ const (
 	// and not documented as part of the Kubernetes API.
 	podResourceClaimAnnotation = "resource.kubernetes.io/pod-claim-name"
 
+	// claimPodOwnerIndex is used to find ResourceClaims which have
+	// a specific pod as owner. Values for this index are the pod UID.
+	claimPodOwnerIndex = "claim-pod-owner-index"
+
 	// Field manager used to update the pod status.
 	fieldManager = "ResourceClaimController"
 
@@ -76,6 +79,7 @@ type Controller struct {
 	// therefore the ResourceClaim objects in its store should be treated as immutable.
 	claimLister  resourcev1alpha2listers.ResourceClaimLister
 	claimsSynced cache.InformerSynced
+	claimCache   cache.MutationCache
 
 	// podLister is the shared Pod lister used to fetch Pod
 	// objects from the API server. It is shared with other controllers and
@@ -163,6 +167,28 @@ func NewController(
 		return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err)
 	}
 
+	// The mutation cache acts as an additional layer for the informer
+	// cache and after a create made by the controller returns that
+	// object until the informer catches up. That is necessary
+	// when a ResourceClaim got created, updating the pod status fails,
+	// and then a retry occurs before the informer cache is updated.
+	// In that scenario, the controller would create another claim
+	// instead of continuing with the existing one.
+	claimInformerCache := claimInformer.Informer().GetIndexer()
+	if err := claimInformerCache.AddIndexers(cache.Indexers{claimPodOwnerIndex: claimPodOwnerIndexFunc}); err != nil {
+		return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err)
+	}
+	ec.claimCache = cache.NewIntegerResourceVersionMutationCache(claimInformerCache, claimInformerCache,
+		// Very long time to live, unlikely to be needed because
+		// the informer cache should get updated soon.
+		time.Hour,
+		// Allow storing objects not in the underlying cache - that's the point...
+		// It's safe because in case of a race (claim is in mutation cache, claim
+		// gets deleted, controller updates status based on mutation cache) the
+		// "bad" pod status will get detected and fixed when the informer catches up.
+		true,
+	)
+
 	return ec, nil
 }
 
@@ -487,6 +513,7 @@ func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.
 			metrics.ResourceClaimCreateFailures.Inc()
 			return fmt.Errorf("create ResourceClaim %s: %v", claimName, err)
 		}
+		ec.claimCache.Mutation(claim)
 	}
 
 	// Remember the new ResourceClaim for a batch PodStatus update in our caller.
@@ -502,14 +529,16 @@ func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.
 // annotation (ties it to the pod claim) and the right ownership (ties it to
 // the pod).
 func (ec *Controller) findPodResourceClaim(pod *v1.Pod, podClaim v1.PodResourceClaim) (*resourcev1alpha2.ResourceClaim, error) {
-	claims, err := ec.claimLister.List(labels.Everything())
+	// Only claims owned by the pod will get returned here.
+	claims, err := ec.claimCache.ByIndex(claimPodOwnerIndex, string(pod.UID))
 	if err != nil {
 		return nil, err
 	}
 	deterministicName := pod.Name + "-" + podClaim.Name // Kubernetes <= 1.27 behavior.
-	for _, claim := range claims {
-		if err := resourceclaim.IsForPod(pod, claim); err != nil {
-			continue
+	for _, claimObj := range claims {
+		claim, ok := claimObj.(*resourcev1alpha2.ResourceClaim)
+		if !ok {
+			return nil, fmt.Errorf("unexpected object of type %T returned by claim cache", claimObj)
 		}
 		podClaimName, ok := claim.Annotations[podResourceClaimAnnotation]
 		if ok && podClaimName != podClaim.Name {
@@ -715,3 +744,22 @@ func isPodDone(pod *v1.Pod) bool {
 		// Deleted and not scheduled:
 		pod.DeletionTimestamp != nil && pod.Spec.NodeName == ""
 }
+
+// claimPodOwnerIndexFunc is an index function that returns the pod UIDs of
+// all pods which own the resource claim. Should only be one, though.
+func claimPodOwnerIndexFunc(obj interface{}) ([]string, error) {
+	claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
+	if !ok {
+		return nil, nil
+	}
+	var keys []string
+	for _, owner := range claim.OwnerReferences {
+		if owner.Controller != nil &&
+			*owner.Controller &&
+			owner.APIVersion == "v1" &&
+			owner.Kind == "Pod" {
+			keys = append(keys, string(owner.UID))
+		}
+	}
+	return keys, nil
+}
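For readers unfamiliar with MutationCache, the following self-contained sketch shows how the two pieces added above fit together outside the controller. The wiring (NewIndexer, the example claim, the package and function names) is assumed for illustration; the index function is copied from the diff, and the cache calls are the real client-go APIs. A claim that exists only in the mutation cache, because it was just created and recorded with Mutation(), is still returned by ByIndex thanks to includeAdds=true.

    package resourceclaimsketch

    import (
        "fmt"
        "time"

        resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/tools/cache"
    )

    const claimPodOwnerIndex = "claim-pod-owner-index"

    // Same logic as claimPodOwnerIndexFunc in the diff above.
    func claimPodOwnerIndexFunc(obj interface{}) ([]string, error) {
        claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
        if !ok {
            return nil, nil
        }
        var keys []string
        for _, owner := range claim.OwnerReferences {
            if owner.Controller != nil && *owner.Controller &&
                owner.APIVersion == "v1" && owner.Kind == "Pod" {
                keys = append(keys, string(owner.UID))
            }
        }
        return keys, nil
    }

    func demoClaimCache() error {
        // Indexer standing in for the informer's store; the controller uses
        // claimInformer.Informer().GetIndexer() instead.
        indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc,
            cache.Indexers{claimPodOwnerIndex: claimPodOwnerIndexFunc})
        claimCache := cache.NewIntegerResourceVersionMutationCache(indexer, indexer, time.Hour, true)

        controller := true
        claim := &resourcev1alpha2.ResourceClaim{
            ObjectMeta: metav1.ObjectMeta{
                Name:      "my-pod-my-claim",
                Namespace: "default",
                OwnerReferences: []metav1.OwnerReference{{
                    APIVersion: "v1",
                    Kind:       "Pod",
                    Name:       "my-pod",
                    UID:        types.UID("pod-uid-123"),
                    Controller: &controller,
                }},
            },
        }

        // The claim is only recorded locally; the indexer (standing in for
        // the informer cache) has not seen it yet.
        claimCache.Mutation(claim)

        objs, err := claimCache.ByIndex(claimPodOwnerIndex, "pod-uid-123")
        if err != nil {
            return err
        }
        fmt.Printf("claims owned by pod-uid-123: %d\n", len(objs)) // prints 1
        return nil
    }

The one-hour TTL used in the controller is just a safety margin: normally the informer delivers the created claim long before the entry expires, at which point lookups are served from the informer store again.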

pkg/controller/resourceclaim/controller_test.go

Lines changed: 18 additions & 0 deletions
@@ -92,6 +92,7 @@ func TestSyncHandler(t *testing.T) {
 		name            string
 		key             string
 		claims          []*resourcev1alpha2.ResourceClaim
+		claimsInCache   []*resourcev1alpha2.ResourceClaim
 		pods            []*v1.Pod
 		podsLater       []*v1.Pod
 		templates       []*resourcev1alpha2.ResourceClaimTemplate
@@ -185,6 +186,18 @@ func TestSyncHandler(t *testing.T) {
 			},
 			expectedMetrics: expectedMetrics{0, 0},
 		},
+		{
+			name:          "find-created-claim-in-cache",
+			pods:          []*v1.Pod{testPodWithResource},
+			key:           podKey(testPodWithResource),
+			claimsInCache: []*resourcev1alpha2.ResourceClaim{generatedTestClaim},
+			expectedStatuses: map[string][]v1.PodResourceClaimStatus{
+				testPodWithResource.Name: {
+					{Name: testPodWithResource.Spec.ResourceClaims[0].Name, ResourceClaimName: &generatedTestClaim.Name},
+				},
+			},
+			expectedMetrics: expectedMetrics{0, 0},
+		},
 		{
 			name: "no-such-pod",
 			key:  podKey(testPodWithResource),
@@ -345,6 +358,11 @@ func TestSyncHandler(t *testing.T) {
 			informerFactory.WaitForCacheSync(ctx.Done())
 			cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced, claimInformer.Informer().HasSynced, templateInformer.Informer().HasSynced)
 
+			// Add claims that only exist in the mutation cache.
+			for _, claim := range tc.claimsInCache {
+				ec.claimCache.Mutation(claim)
+			}
+
 			// Simulate race: stop informers, add more pods that the controller doesn't know about.
 			stopInformers()
 			for _, pod := range tc.podsLater {
