Skip to content

Commit 3e177e7

Browse files
cjc25tritone
andauthored
fix(storage): Fix retries for redirection errors. (#12093)
* fix(storage): Fix retries for redirection errors. Retrying inline might resend buffers which aren't stable yet. Surface redirects to the stream-level retry machinery so that they go through the resend logic. * fix check against idempotency to not lose the shouldRetry function if !s.idempotent but policy == RetryAlways --------- Co-authored-by: Chris Cotter <[email protected]>
1 parent bd91c05 commit 3e177e7

File tree

4 files changed

+56
-30
lines changed

4 files changed

+56
-30
lines changed

storage/grpc_client.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1419,11 +1419,7 @@ func (mrd *gRPCBidiReader) activeRange() []mrdRange {
14191419

14201420
// retryStream cancels the stream and reopens it.
14211421
func (mrd *gRPCBidiReader) retryStream(err error) error {
1422-
var shouldRetry = ShouldRetry
1423-
if mrd.settings.retry != nil && mrd.settings.retry.shouldRetry != nil {
1424-
shouldRetry = mrd.settings.retry.shouldRetry
1425-
}
1426-
if shouldRetry(err) {
1422+
if mrd.settings.retry.runShouldRetry(err) {
14271423
// This will "close" the existing stream and immediately attempt to
14281424
// reopen the stream, but will backoff if further attempts are necessary.
14291425
// When Reopening the stream only failed readID will be added to stream.
@@ -2032,11 +2028,7 @@ func (r *gRPCReader) Close() error {
20322028
func (r *gRPCReader) recv() error {
20332029
databufs := mem.BufferSlice{}
20342030
err := r.stream.RecvMsg(&databufs)
2035-
var shouldRetry = ShouldRetry
2036-
if r.settings.retry != nil && r.settings.retry.shouldRetry != nil {
2037-
shouldRetry = r.settings.retry.shouldRetry
2038-
}
2039-
if err != nil && shouldRetry(err) {
2031+
if err != nil && r.settings.retry.runShouldRetry(err) {
20402032
// This will "close" the existing stream and immediately attempt to
20412033
// reopen the stream, but will backoff if further attempts are necessary.
20422034
// Reopening the stream Recvs the first message, so if retrying is

storage/grpc_reader.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -424,11 +424,7 @@ func (r *gRPCReadObjectReader) recv() error {
424424
databufs := mem.BufferSlice{}
425425
err := r.stream.RecvMsg(&databufs)
426426

427-
var shouldRetry = ShouldRetry
428-
if r.settings.retry != nil && r.settings.retry.shouldRetry != nil {
429-
shouldRetry = r.settings.retry.shouldRetry
430-
}
431-
if err != nil && shouldRetry(err) {
427+
if err != nil && r.settings.retry.runShouldRetry(err) {
432428
// This will "close" the existing stream and immediately attempt to
433429
// reopen the stream, but will backoff if further attempts are necessary.
434430
// Reopening the stream Recvs the first message, so if retrying is

storage/grpc_writer.go

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,40 @@ const (
4242
maxPerMessageWriteSize int = int(storagepb.ServiceConstants_MAX_WRITE_CHUNK_BYTES)
4343
)
4444

45+
func withBidiWriteObjectRedirectionErrorRetries(s *settings) (newr *retryConfig) {
46+
oldr := s.retry
47+
newr = oldr.clone()
48+
if newr == nil {
49+
newr = &retryConfig{}
50+
}
51+
if (oldr.policy == RetryIdempotent && !s.idempotent) || oldr.policy == RetryNever {
52+
// We still retry redirection errors even when settings indicate not to
53+
// retry.
54+
//
55+
// The protocol requires us to respect redirection errors, so RetryNever has
56+
// to ignore them.
57+
//
58+
// Idempotency is always protected by redirection errors: they either
59+
// contain a handle which can be used as idempotency information, or they do
60+
// not contain a handle and are "affirmative failures" which indicate that
61+
// no server-side action occurred.
62+
newr.policy = RetryAlways
63+
newr.shouldRetry = func(err error) bool {
64+
return errors.Is(err, bidiWriteObjectRedirectionError{})
65+
}
66+
return newr
67+
}
68+
// If retry settings allow retries normally, fall back to that behavior.
69+
newr.shouldRetry = func(err error) bool {
70+
if errors.Is(err, bidiWriteObjectRedirectionError{}) {
71+
return true
72+
}
73+
v := oldr.runShouldRetry(err)
74+
return v
75+
}
76+
return newr
77+
}
78+
4579
func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storageOption) (*io.PipeWriter, error) {
4680
var offset int64
4781
errorf := params.setError
@@ -58,6 +92,9 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
5892
if s.retry == nil {
5993
s.retry = defaultRetry.clone()
6094
}
95+
if params.append {
96+
s.retry = withBidiWriteObjectRedirectionErrorRetries(s)
97+
}
6198
s.retry.maxRetryDuration = retryDeadline
6299

63100
// Set Flush func for use by exported Writer.Flush.
@@ -723,6 +760,10 @@ func (s *gRPCAppendBidiWriteBufferSender) connect(ctx context.Context) (err erro
723760
if s.firstMessage.GetAppendObjectSpec().GetGeneration() != 0 {
724761
return nil
725762
}
763+
// Also always ok to reconnect if we've seen a redirect token
764+
if s.routingToken != nil {
765+
return nil
766+
}
726767

727768
// We can also reconnect if the first message has an if_generation_match or
728769
// if_metageneration_match condition. Note that negative conditions like
@@ -805,7 +846,7 @@ func (s *gRPCAppendBidiWriteBufferSender) maybeUpdateFirstMessage(resp *storagep
805846
type bidiWriteObjectRedirectionError struct{}
806847

807848
func (e bidiWriteObjectRedirectionError) Error() string {
808-
return "BidiWriteObjectRedirectedError"
849+
return ""
809850
}
810851

811852
func (s *gRPCAppendBidiWriteBufferSender) handleRedirectionError(e *storagepb.BidiWriteObjectRedirectedError) bool {
@@ -850,10 +891,10 @@ func (s *gRPCAppendBidiWriteBufferSender) receiveMessages(resps chan<- *storagep
850891
if st, ok := status.FromError(err); ok && st.Code() == codes.Aborted {
851892
for _, d := range st.Details() {
852893
if e, ok := d.(*storagepb.BidiWriteObjectRedirectedError); ok {
853-
// If we can handle this error, replace it with the sentinel. Otherwise,
854-
// report it to the user.
894+
// If we can handle this error, wrap it with the sentinel so it gets
895+
// retried.
855896
if ok := s.handleRedirectionError(e); ok {
856-
err = bidiWriteObjectRedirectionError{}
897+
err = fmt.Errorf("%w%w", bidiWriteObjectRedirectionError{}, err)
857898
}
858899
}
859900
}
@@ -971,12 +1012,6 @@ func (s *gRPCAppendBidiWriteBufferSender) sendBuffer(ctx context.Context, buf []
9711012
err = s.recvErr
9721013
}
9731014
s.stream = nil
974-
975-
// Retry transparently on a redirection error
976-
if _, ok := err.(bidiWriteObjectRedirectionError); ok {
977-
s.forceFirstMessage = true
978-
continue
979-
}
9801015
return
9811016
}
9821017
}

storage/invoke.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,13 @@ var (
5353
})
5454
)
5555

56+
func (r *retryConfig) runShouldRetry(err error) bool {
57+
if r == nil || r.shouldRetry == nil {
58+
return ShouldRetry(err)
59+
}
60+
return r.shouldRetry(err)
61+
}
62+
5663
// run determines whether a retry is necessary based on the config and
5764
// idempotency information. It then calls the function with or without retries
5865
// as appropriate, using the configured settings.
@@ -73,10 +80,6 @@ func run(ctx context.Context, call func(ctx context.Context) error, retry *retry
7380
bo.Initial = retry.backoff.Initial
7481
bo.Max = retry.backoff.Max
7582
}
76-
var errorFunc func(err error) bool = ShouldRetry
77-
if retry.shouldRetry != nil {
78-
errorFunc = retry.shouldRetry
79-
}
8083

8184
var quitAfterTimer *time.Timer
8285
if retry.maxRetryDuration != 0 {
@@ -103,7 +106,7 @@ func run(ctx context.Context, call func(ctx context.Context) error, retry *retry
103106
return true, fmt.Errorf("storage: retry failed after %v attempts; last error: %w", *retry.maxAttempts, lastErr)
104107
}
105108
attempts++
106-
retryable := errorFunc(lastErr)
109+
retryable := retry.runShouldRetry(lastErr)
107110
// Explicitly check context cancellation so that we can distinguish between a
108111
// DEADLINE_EXCEEDED error from the server and a user-set context deadline.
109112
// Unfortunately gRPC will codes.DeadlineExceeded (which may be retryable if it's

0 commit comments

Comments
 (0)