
Commit f51dad5

Merge pull request #94021 from timoreimann/support-specifying-custom-lb-retry-period-from-cloud-provider
Support specifying custom LB retry period from cloud provider
2 parents 19830bf + 2ad2c15

File tree

4 files changed: +200, -22 lines changed
Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+import (
+	"time"
+)
+
+// RetryError indicates that a service reconciliation should be retried after a
+// fixed duration (as opposed to backing off exponentially).
+type RetryError struct {
+	msg        string
+	retryAfter time.Duration
+}
+
+// NewRetryError returns a RetryError.
+func NewRetryError(msg string, retryAfter time.Duration) *RetryError {
+	return &RetryError{
+		msg:        msg,
+		retryAfter: retryAfter,
+	}
+}
+
+// Error shows the details of the retry reason.
+func (re *RetryError) Error() string {
+	return re.msg
+}
+
+// RetryAfter returns the defined retry-after duration.
+func (re *RetryError) RetryAfter() time.Duration {
+	return re.retryAfter
+}
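Editor's note: for orientation, here is a minimal, hypothetical sketch (not part of this commit) of how the new type behaves: NewRetryError builds the error, Error() reports the reason, and RetryAfter() exposes the fixed delay the caller should wait before retrying.

package main

import (
	"fmt"
	"time"

	"k8s.io/cloud-provider/api"
)

func main() {
	// Ask the service controller to retry after a fixed 30s rather than
	// applying exponential backoff.
	err := api.NewRetryError("load balancer is still provisioning", 30*time.Second)

	fmt.Println(err.Error())      // load balancer is still provisioning
	fmt.Println(err.RetryAfter()) // 30s
}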

staging/src/k8s.io/cloud-provider/cloud.go

Lines changed: 9 additions & 3 deletions
@@ -131,19 +131,25 @@ func GetInstanceProviderID(ctx context.Context, cloud Interface, nodeName types.
 // irrespective of the ImplementedElsewhere error. Additional finalizers for
 // LB services must be managed in the alternate implementation.
 type LoadBalancer interface {
-	// TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service
 	// GetLoadBalancer returns whether the specified load balancer exists, and
 	// if so, what its status is.
 	// Implementations must treat the *v1.Service parameter as read-only and not modify it.
-	// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
+	// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager.
+	// TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service
 	GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error)
 	// GetLoadBalancerName returns the name of the load balancer. Implementations must treat the
 	// *v1.Service parameter as read-only and not modify it.
 	GetLoadBalancerName(ctx context.Context, clusterName string, service *v1.Service) string
 	// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
 	// Implementations must treat the *v1.Service and *v1.Node
 	// parameters as read-only and not modify them.
-	// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
+	// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager.
+	//
+	// Implementations may return a (possibly wrapped) api.RetryError to enforce
+	// backing off at a fixed duration. This can be used for cases like when the
+	// load balancer is not ready yet (e.g., it is still being provisioned) and
+	// polling at a fixed rate is preferred over backing off exponentially in
+	// order to minimize latency.
 	EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error)
 	// UpdateLoadBalancer updates hosts under the specified load balancer.
 	// Implementations must treat the *v1.Service and *v1.Node
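Editor's note: a sketch of how a provider could use the new contract. The myProvider type and its lbReady field are hypothetical and exist only for illustration; only api.NewRetryError and the EnsureLoadBalancer signature come from this commit.

package myprovider

import (
	"context"
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/cloud-provider/api"
)

// myProvider is a hypothetical cloud provider used only for illustration.
type myProvider struct {
	// lbReady reports whether the backing load balancer for a service has
	// finished provisioning and returns its IP once it has.
	lbReady func(service *v1.Service) (ip string, ready bool, err error)
}

// EnsureLoadBalancer returns a wrapped api.RetryError while the load balancer
// is still provisioning, so the service controller requeues the service after
// a fixed 15s instead of backing off exponentially.
func (p *myProvider) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
	ip, ready, err := p.lbReady(service)
	if err != nil {
		return nil, err
	}
	if !ready {
		// Wrapping with %w is fine: the controller detects RetryError via errors.As.
		return nil, fmt.Errorf("load balancer for %s/%s not ready: %w",
			service.Namespace, service.Name,
			api.NewRetryError("load balancer provisioning in progress", 15*time.Second))
	}
	return &v1.LoadBalancerStatus{
		Ingress: []v1.LoadBalancerIngress{{IP: ip}},
	}, nil
}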

staging/src/k8s.io/cloud-provider/controllers/service/controller.go

Lines changed: 16 additions & 6 deletions
@@ -18,13 +18,14 @@ package service

 import (
 	"context"
+	"errors"
 	"fmt"
 	"reflect"
 	"sync"
 	"time"

 	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/errors"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -39,6 +40,7 @@ import (
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
 	cloudprovider "k8s.io/cloud-provider"
+	"k8s.io/cloud-provider/api"
 	servicehelper "k8s.io/cloud-provider/service/helpers"
 	"k8s.io/component-base/featuregate"
 	controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
@@ -288,8 +290,15 @@ func (c *Controller) processNextServiceItem(ctx context.Context) bool {
 		return true
 	}

-	runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err))
-	c.serviceQueue.AddRateLimited(key)
+	var re *api.RetryError
+	if errors.As(err, &re) {
+		klog.Warningf("error processing service %v (retrying in %s): %v", key, re.RetryAfter(), err)
+		c.serviceQueue.AddAfter(key, re.RetryAfter())
+	} else {
+		runtime.HandleError(fmt.Errorf("error processing service %v (retrying with exponential backoff): %v", key, err))
+		c.serviceQueue.AddRateLimited(key)
+	}
+
 	return true
 }

@@ -401,7 +410,8 @@ func (c *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S
 			klog.V(4).Infof("LoadBalancer for service %s implemented by a different controller %s, Ignoring error", key, c.cloud.ProviderName())
 			return op, nil
 		}
-		return op, fmt.Errorf("failed to ensure load balancer: %v", err)
+		// Use %w deliberately so that a returned RetryError can be handled.
+		return op, fmt.Errorf("failed to ensure load balancer: %w", err)
 	}
 	if newStatus == nil {
 		return op, fmt.Errorf("service status returned by EnsureLoadBalancer is nil")
@@ -415,7 +425,7 @@ func (c *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S
 		// - Not found error mostly happens when service disappears right after
 		//   we remove the finalizer.
 		// - We can't patch status on non-exist service anyway.
-		if !errors.IsNotFound(err) {
+		if !apierrors.IsNotFound(err) {
 			return op, fmt.Errorf("failed to update load balancer status: %v", err)
 		}
 	}
@@ -837,7 +847,7 @@ func (c *Controller) syncService(ctx context.Context, key string) error {
 	// service holds the latest service info from apiserver
 	service, err := c.serviceLister.Services(namespace).Get(name)
 	switch {
-	case errors.IsNotFound(err):
+	case apierrors.IsNotFound(err):
 		// service absence in store means watcher caught the deletion, ensure LB info is cleaned
 		err = c.processServiceDeletion(ctx, key)
 	case err != nil:
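Editor's note: to illustrate why the comment insists on %w, wrapping with %w keeps the RetryError discoverable via errors.As, which is what lets processNextServiceItem choose a fixed delay over rate-limited requeueing. The requeueDelay helper below is hypothetical and merely mirrors the controller branch added in this commit.

package main

import (
	"errors"
	"fmt"
	"time"

	"k8s.io/cloud-provider/api"
)

// requeueDelay mirrors the branch added to processNextServiceItem: if the
// (possibly wrapped) error carries a RetryError, use its fixed delay;
// otherwise signal that the caller should fall back to rate-limited requeues.
func requeueDelay(err error) (time.Duration, bool) {
	var re *api.RetryError
	if errors.As(err, &re) {
		return re.RetryAfter(), true
	}
	return 0, false
}

func main() {
	// Because syncLoadBalancerIfNeeded wraps with %w, the RetryError survives
	// the extra context and errors.As still finds it.
	wrapped := fmt.Errorf("failed to ensure load balancer: %w",
		api.NewRetryError("LB create in progress", 42*time.Second))

	if delay, ok := requeueDelay(wrapped); ok {
		fmt.Println("requeue after", delay) // requeue after 42s
	}

	// A plain error does not match, so the controller would use AddRateLimited.
	if _, ok := requeueDelay(errors.New("boom")); !ok {
		fmt.Println("fall back to exponential backoff")
	}
}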

staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go

Lines changed: 129 additions & 13 deletions
@@ -41,9 +41,12 @@ import (
 	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/kubernetes/scheme"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
+	corelisters "k8s.io/client-go/listers/core/v1"
 	core "k8s.io/client-go/testing"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
+	"k8s.io/cloud-provider/api"
 	fakecloud "k8s.io/cloud-provider/fake"
 	servicehelper "k8s.io/cloud-provider/service/helpers"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
@@ -1093,22 +1096,24 @@ func TestSyncService(t *testing.T) {
 	}

 	for _, tc := range testCases {
-		ctx, cancel := context.WithCancel(context.Background())
-		defer cancel()
+		t.Run(tc.testName, func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()

-		tc.updateFn()
-		obtainedErr := controller.syncService(ctx, tc.key)
+			tc.updateFn()
+			obtainedErr := controller.syncService(ctx, tc.key)

-		//expected matches obtained ??.
-		if exp := tc.expectedFn(obtainedErr); exp != nil {
-			t.Errorf("%v Error:%v", tc.testName, exp)
-		}
+			//expected matches obtained ??.
+			if exp := tc.expectedFn(obtainedErr); exp != nil {
+				t.Errorf("%v Error:%v", tc.testName, exp)
+			}

-		//Post processing, the element should not be in the sync queue.
-		_, exist := controller.cache.get(tc.key)
-		if exist {
-			t.Fatalf("%v working Queue should be empty, but contains %s", tc.testName, tc.key)
-		}
+			//Post processing, the element should not be in the sync queue.
+			_, exist := controller.cache.get(tc.key)
+			if exist {
+				t.Fatalf("%v working Queue should be empty, but contains %s", tc.testName, tc.key)
+			}
+		})
 	}
 }

@@ -2253,6 +2258,87 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
 	}
 }

+func TestServiceQueueDelay(t *testing.T) {
+	const ns = metav1.NamespaceDefault
+
+	tests := []struct {
+		name           string
+		lbCloudErr     error
+		wantRetryDelay time.Duration
+	}{
+		{
+			name:       "processing successful",
+			lbCloudErr: nil,
+		},
+		{
+			name:       "regular error",
+			lbCloudErr: errors.New("something went wrong"),
+		},
+		{
+			name:           "retry error",
+			lbCloudErr:     api.NewRetryError("LB create in progress", 42*time.Second),
+			wantRetryDelay: 42 * time.Second,
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			controller, cloud, client := newController()
+			queue := &spyWorkQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-service-queue-delay")}
+			controller.serviceQueue = queue
+			cloud.Err = tc.lbCloudErr
+
+			serviceCache := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+			controller.serviceLister = corelisters.NewServiceLister(serviceCache)
+
+			svc := defaultExternalService()
+			if err := serviceCache.Add(svc); err != nil {
+				t.Fatalf("adding service %s to cache: %s", svc.Name, err)
+			}
+
+			ctx := context.Background()
+			_, err := client.CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{})
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			key, err := cache.MetaNamespaceKeyFunc(svc)
+			if err != nil {
+				t.Fatalf("creating meta namespace key: %s", err)
+			}
+			queue.Add(key)
+
+			done := controller.processNextServiceItem(ctx)
+			if !done {
+				t.Fatal("processNextServiceItem stopped prematurely")
+			}
+
+			// Expect no requeues unless we hit an error that is not a retry
+			// error.
+			wantNumRequeues := 0
+			var re *api.RetryError
+			isRetryError := errors.As(tc.lbCloudErr, &re)
+			if tc.lbCloudErr != nil && !isRetryError {
+				wantNumRequeues = 1
+			}
+
+			if gotNumRequeues := queue.NumRequeues(key); gotNumRequeues != wantNumRequeues {
+				t.Fatalf("got %d requeue(s), want %d", gotNumRequeues, wantNumRequeues)
+			}
+
+			if tc.wantRetryDelay > 0 {
+				items := queue.getItems()
+				if len(items) != 1 {
+					t.Fatalf("got %d item(s), want 1", len(items))
+				}
+				if gotDelay := items[0].Delay; gotDelay != tc.wantRetryDelay {
+					t.Fatalf("got delay %s, want %s", gotDelay, tc.wantRetryDelay)
+				}
+			}
+		})
+	}
+}
+
 type fakeNodeLister struct {
 	cache []*v1.Node
 	err   error
@@ -2281,3 +2367,33 @@ func (l *fakeNodeLister) Get(name string) (*v1.Node, error) {
 	}
 	return nil, nil
 }
+
+// spyWorkQueue implements a work queue and adds the ability to inspect processed
+// items for testing purposes.
+type spyWorkQueue struct {
+	workqueue.RateLimitingInterface
+	items []spyQueueItem
+}
+
+// spyQueueItem represents an item that was being processed.
+type spyQueueItem struct {
+	Key interface{}
+	// Delay represents the delayed duration if and only if AddAfter was invoked.
+	Delay time.Duration
+}
+
+// AddAfter is like workqueue.RateLimitingInterface.AddAfter but records the
+// added key and delay internally.
+func (f *spyWorkQueue) AddAfter(key interface{}, delay time.Duration) {
+	f.items = append(f.items, spyQueueItem{
+		Key:   key,
+		Delay: delay,
+	})

+	f.RateLimitingInterface.AddAfter(key, delay)
+}
+
+// getItems returns all items that were recorded.
+func (f *spyWorkQueue) getItems() []spyQueueItem {
+	return f.items
+}
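Editor's note: the spy only overrides AddAfter and delegates everything else to the embedded RateLimitingInterface, so it behaves like a normal queue from the controller's point of view. A minimal, hypothetical test exercising it in isolation (assuming it sits in the same test package as the helpers above):

func TestSpyWorkQueueRecordsDelay(t *testing.T) {
	q := &spyWorkQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "spy-demo")}

	// AddAfter both records the (key, delay) pair and forwards to the real queue.
	q.AddAfter("default/my-service", 10*time.Second)

	items := q.getItems()
	if len(items) != 1 || items[0].Delay != 10*time.Second {
		t.Fatalf("expected one recorded item with a 10s delay, got %+v", items)
	}
}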
