@@ -42,6 +42,40 @@ const (
42
42
maxPerMessageWriteSize int = int (storagepb .ServiceConstants_MAX_WRITE_CHUNK_BYTES )
43
43
)
44
44
45
+ func withBidiWriteObjectRedirectionErrorRetries (s * settings ) (newr * retryConfig ) {
46
+ oldr := s .retry
47
+ newr = oldr .clone ()
48
+ if newr == nil {
49
+ newr = & retryConfig {}
50
+ }
51
+ if (oldr .policy == RetryIdempotent && ! s .idempotent ) || oldr .policy == RetryNever {
52
+ // We still retry redirection errors even when settings indicate not to
53
+ // retry.
54
+ //
55
+ // The protocol requires us to respect redirection errors, so RetryNever has
56
+ // to ignore them.
57
+ //
58
+ // Idempotency is always protected by redirection errors: they either
59
+ // contain a handle which can be used as idempotency information, or they do
60
+ // not contain a handle and are "affirmative failures" which indicate that
61
+ // no server-side action occurred.
62
+ newr .policy = RetryAlways
63
+ newr .shouldRetry = func (err error ) bool {
64
+ return errors .Is (err , bidiWriteObjectRedirectionError {})
65
+ }
66
+ return newr
67
+ }
68
+ // If retry settings allow retries normally, fall back to that behavior.
69
+ newr .shouldRetry = func (err error ) bool {
70
+ if errors .Is (err , bidiWriteObjectRedirectionError {}) {
71
+ return true
72
+ }
73
+ v := oldr .runShouldRetry (err )
74
+ return v
75
+ }
76
+ return newr
77
+ }
78
+
45
79
func (c * grpcStorageClient ) OpenWriter (params * openWriterParams , opts ... storageOption ) (* io.PipeWriter , error ) {
46
80
var offset int64
47
81
errorf := params .setError
@@ -58,6 +92,9 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
58
92
if s .retry == nil {
59
93
s .retry = defaultRetry .clone ()
60
94
}
95
+ if params .append {
96
+ s .retry = withBidiWriteObjectRedirectionErrorRetries (s )
97
+ }
61
98
s .retry .maxRetryDuration = retryDeadline
62
99
63
100
// Set Flush func for use by exported Writer.Flush.
@@ -723,6 +760,10 @@ func (s *gRPCAppendBidiWriteBufferSender) connect(ctx context.Context) (err erro
723
760
if s .firstMessage .GetAppendObjectSpec ().GetGeneration () != 0 {
724
761
return nil
725
762
}
763
+ // Also always ok to reconnect if we've seen a redirect token
764
+ if s .routingToken != nil {
765
+ return nil
766
+ }
726
767
727
768
// We can also reconnect if the first message has an if_generation_match or
728
769
// if_metageneration_match condition. Note that negative conditions like
@@ -805,7 +846,7 @@ func (s *gRPCAppendBidiWriteBufferSender) maybeUpdateFirstMessage(resp *storagep
805
846
type bidiWriteObjectRedirectionError struct {}
806
847
807
848
func (e bidiWriteObjectRedirectionError ) Error () string {
808
- return "BidiWriteObjectRedirectedError "
849
+ return ""
809
850
}
810
851
811
852
func (s * gRPCAppendBidiWriteBufferSender ) handleRedirectionError (e * storagepb.BidiWriteObjectRedirectedError ) bool {
@@ -850,10 +891,10 @@ func (s *gRPCAppendBidiWriteBufferSender) receiveMessages(resps chan<- *storagep
850
891
if st , ok := status .FromError (err ); ok && st .Code () == codes .Aborted {
851
892
for _ , d := range st .Details () {
852
893
if e , ok := d .(* storagepb.BidiWriteObjectRedirectedError ); ok {
853
- // If we can handle this error, replace it with the sentinel. Otherwise,
854
- // report it to the user .
894
+ // If we can handle this error, wrap it with the sentinel so it gets
895
+ // retried .
855
896
if ok := s .handleRedirectionError (e ); ok {
856
- err = bidiWriteObjectRedirectionError {}
897
+ err = fmt . Errorf ( "%w%w" , bidiWriteObjectRedirectionError {}, err )
857
898
}
858
899
}
859
900
}
@@ -971,12 +1012,6 @@ func (s *gRPCAppendBidiWriteBufferSender) sendBuffer(ctx context.Context, buf []
971
1012
err = s .recvErr
972
1013
}
973
1014
s .stream = nil
974
-
975
- // Retry transparently on a redirection error
976
- if _ , ok := err .(bidiWriteObjectRedirectionError ); ok {
977
- s .forceFirstMessage = true
978
- continue
979
- }
980
1015
return
981
1016
}
982
1017
}
0 commit comments