diff --git a/bigquery/storage/managedwriter/connection.go b/bigquery/storage/managedwriter/connection.go index 0d6f98b6f61..ceb1201e408 100644 --- a/bigquery/storage/managedwriter/connection.go +++ b/bigquery/storage/managedwriter/connection.go @@ -419,6 +419,8 @@ func (co *connection) lockingAppend(pw *pendingWrite) error { err = (*arc).Send(pw.constructFullRequest(true)) } if err != nil { + // Refund the flow controller immediately, as there's nothing to refund on the receiver. + co.fc.release(pw.reqSize) if shouldReconnect(err) { metricCtx := co.ctx // start with the ctx that must be present if pw.writer != nil { diff --git a/bigquery/storage/managedwriter/connection_test.go b/bigquery/storage/managedwriter/connection_test.go index 8010afc70b3..08346a66d4f 100644 --- a/bigquery/storage/managedwriter/connection_test.go +++ b/bigquery/storage/managedwriter/connection_test.go @@ -103,6 +103,58 @@ func TestConnection_OpenWithRetry(t *testing.T) { } } +// Ensure we properly refund the flow control during send failures. +// https://github.com/googleapis/google-cloud-go/issues/9540 +func TestConnection_LockingAppendFlowRelease(t *testing.T) { + ctx := context.Background() + + pool := &connectionPool{ + ctx: ctx, + baseFlowController: newFlowController(10, 0), + open: openTestArc(&testAppendRowsClient{}, + func(req *storagepb.AppendRowsRequest) error { + // Append always reports EOF on send. + return io.EOF + }, nil), + } + router := newSimpleRouter("") + if err := pool.activateRouter(router); err != nil { + t.Errorf("activateRouter: %v", err) + } + + writer := &ManagedStream{id: "foo", ctx: ctx} + if err := pool.addWriter(writer); err != nil { + t.Errorf("addWriter: %v", err) + } + + pw := newPendingWrite(ctx, writer, &storagepb.AppendRowsRequest{WriteStream: "somestream"}, newVersionedTemplate(), "", "") + for i := 0; i < 5; i++ { + conn, err := router.pool.selectConn(pw) + if err != nil { + t.Errorf("selectConn: %v", err) + } + + // Ensure FC is empty before lockingAppend + if got := conn.fc.count(); got != 0 { + t.Errorf("attempt %d expected empty flow count, got %d", i, got) + } + if got := conn.fc.bytes(); got != 0 { + t.Errorf("attempt %d expected empty flow bytes, got %d", i, got) + } + // invoke lockingAppend, which fails + if err := conn.lockingAppend(pw); err != io.EOF { + t.Errorf("lockingAppend attempt %d: expected io.EOF, got %v", i, err) + } + // Ensure we're refunded due to failure + if got := conn.fc.count(); got != 0 { + t.Errorf("attempt %d expected empty flow count, got %d", i, got) + } + if got := conn.fc.bytes(); got != 0 { + t.Errorf("attempt %d expected empty flow bytes, got %d", i, got) + } + } +} + // Ensures we don't lose track of channels/connections during reconnects. // https://github.com/googleapis/google-cloud-go/issues/6766 func TestConnection_LeakingReconnect(t *testing.T) {