@@ -41,9 +41,12 @@ import (
41
41
"k8s.io/client-go/kubernetes/fake"
42
42
"k8s.io/client-go/kubernetes/scheme"
43
43
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
44
+ corelisters "k8s.io/client-go/listers/core/v1"
44
45
core "k8s.io/client-go/testing"
46
+ "k8s.io/client-go/tools/cache"
45
47
"k8s.io/client-go/tools/record"
46
48
"k8s.io/client-go/util/workqueue"
49
+ "k8s.io/cloud-provider/api"
47
50
fakecloud "k8s.io/cloud-provider/fake"
48
51
servicehelper "k8s.io/cloud-provider/service/helpers"
49
52
featuregatetesting "k8s.io/component-base/featuregate/testing"
@@ -1093,22 +1096,24 @@ func TestSyncService(t *testing.T) {
1093
1096
}
1094
1097
1095
1098
for _ , tc := range testCases {
1096
- ctx , cancel := context .WithCancel (context .Background ())
1097
- defer cancel ()
1099
+ t .Run (tc .testName , func (t * testing.T ) {
1100
+ ctx , cancel := context .WithCancel (context .Background ())
1101
+ defer cancel ()
1098
1102
1099
- tc .updateFn ()
1100
- obtainedErr := controller .syncService (ctx , tc .key )
1103
+ tc .updateFn ()
1104
+ obtainedErr := controller .syncService (ctx , tc .key )
1101
1105
1102
- //expected matches obtained ??.
1103
- if exp := tc .expectedFn (obtainedErr ); exp != nil {
1104
- t .Errorf ("%v Error:%v" , tc .testName , exp )
1105
- }
1106
+ //expected matches obtained ??.
1107
+ if exp := tc .expectedFn (obtainedErr ); exp != nil {
1108
+ t .Errorf ("%v Error:%v" , tc .testName , exp )
1109
+ }
1106
1110
1107
- //Post processing, the element should not be in the sync queue.
1108
- _ , exist := controller .cache .get (tc .key )
1109
- if exist {
1110
- t .Fatalf ("%v working Queue should be empty, but contains %s" , tc .testName , tc .key )
1111
- }
1111
+ //Post processing, the element should not be in the sync queue.
1112
+ _ , exist := controller .cache .get (tc .key )
1113
+ if exist {
1114
+ t .Fatalf ("%v working Queue should be empty, but contains %s" , tc .testName , tc .key )
1115
+ }
1116
+ })
1112
1117
}
1113
1118
}
1114
1119
@@ -2253,6 +2258,87 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
2253
2258
}
2254
2259
}
2255
2260
2261
+ func TestServiceQueueDelay (t * testing.T ) {
2262
+ const ns = metav1 .NamespaceDefault
2263
+
2264
+ tests := []struct {
2265
+ name string
2266
+ lbCloudErr error
2267
+ wantRetryDelay time.Duration
2268
+ }{
2269
+ {
2270
+ name : "processing successful" ,
2271
+ lbCloudErr : nil ,
2272
+ },
2273
+ {
2274
+ name : "regular error" ,
2275
+ lbCloudErr : errors .New ("something went wrong" ),
2276
+ },
2277
+ {
2278
+ name : "retry error" ,
2279
+ lbCloudErr : api .NewRetryError ("LB create in progress" , 42 * time .Second ),
2280
+ wantRetryDelay : 42 * time .Second ,
2281
+ },
2282
+ }
2283
+
2284
+ for _ , tc := range tests {
2285
+ t .Run (tc .name , func (t * testing.T ) {
2286
+ controller , cloud , client := newController ()
2287
+ queue := & spyWorkQueue {RateLimitingInterface : workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "test-service-queue-delay" )}
2288
+ controller .serviceQueue = queue
2289
+ cloud .Err = tc .lbCloudErr
2290
+
2291
+ serviceCache := cache .NewIndexer (cache .MetaNamespaceKeyFunc , cache.Indexers {cache .NamespaceIndex : cache .MetaNamespaceIndexFunc })
2292
+ controller .serviceLister = corelisters .NewServiceLister (serviceCache )
2293
+
2294
+ svc := defaultExternalService ()
2295
+ if err := serviceCache .Add (svc ); err != nil {
2296
+ t .Fatalf ("adding service %s to cache: %s" , svc .Name , err )
2297
+ }
2298
+
2299
+ ctx := context .Background ()
2300
+ _ , err := client .CoreV1 ().Services (ns ).Create (ctx , svc , metav1.CreateOptions {})
2301
+ if err != nil {
2302
+ t .Fatal (err )
2303
+ }
2304
+
2305
+ key , err := cache .MetaNamespaceKeyFunc (svc )
2306
+ if err != nil {
2307
+ t .Fatalf ("creating meta namespace key: %s" , err )
2308
+ }
2309
+ queue .Add (key )
2310
+
2311
+ done := controller .processNextServiceItem (ctx )
2312
+ if ! done {
2313
+ t .Fatal ("processNextServiceItem stopped prematurely" )
2314
+ }
2315
+
2316
+ // Expect no requeues unless we hit an error that is not a retry
2317
+ // error.
2318
+ wantNumRequeues := 0
2319
+ var re * api.RetryError
2320
+ isRetryError := errors .As (tc .lbCloudErr , & re )
2321
+ if tc .lbCloudErr != nil && ! isRetryError {
2322
+ wantNumRequeues = 1
2323
+ }
2324
+
2325
+ if gotNumRequeues := queue .NumRequeues (key ); gotNumRequeues != wantNumRequeues {
2326
+ t .Fatalf ("got %d requeue(s), want %d" , gotNumRequeues , wantNumRequeues )
2327
+ }
2328
+
2329
+ if tc .wantRetryDelay > 0 {
2330
+ items := queue .getItems ()
2331
+ if len (items ) != 1 {
2332
+ t .Fatalf ("got %d item(s), want 1" , len (items ))
2333
+ }
2334
+ if gotDelay := items [0 ].Delay ; gotDelay != tc .wantRetryDelay {
2335
+ t .Fatalf ("got delay %s, want %s" , gotDelay , tc .wantRetryDelay )
2336
+ }
2337
+ }
2338
+ })
2339
+ }
2340
+ }
2341
+
2256
2342
type fakeNodeLister struct {
2257
2343
cache []* v1.Node
2258
2344
err error
@@ -2281,3 +2367,33 @@ func (l *fakeNodeLister) Get(name string) (*v1.Node, error) {
2281
2367
}
2282
2368
return nil , nil
2283
2369
}
2370
+
2371
+ // spyWorkQueue implements a work queue and adds the ability to inspect processed
2372
+ // items for testing purposes.
2373
+ type spyWorkQueue struct {
2374
+ workqueue.RateLimitingInterface
2375
+ items []spyQueueItem
2376
+ }
2377
+
2378
+ // spyQueueItem represents an item that was being processed.
2379
+ type spyQueueItem struct {
2380
+ Key interface {}
2381
+ // Delay represents the delayed duration if and only if AddAfter was invoked.
2382
+ Delay time.Duration
2383
+ }
2384
+
2385
+ // AddAfter is like workqueue.RateLimitingInterface.AddAfter but records the
2386
+ // added key and delay internally.
2387
+ func (f * spyWorkQueue ) AddAfter (key interface {}, delay time.Duration ) {
2388
+ f .items = append (f .items , spyQueueItem {
2389
+ Key : key ,
2390
+ Delay : delay ,
2391
+ })
2392
+
2393
+ f .RateLimitingInterface .AddAfter (key , delay )
2394
+ }
2395
+
2396
+ // getItems returns all items that were recorded.
2397
+ func (f * spyWorkQueue ) getItems () []spyQueueItem {
2398
+ return f .items
2399
+ }
0 commit comments