fix(bigquery/storage/managedwriter): retry improvements #9642

Merged: 2 commits, Mar 25, 2024
Changes from 1 commit
fix(bigquery/storage/managedwriter): retry improvements
This PR makes two changes to retry behaviors in managedwriter.

In the first, this PR expands the set of conditions that trigger
reconnect when sending the initial request to the backend.

In the second, this PR adds some additional handling for context
cancellations when reading responses back from the service.  In
cases like reconnection, we establish a new Connection, each of
which has its own associated context.  When draining remaining
writes from a connection that's being shut down, we now pass the
write into a retryer with a status-based error rather than raw
context.Canceled, so we can recover more cleanly if the user is
leveraging write retries.

Related internal issue:
b/326242484
shollyman committed Mar 25, 2024
commit ac85bccc066443457ed5669de0dac96ee30599f3
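For readers less familiar with the retry path, here is a minimal standalone sketch of the core idea behind the second change: a raw `context.Canceled` carries no gRPC status code, so a code-based retry predicate never re-enqueues it, whereas translating it into a status error (like the `errConnectionCanceled` sentinel added in this commit) lets retries proceed. The `isRetryable` predicate below is purely illustrative and is not the managedwriter retry predicate.

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// Illustrative sentinel mirroring the one added in this commit: a status-based
// stand-in for "the connection's context was canceled".
var errConnectionCanceled = status.Error(codes.Canceled, "client connection context was canceled")

// isRetryable is a hypothetical, simplified predicate that only consults gRPC
// status codes; it is not the managedwriter retry predicate.
func isRetryable(err error) bool {
	if s, ok := status.FromError(err); ok {
		switch s.Code() {
		case codes.Unavailable, codes.Canceled:
			return true
		}
	}
	return false
}

func main() {
	// A raw context cancellation carries no status code, so it is never retried.
	fmt.Println(isRetryable(context.Canceled)) // false

	// The translated form carries codes.Canceled, so the predicate can opt in.
	fmt.Println(isRetryable(errConnectionCanceled)) // true

	// The translation is deliberate: the status error does not wrap the context error.
	fmt.Println(errors.Is(errConnectionCanceled, context.Canceled)) // false
}
```

The tradeoff noted in the code comments below applies: a write drained from a dying connection may round-trip through retries even when the eventual outcome is still an error.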
27 changes: 21 additions & 6 deletions bigquery/storage/managedwriter/connection.go
@@ -498,24 +498,39 @@ func (co *connection) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient, f
// enables testing
type streamClientFunc func(context.Context, ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)

var errConnectionCanceled = grpcstatus.Error(codes.Canceled, "client connection context was canceled")

// connRecvProcessor is used to propagate append responses back up with the originating write requests. It
// It runs as a goroutine. A connection object allows for reconnection, and each reconnection establishes a new
// processing gorouting and backing channel.
// context, processing goroutine and backing channel.
func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) {
for {
select {
case <-ctx.Done():
// Context is done, so we're not going to get further updates. Mark all work left in the channel
// with the context error. We don't attempt to re-enqueue in this case.
// Channel context is done, which means we're not getting further updates on in flight appends and should
// process everything left in the existing channel/connection.
doneErr := ctx.Err()
if doneErr == context.Canceled {
// This is a special case. Connection recovery ends up cancelling a context as part of a reconnection, and with
// request retrying enabled we can possibly re-enqueue writes. To allow graceful retry for this behavior, we
// translate this to an rpc status error to avoid doing things like introducing context errors as part of the retry predicate.
//
// The tradeoff here is that write retries may roundtrip multiple times for something like a pool shutdown, even though the final
// outcome would result in an error.
doneErr = errConnectionCanceled
}
for {
pw, ok := <-ch
if !ok {
return
}
// It's unlikely this connection will recover here, but for correctness keep the flow controller
// state correct by releasing.
// This connection will not recover, but still attempt to keep flow controller state consistent.
co.release(pw)
pw.markDone(nil, ctx.Err())

// TODO: Determine if/how we should report this case, as we have no viable context for propagating.

// Because we can't tell locally if this write is done, we pass it back to the retrier for possible re-enqueue.
pw.writer.processRetry(pw, co, nil, doneErr)
}
case nextWrite, ok := <-ch:
if !ok {
22 changes: 16 additions & 6 deletions bigquery/storage/managedwriter/retry.go
@@ -130,13 +130,23 @@ func (sr *statelessRetryer) Retry(err error, attemptCount int) (time.Duration, b
// our bidi stream to close/reopen based on the responses error. Errors here signal that no
// further appends will succeed.
func shouldReconnect(err error) bool {
var knownErrors = []error{
io.EOF,
status.Error(codes.Unavailable, "the connection is draining"), // errStreamDrain in gRPC transport

// io.EOF is the typical not connected signal.
if errors.Is(err, io.EOF) {
return true
}
// Backend responses that trigger reconnection on send.
reconnectCodes := []codes.Code{
codes.Aborted,
codes.Canceled,
codes.Unavailable,
codes.DeadlineExceeded,
}
for _, ke := range knownErrors {
if errors.Is(err, ke) {
return true
if s, ok := status.FromError(err); ok {
for _, c := range reconnectCodes {
if s.Code() == c {
return true
}
}
}
return false
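This PR only touches the classification logic; the call site that acts on shouldReconnect is unchanged and not shown in the diff. The following is a hypothetical sketch, under the assumption that a sender checks the predicate after a failed send and re-dials the AppendRows stream before retrying; `sendWithReconnect` and `redial` are illustrative names, not managedwriter APIs.

```go
package main

import (
	"errors"
	"fmt"
	"io"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// shouldReconnect mirrors the post-PR logic in retry.go: io.EOF or one of a
// small set of status codes means the bidi stream should be re-dialed.
func shouldReconnect(err error) bool {
	if errors.Is(err, io.EOF) {
		return true
	}
	reconnectCodes := []codes.Code{codes.Aborted, codes.Canceled, codes.Unavailable, codes.DeadlineExceeded}
	if s, ok := status.FromError(err); ok {
		for _, c := range reconnectCodes {
			if s.Code() == c {
				return true
			}
		}
	}
	return false
}

// sendWithReconnect is a hypothetical send loop, not a managedwriter API: on a
// reconnectable error it re-dials once and retries the send; anything else is
// returned to the caller (and left to the ordinary retry policy).
func sendWithReconnect(send func() error, redial func() error) error {
	err := send()
	if err == nil || !shouldReconnect(err) {
		return err
	}
	if rerr := redial(); rerr != nil {
		return rerr
	}
	return send()
}

func main() {
	attempts := 0
	send := func() error {
		attempts++
		if attempts == 1 {
			// First attempt hits the gRPC transport's drain notice.
			return status.Error(codes.Unavailable, "the connection is draining")
		}
		return nil
	}
	redial := func() error { return nil }

	fmt.Println(sendWithReconnect(send, redial))                                 // <nil>, after one reconnect
	fmt.Println(shouldReconnect(status.Error(codes.ResourceExhausted, "quota"))) // false
}
```

ResourceExhausted intentionally stays out of the reconnect set; as the test comment below notes, it may just be pushback, which is better handled by backoff in the retry policy than by churning connections.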
21 changes: 19 additions & 2 deletions bigquery/storage/managedwriter/retry_test.go
@@ -15,6 +15,7 @@
package managedwriter

import (
"context"
"fmt"
"io"
"testing"
@@ -60,6 +61,10 @@ func TestManagedStream_AppendErrorRetries(t *testing.T) {
err: status.Error(codes.ResourceExhausted, "Exceeds 'AppendRows throughput' quota for some reason"),
want: true,
},
{
err: context.Canceled,
want: false,
},
}

retry := newStatelessRetryer()
@@ -86,11 +91,23 @@ func TestManagedStream_ShouldReconnect(t *testing.T) {
want: true,
},
{
err: status.Error(codes.Unavailable, "nope"),
err: status.Error(codes.Unavailable, "the connection is draining"),
want: true,
},
{
err: status.Error(codes.ResourceExhausted, "oof"), // may just be pushback
want: false,
},
{
err: status.Error(codes.Unavailable, "the connection is draining"),
err: status.Error(codes.Canceled, "blah"),
want: true,
},
{
err: status.Error(codes.Aborted, "connection has been idle too long"),
want: true,
},
{
err: status.Error(codes.DeadlineExceeded, "blah"), // possibly bad backend, reconnect to speed recovery.
want: true,
},
{