@@ -27,7 +27,6 @@ import (
27
27
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
28
28
apierrors "k8s.io/apimachinery/pkg/api/errors"
29
29
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30
- "k8s.io/apimachinery/pkg/labels"
31
30
"k8s.io/apimachinery/pkg/types"
32
31
"k8s.io/apimachinery/pkg/util/runtime"
33
32
"k8s.io/apimachinery/pkg/util/wait"
@@ -59,6 +58,10 @@ const (
59
58
// and not documented as part of the Kubernetes API.
60
59
podResourceClaimAnnotation = "resource.kubernetes.io/pod-claim-name"
61
60
61
+ // claimPodOwnerIndex is used to find ResourceClaims which have
62
+ // a specific pod as owner. Values for this index are the pod UID.
63
+ claimPodOwnerIndex = "claim-pod-owner-index"
64
+
62
65
// Field manager used to update the pod status.
63
66
fieldManager = "ResourceClaimController"
64
67
@@ -76,6 +79,7 @@ type Controller struct {
76
79
// therefore the ResourceClaim objects in its store should be treated as immutable.
77
80
claimLister resourcev1alpha2listers.ResourceClaimLister
78
81
claimsSynced cache.InformerSynced
82
+ claimCache cache.MutationCache
79
83
80
84
// podLister is the shared Pod lister used to fetch Pod
81
85
// objects from the API server. It is shared with other controllers and
@@ -163,6 +167,28 @@ func NewController(
163
167
return nil , fmt .Errorf ("could not initialize ResourceClaim controller: %w" , err )
164
168
}
165
169
170
+ // The mutation cache acts as an additional layer for the informer
171
+ // cache and after a create made by the controller returns that
172
+ // object until the informer catches up. That is necessary
173
+ // when a ResourceClaim got created, updating the pod status fails,
174
+ // and then a retry occurs before the informer cache is updated.
175
+ // In that scenario, the controller would create another claim
176
+ // instead of continuing with the existing one.
177
+ claimInformerCache := claimInformer .Informer ().GetIndexer ()
178
+ if err := claimInformerCache .AddIndexers (cache.Indexers {claimPodOwnerIndex : claimPodOwnerIndexFunc }); err != nil {
179
+ return nil , fmt .Errorf ("could not initialize ResourceClaim controller: %w" , err )
180
+ }
181
+ ec .claimCache = cache .NewIntegerResourceVersionMutationCache (claimInformerCache , claimInformerCache ,
182
+ // Very long time to live, unlikely to be needed because
183
+ // the informer cache should get updated soon.
184
+ time .Hour ,
185
+ // Allow storing objects not in the underlying cache - that's the point...
186
+ // It's safe because in case of a race (claim is in mutation cache, claim
187
+ // gets deleted, controller updates status based on mutation cache) the
188
+ // "bad" pod status will get detected and fixed when the informer catches up.
189
+ true ,
190
+ )
191
+
166
192
return ec , nil
167
193
}
168
194
@@ -487,6 +513,7 @@ func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.
487
513
metrics .ResourceClaimCreateFailures .Inc ()
488
514
return fmt .Errorf ("create ResourceClaim %s: %v" , claimName , err )
489
515
}
516
+ ec .claimCache .Mutation (claim )
490
517
}
491
518
492
519
// Remember the new ResourceClaim for a batch PodStatus update in our caller.
@@ -502,14 +529,16 @@ func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.
502
529
// annotation (ties it to the pod claim) and the right ownership (ties it to
503
530
// the pod).
504
531
func (ec * Controller ) findPodResourceClaim (pod * v1.Pod , podClaim v1.PodResourceClaim ) (* resourcev1alpha2.ResourceClaim , error ) {
505
- claims , err := ec .claimLister .List (labels .Everything ())
532
+ // Only claims owned by the pod will get returned here.
533
+ claims , err := ec .claimCache .ByIndex (claimPodOwnerIndex , string (pod .UID ))
506
534
if err != nil {
507
535
return nil , err
508
536
}
509
537
deterministicName := pod .Name + "-" + podClaim .Name // Kubernetes <= 1.27 behavior.
510
- for _ , claim := range claims {
511
- if err := resourceclaim .IsForPod (pod , claim ); err != nil {
512
- continue
538
+ for _ , claimObj := range claims {
539
+ claim , ok := claimObj .(* resourcev1alpha2.ResourceClaim )
540
+ if ! ok {
541
+ return nil , fmt .Errorf ("unexpected object of type %T returned by claim cache" , claimObj )
513
542
}
514
543
podClaimName , ok := claim .Annotations [podResourceClaimAnnotation ]
515
544
if ok && podClaimName != podClaim .Name {
@@ -715,3 +744,22 @@ func isPodDone(pod *v1.Pod) bool {
715
744
// Deleted and not scheduled:
716
745
pod .DeletionTimestamp != nil && pod .Spec .NodeName == ""
717
746
}
747
+
748
+ // claimPodOwnerIndexFunc is an index function that returns the pod UIDs of
749
+ // all pods which own the resource claim. Should only be one, though.
750
+ func claimPodOwnerIndexFunc (obj interface {}) ([]string , error ) {
751
+ claim , ok := obj .(* resourcev1alpha2.ResourceClaim )
752
+ if ! ok {
753
+ return nil , nil
754
+ }
755
+ var keys []string
756
+ for _ , owner := range claim .OwnerReferences {
757
+ if owner .Controller != nil &&
758
+ * owner .Controller &&
759
+ owner .APIVersion == "v1" &&
760
+ owner .Kind == "Pod" {
761
+ keys = append (keys , string (owner .UID ))
762
+ }
763
+ }
764
+ return keys , nil
765
+ }
0 commit comments