Skip to content

Commit 35d2431

Browse files
committed
informer: fix race against Run and SetTransform/SetWatchErrorHandler
`SetWatchErrorHandler` claims it will fail if Run() has already started. But if they are called concurrently, it will actually trigger a data race. With this PR: ``` 62702 runs so far, 0 failures (100.00% pass rate). 59.152682ms avg, 189.068387ms max, 26.623785ms min ``` Without this PR: ``` 5012 runs so far, 38 failures (99.25% pass rate). 58.675502ms avg, 186.018084ms max, 29.468104ms min ```
1 parent 6442024 commit 35d2431

File tree

2 files changed

+46
-18
lines changed

2 files changed

+46
-18
lines changed

staging/src/k8s.io/client-go/tools/cache/shared_informer.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -459,29 +459,30 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
459459
klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
460460
return
461461
}
462-
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
463-
KnownObjects: s.indexer,
464-
EmitDeltaTypeReplaced: true,
465-
Transformer: s.transform,
466-
})
467-
468-
cfg := &Config{
469-
Queue: fifo,
470-
ListerWatcher: s.listerWatcher,
471-
ObjectType: s.objectType,
472-
ObjectDescription: s.objectDescription,
473-
FullResyncPeriod: s.resyncCheckPeriod,
474-
RetryOnError: false,
475-
ShouldResync: s.processor.shouldResync,
476-
477-
Process: s.HandleDeltas,
478-
WatchErrorHandler: s.watchErrorHandler,
479-
}
480462

481463
func() {
482464
s.startedLock.Lock()
483465
defer s.startedLock.Unlock()
484466

467+
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
468+
KnownObjects: s.indexer,
469+
EmitDeltaTypeReplaced: true,
470+
Transformer: s.transform,
471+
})
472+
473+
cfg := &Config{
474+
Queue: fifo,
475+
ListerWatcher: s.listerWatcher,
476+
ObjectType: s.objectType,
477+
ObjectDescription: s.objectDescription,
478+
FullResyncPeriod: s.resyncCheckPeriod,
479+
RetryOnError: false,
480+
ShouldResync: s.processor.shouldResync,
481+
482+
Process: s.HandleDeltas,
483+
WatchErrorHandler: s.watchErrorHandler,
484+
}
485+
485486
s.controller = New(cfg)
486487
s.controller.(*controller).clock = s.clock
487488
s.started = true

staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,33 @@ func TestSharedInformerErrorHandling(t *testing.T) {
393393
close(stop)
394394
}
395395

396+
// TestSharedInformerStartRace is a regression test to ensure there is no race between
397+
// Run and SetWatchErrorHandler, and Run and SetTransform.
398+
func TestSharedInformerStartRace(t *testing.T) {
399+
source := fcache.NewFakeControllerSource()
400+
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
401+
stop := make(chan struct{})
402+
go func() {
403+
for {
404+
select {
405+
case <-stop:
406+
return
407+
default:
408+
}
409+
// Set dummy functions, just to test for race
410+
informer.SetTransform(func(i interface{}) (interface{}, error) {
411+
return i, nil
412+
})
413+
informer.SetWatchErrorHandler(func(r *Reflector, err error) {
414+
})
415+
}
416+
}()
417+
418+
go informer.Run(stop)
419+
420+
close(stop)
421+
}
422+
396423
func TestSharedInformerTransformer(t *testing.T) {
397424
// source simulates an apiserver object endpoint.
398425
source := fcache.NewFakeControllerSource()

0 commit comments

Comments
 (0)