fix: data race for patchResource func #132049
base: master
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has been approved by: googs1025. The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files. Approvers can indicate their approval by writing `/approve` in a comment.
```diff
-	wasCreated := false
+	// wasCreated indicates if the object was created during the update.
+	// Uses int32 for atomic access (1 = true, 0 = false).
+	var wasCreated int32
```
I am not sure if this is appropriate: since this single variable is used by two goroutines, should we use the sync/atomic package (e.g. atomic.StoreInt32) or a read-write lock? I understand that using a lock would be heavier (i.e. more burdensome).
I am not sure whether protecting concurrent access (no matter how it is done) fixes the underlying problem. The return below seems to assume that the goroutine which writes wasCreated has terminated. Is that a valid assumption?
If yes, then there has to be some kind of synchronization. The race detector should recognize that synchronization and not report the race. If there is synchronization and the race detector does not detect it, then we have a false positive - that would be very unusual; it heavily errs towards "no false negatives".
If no, then there is a logic error in the flow which needs to be fixed. Otherwise the return statement could be reached before the goroutine has written the variable, and the wrong value would be passed on.
Yeah, looking at the race detector logs, it looks like we had a write after a read, so to me that means the synchronization [to wait for the writer to complete] is broken?
I will try to fix this issue after KubeCon Hong Kong (6/10-6/11) 😄
After looking at the source code, if I understand correctly 🤔:
The main goroutine waits for the background goroutine to finish via a buffered channel (resultCh) before proceeding.
This ensures that any writes to wasCreated (done inside the background goroutine) are completed before the main goroutine reads it using atomic.LoadInt32.
Therefore, the value of wasCreated is always correct when read.
kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher.go
Lines 81 to 149 in d2c12af
```go
// FinishRequest makes a given ResultFunc asynchronous and handles errors returned by the response.
func FinishRequest(ctx context.Context, fn ResultFunc) (runtime.Object, error) {
	return finishRequest(ctx, fn, postTimeoutLoggerWait, logPostTimeoutResult)
}

func finishRequest(ctx context.Context, fn ResultFunc, postTimeoutWait time.Duration, postTimeoutLogger PostTimeoutLoggerFunc) (runtime.Object, error) {
	// the channel needs to be buffered since the post-timeout receiver goroutine
	// waits up to 5 minutes for the child goroutine to return.
	resultCh := make(chan *result, 1)

	go func() {
		result := &result{}

		// panics don't cross goroutine boundaries, so we have to handle ourselves
		defer func() {
			reason := recover()
			if reason != nil {
				// do not wrap the sentinel ErrAbortHandler panic value
				if reason != http.ErrAbortHandler {
					// Same as stdlib http server code. Manually allocate stack
					// trace buffer size to prevent excessively large logs
					const size = 64 << 10
					buf := make([]byte, size)
					buf = buf[:goruntime.Stack(buf, false)]
					reason = fmt.Sprintf("%v\n%s", reason, buf)
				}

				// store the panic reason into the result.
				result.reason = reason
			}

			// Propagate the result to the parent goroutine
			resultCh <- result
		}()

		if object, err := fn(); err != nil {
			result.err = err
		} else {
			result.object = object
		}
	}()

	select {
	case result := <-resultCh:
		return result.Return()
	case <-ctx.Done():
		// we are going to send a timeout response to the caller, but the asynchronous goroutine
		// (sender) is still executing the ResultFunc function.
		// kick off a goroutine (receiver) here to wait for the sender (goroutine executing ResultFunc)
		// to send the result and then log details of the result.
		defer func() {
			go func() {
				timedOutAt := time.Now()
				var result *result
				select {
				case result = <-resultCh:
				case <-time.After(postTimeoutWait):
					// we will not wait forever, if we are here then we know that some sender
					// goroutines are taking longer than postTimeoutWait.
				}
				postTimeoutLogger(timedOutAt, result)
			}()
		}()
		return nil, errors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout - %s", ctx.Err()), 0)
	}
}
```
There is one condition where the goroutine writing the value hasn't completed yet: in case of a canceled context.
kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher.go
Lines 127 to 146 in d2c12af
```go
	case <-ctx.Done():
		// we are going to send a timeout response to the caller, but the asynchronous goroutine
		// (sender) is still executing the ResultFunc function.
		// kick off a goroutine (receiver) here to wait for the sender (goroutine executing ResultFunc)
		// to send the result and then log details of the result.
		defer func() {
			go func() {
				timedOutAt := time.Now()
				var result *result
				select {
				case result = <-resultCh:
				case <-time.After(postTimeoutWait):
					// we will not wait forever, if we are here then we know that some sender
					// goroutines are taking longer than postTimeoutWait.
				}
				postTimeoutLogger(timedOutAt, result)
			}()
		}()
		return nil, errors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout - %s", ctx.Err()), 0)
```
In that case, this read here races against the still-running goroutine:
```go
return result, wasCreated, err
```
The fix would be to skip that read in case of an error:
```go
// In case of a timeout error, the goroutine handling the request is still running.
// https://github.com/kubernetes/kubernetes/blob/d2c12afa4593e50a187075157d38748292b02733/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher.go#L127-L146
//
// We cannot reliably read the variable (data race!) and have to assume that
// the object was not created.
if errors.IsTimeout(err) {
	return result, false, err
}
return result, wasCreated, err
```
OK, I added the check for errors.IsTimeout(err). 🤔 In addition, I understand that we should also use atomic.LoadInt32? Otherwise, will we still get data race detection errors?
We shouldn't need the atomic. Either the goroutine has completed for sure, or we don't read the value. In both cases there's no data race.
> We shouldn't need the atomic. Either the goroutine has completed for sure or we don't read the value. In both cases there's no data race.

OK, I removed it.
cc @pohly
/cc @jpbetz
/triage accepted
Force-pushed from 4279bb0 to 07d3163.
/retest
/test pull-kubernetes-e2e-kind
Force-pushed from 07d3163 to 7a32c58.
Force-pushed from 7a32c58 to 51fe83b.
```diff
@@ -669,6 +701,20 @@ func TestPatchResourceNumberConversion(t *testing.T) {
 	tc.Run(t)
 }
 
+func TestPatchResourceTimeout(t *testing.T) {
```
Add test cases for this part of the changes. 🤔
Signed-off-by: googs1025 <[email protected]>
Force-pushed from 51fe83b to 2fd93c0.
@jpbetz can you help with this? 😄
/assign @jpbetz
What type of PR is this?
/kind bug
What this PR does / why we need it:
Which issue(s) this PR fixes:
Fixes #132026
Special notes for your reviewer:
Does this PR introduce a user-facing change?
Additional documentation e.g., KEPs (Kubernetes Enhancement Proposals), usage docs, etc.: