diff --git a/bigquery/storage/managedwriter/connection.go b/bigquery/storage/managedwriter/connection.go
index 7e14fff2791..0d6f98b6f61 100644
--- a/bigquery/storage/managedwriter/connection.go
+++ b/bigquery/storage/managedwriter/connection.go
@@ -498,24 +498,39 @@ func (co *connection) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient, f
 // enables testing
 type streamClientFunc func(context.Context, ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)

+var errConnectionCanceled = grpcstatus.Error(codes.Canceled, "client connection context was canceled")
+
 // connRecvProcessor is used to propagate append responses back up with the originating write requests. It
 // runs as a goroutine. A connection object allows for reconnection, and each reconnection establishes a new
-// processing gorouting and backing channel.
+// context, processing goroutine and backing channel.
 func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) {
 	for {
 		select {
 		case <-ctx.Done():
-			// Context is done, so we're not going to get further updates. Mark all work left in the channel
-			// with the context error. We don't attempt to re-enqueue in this case.
+			// Channel context is done, which means we're not getting further updates on in flight appends and should
+			// process everything left in the existing channel/connection.
+			doneErr := ctx.Err()
+			if doneErr == context.Canceled {
+				// This is a special case. Connection recovery ends up cancelling a context as part of a reconnection, and with
+				// request retrying enabled we can possibly re-enqueue writes. To allow graceful retry for this behavior, we
+				// translate this to an rpc status error to avoid doing things like introducing context errors as part of the retry predicate.
+				//
+				// The tradeoff here is that write retries may roundtrip multiple times for something like a pool shutdown, even though the final
+				// outcome would result in an error.
+				doneErr = errConnectionCanceled
+			}
 			for {
 				pw, ok := <-ch
 				if !ok {
 					return
 				}
-				// It's unlikely this connection will recover here, but for correctness keep the flow controller
-				// state correct by releasing.
+				// This connection will not recover, but still attempt to keep flow controller state consistent.
 				co.release(pw)
-				pw.markDone(nil, ctx.Err())
+
+				// TODO: Determine if/how we should report this case, as we have no viable context for propagating.
+
+				// Because we can't tell locally if this write is done, we pass it back to the retrier for possible re-enqueue.
+				pw.writer.processRetry(pw, co, nil, doneErr)
 			}
 		case nextWrite, ok := <-ch:
 			if !ok {
diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go
index c2983e84a79..60c5c347d1e 100644
--- a/bigquery/storage/managedwriter/retry.go
+++ b/bigquery/storage/managedwriter/retry.go
@@ -130,13 +130,23 @@ func (sr *statelessRetryer) Retry(err error, attemptCount int) (time.Duration, b
 // our bidi stream to close/reopen based on the responses error. Errors here signal that no
 // further appends will succeed.
 func shouldReconnect(err error) bool {
-	var knownErrors = []error{
-		io.EOF,
-		status.Error(codes.Unavailable, "the connection is draining"), // errStreamDrain in gRPC transport
+
+	// io.EOF is the typical not connected signal.
+	if errors.Is(err, io.EOF) {
+		return true
+	}
+	// Backend responses that trigger reconnection on send.
+	reconnectCodes := []codes.Code{
+		codes.Aborted,
+		codes.Canceled,
+		codes.Unavailable,
+		codes.DeadlineExceeded,
 	}
-	for _, ke := range knownErrors {
-		if errors.Is(err, ke) {
-			return true
+	if s, ok := status.FromError(err); ok {
+		for _, c := range reconnectCodes {
+			if s.Code() == c {
+				return true
+			}
 		}
 	}
 	return false
diff --git a/bigquery/storage/managedwriter/retry_test.go b/bigquery/storage/managedwriter/retry_test.go
index eb98381bc0a..17066c6ff52 100644
--- a/bigquery/storage/managedwriter/retry_test.go
+++ b/bigquery/storage/managedwriter/retry_test.go
@@ -15,6 +15,7 @@ package managedwriter
 import (
+	"context"
 	"fmt"
 	"io"
 	"testing"
@@ -60,6 +61,10 @@ func TestManagedStream_AppendErrorRetries(t *testing.T) {
 			err:  status.Error(codes.ResourceExhausted, "Exceeds 'AppendRows throughput' quota for some reason"),
 			want: true,
 		},
+		{
+			err:  context.Canceled,
+			want: false,
+		},
 	}

 	retry := newStatelessRetryer()
@@ -86,11 +91,23 @@ func TestManagedStream_ShouldReconnect(t *testing.T) {
 			want: true,
 		},
 		{
-			err:  status.Error(codes.Unavailable, "nope"),
+			err:  status.Error(codes.Unavailable, "the connection is draining"),
+			want: true,
+		},
+		{
+			err:  status.Error(codes.ResourceExhausted, "oof"), // may just be pushback
 			want: false,
 		},
 		{
-			err:  status.Error(codes.Unavailable, "the connection is draining"),
+			err:  status.Error(codes.Canceled, "blah"),
+			want: true,
+		},
+		{
+			err:  status.Error(codes.Aborted, "connection has been idle too long"),
+			want: true,
+		},
+		{
+			err:  status.Error(codes.DeadlineExceeded, "blah"), // possibly bad backend, reconnect to speed recovery.
 			want: true,
 		},
 		{
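Reviewer note: the sketch below is a minimal, self-contained illustration of what the connection.go and retry.go hunks above combine to do, not the library's actual file layout. The `package main` harness is hypothetical, and it uses the plain `status` import rather than the `grpcstatus` alias from connection.go; `errConnectionCanceled` and `shouldReconnect` mirror the added code. It shows why a raw `context.Canceled` does not trigger a reconnect, while the translated status error and `io.EOF` do.

```go
// Minimal standalone sketch (illustrative harness, not library code): it mirrors the
// reconnect predicate added in retry.go and the canceled-context sentinel added in
// connection.go, assuming only the grpc codes/status packages.
package main

import (
	"context"
	"errors"
	"fmt"
	"io"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// errConnectionCanceled mirrors the sentinel in connection.go: a canceled connection
// context is surfaced as a gRPC status error instead of context.Canceled, so retry and
// reconnect decisions never have to reason about context errors directly.
var errConnectionCanceled = status.Error(codes.Canceled, "client connection context was canceled")

// shouldReconnect mirrors the updated predicate: io.EOF or one of a small set of
// status codes observed on the send path triggers a bidi stream reconnect.
func shouldReconnect(err error) bool {
	// io.EOF is the typical not connected signal.
	if errors.Is(err, io.EOF) {
		return true
	}
	// Backend responses that trigger reconnection on send.
	reconnectCodes := []codes.Code{
		codes.Aborted,
		codes.Canceled,
		codes.Unavailable,
		codes.DeadlineExceeded,
	}
	if s, ok := status.FromError(err); ok {
		for _, c := range reconnectCodes {
			if s.Code() == c {
				return true
			}
		}
	}
	return false
}

func main() {
	// A raw context.Canceled is not a gRPC status error, so status.FromError reports
	// ok=false and the predicate declines to reconnect; the translated sentinel, which
	// carries codes.Canceled, is treated as reconnectable.
	fmt.Println(shouldReconnect(context.Canceled))                                  // false
	fmt.Println(shouldReconnect(errConnectionCanceled))                             // true
	fmt.Println(shouldReconnect(status.Error(codes.ResourceExhausted, "pushback"))) // false
	fmt.Println(shouldReconnect(io.EOF))                                            // true
}
```

The design point, per the comments in the connection.go hunk, is that the retry and reconnect predicates only ever inspect gRPC status codes; translating the connection context's cancellation into a Canceled status keeps context errors out of both predicates while still allowing in-flight writes to be handed back to the retrier for possible re-enqueue.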