Skip to content

Commit d9018cf

Browse files
authored
fix(storage): validate Bidi option for MRD (#12033)
MultiRangeDownloader should require the experimental option WithGRPCBidiReads to work. Also makes integration tests for MultiRangeDownloader only run when this option is set.
1 parent f7ccc5d commit d9018cf

File tree

5 files changed

+31
-13
lines changed

5 files changed

+31
-13
lines changed

storage/client_test.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -756,12 +756,7 @@ func TestObjectACLCRUDEmulated(t *testing.T) {
756756

757757
func TestOpenReaderEmulated(t *testing.T) {
758758
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
759-
if c, ok := client.(*grpcStorageClient); ok {
760-
c.config.grpcBidiReads = true
761-
defer func() {
762-
c.config.grpcBidiReads = false
763-
}()
764-
}
759+
setBidiReads(t, client)
765760

766761
// Populate test data.
767762
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{
@@ -1475,6 +1470,7 @@ type multiRangeDownloaderOutput struct {
14751470

14761471
func TestMultiRangeDownloaderEmulated(t *testing.T) {
14771472
transportClientTest(skipHTTP("mrd is implemented for grpc client"), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
1473+
setBidiReads(t, client)
14781474
content := make([]byte, 5<<20)
14791475
rand.New(rand.NewSource(0)).Read(content)
14801476
_, err := client.CreateBucket(context.Background(), project, bucket, &BucketAttrs{
@@ -1553,6 +1549,7 @@ func TestMultiRangeDownloaderEmulated(t *testing.T) {
15531549

15541550
func TestMRDAddAfterCloseEmulated(t *testing.T) {
15551551
transportClientTest(skipHTTP("mrd is implemented for grpc client"), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
1552+
setBidiReads(t, client)
15561553
content := make([]byte, 5000)
15571554
rand.New(rand.NewSource(0)).Read(content)
15581555
// Populate test data.
@@ -1601,6 +1598,7 @@ func TestMRDAddAfterCloseEmulated(t *testing.T) {
16011598

16021599
func TestMRDAddSanityCheck(t *testing.T) {
16031600
transportClientTest(skipHTTP("mrd is implemented for grpc client"), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
1601+
setBidiReads(t, client)
16041602
content := make([]byte, 5000)
16051603
rand.New(rand.NewSource(0)).Read(content)
16061604
// Populate test data.
@@ -1662,6 +1660,7 @@ func TestMRDAddSanityCheck(t *testing.T) {
16621660

16631661
func TestMultiRangeDownloaderSpecifyGenerationEmulated(t *testing.T) {
16641662
transportClientTest(skipHTTP("mrd is implemented for grpc client"), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
1663+
setBidiReads(t, client)
16651664
content := make([]byte, 5000)
16661665
rand.New(rand.NewSource(0)).Read(content)
16671666
// Populate test data.
@@ -2702,3 +2701,14 @@ func checkEmulatorEnvironment(t *testing.T) {
27022701
func isEmulatorEnvironmentSet() bool {
27032702
return os.Getenv("STORAGE_EMULATOR_HOST_GRPC") != "" && os.Getenv("STORAGE_EMULATOR_HOST") != ""
27042703
}
2704+
2705+
// setBidiReads sets the WithGRPCBidiReads option on the client for the duration
2706+
// of the test if applicable.
2707+
func setBidiReads(t *testing.T, client storageClient) {
2708+
if c, ok := client.(*grpcStorageClient); ok {
2709+
c.config.grpcBidiReads = true
2710+
t.Cleanup(func() {
2711+
c.config.grpcBidiReads = false
2712+
})
2713+
}
2714+
}

storage/grpc_client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,6 +1055,10 @@ func contextMetadataFromBidiReadObject(req *storagepb.BidiReadObjectRequest) []s
10551055
}
10561056

10571057
func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params *newMultiRangeDownloaderParams, opts ...storageOption) (mr *MultiRangeDownloader, err error) {
1058+
if !c.config.grpcBidiReads {
1059+
return nil, errors.New("storage: MultiRangeDownloader requires the experimental.WithGRPCBidiReads option")
1060+
}
1061+
10581062
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.NewMultiRangeDownloader")
10591063
defer func() { trace.EndSpan(ctx, err) }()
10601064
s := callSettings(c.settings, opts...)

storage/integration_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ var readCases = []readCase{
339339
}
340340

341341
func TestIntegration_MultiRangeDownloader(t *testing.T) {
342-
multiTransportTest(skipHTTP("gRPC implementation specific test"), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
342+
multiTransportTest(skipAllButBidi(context.Background(), "Bidi Read API test"), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
343343
content := make([]byte, 5<<20)
344344
rand.New(rand.NewSource(0)).Read(content)
345345
objName := "MultiRangeDownloader"
@@ -406,7 +406,7 @@ func TestIntegration_MultiRangeDownloader(t *testing.T) {
406406
// TestIntegration_MRDCallbackReturnsDataLength tests if the callback returns the correct data
407407
// read length or not.
408408
func TestIntegration_MRDCallbackReturnsDataLength(t *testing.T) {
409-
multiTransportTest(skipHTTP("gRPC implementation specific test"), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
409+
multiTransportTest(skipAllButBidi(context.Background(), "Bidi Read API test"), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
410410
content := make([]byte, 1000)
411411
rand.New(rand.NewSource(0)).Read(content)
412412
objName := "MRDCallback"
@@ -455,7 +455,7 @@ func TestIntegration_MRDCallbackReturnsDataLength(t *testing.T) {
455455
// TestIntegration_ReadSameFileConcurrentlyUsingMultiRangeDownloader tests for potential deadlocks
456456
// or race conditions when multiple goroutines call Add() concurrently on the same MRD multiple times.
457457
func TestIntegration_ReadSameFileConcurrentlyUsingMultiRangeDownloader(t *testing.T) {
458-
multiTransportTest(skipHTTP("gRPC implementation specific test"), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
458+
multiTransportTest(skipAllButBidi(context.Background(), "Bidi Read API test"), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
459459
content := make([]byte, 5<<20)
460460
rand.New(rand.NewSource(0)).Read(content)
461461
objName := "MultiRangeDownloader"
@@ -539,7 +539,7 @@ func TestIntegration_ReadSameFileConcurrentlyUsingMultiRangeDownloader(t *testin
539539
}
540540

541541
func TestIntegration_MRDWithNonRetriableError(t *testing.T) {
542-
multiTransportTest(skipHTTP("gRPC implementation specific test"), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
542+
multiTransportTest(skipAllButBidi(context.Background(), "Bidi Read API test"), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
543543
content := make([]byte, 5<<20)
544544
rand.New(rand.NewSource(0)).Read(content)
545545
objName := "mrdnonretry"

storage/reader.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,8 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64)
161161
// Must be called on a gRPC client created using [NewGRPCClient].
162162
//
163163
// This uses the gRPC-specific bi-directional read API, which is in private
164-
// preview; please contact your account manager if interested.
164+
// preview; please contact your account manager if interested. The option
165+
// [experimental.WithGRPCBidiReads] must be selected in order to use this API.
165166
func (o *ObjectHandle) NewMultiRangeDownloader(ctx context.Context) (mrd *MultiRangeDownloader, err error) {
166167
// This span covers the life of the reader. It is closed via the context
167168
// in Reader.Close.

storage/retry_conformance_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"time"
3131

3232
"cloud.google.com/go/internal/uid"
33+
"cloud.google.com/go/storage/experimental"
3334
storage_v1_tests "cloud.google.com/go/storage/internal/test/conformance"
3435
"github.com/google/go-cmp/cmp"
3536
"github.com/googleapis/gax-go/v2"
@@ -653,7 +654,9 @@ var methods = map[string][]retryFunc{
653654
}
654655

655656
// Don't reuse obj, in case preconditions were set on the write request.
656-
r, err := b.Object(obj.ObjectName()).NewReader(ctx)
657+
// TODO: switch to using NewReader instead of NewRangeReader once emulator
658+
// issue with CRC32C for appendable objects is fixed.
659+
r, err := b.Object(obj.ObjectName()).NewRangeReader(ctx, 0, 3*MiB)
657660
defer r.Close()
658661
if err != nil {
659662
return fmt.Errorf("obj.NewReader: %v", err)
@@ -978,7 +981,7 @@ func (et *emulatorTest) create(instructions map[string][]string, transport strin
978981
et.Fatalf("HTTP transportClient: %v", err)
979982
}
980983
if transport == "grpc" {
981-
transportClient, err = NewGRPCClient(ctx)
984+
transportClient, err = NewGRPCClient(ctx, experimental.WithGRPCBidiReads())
982985
if err != nil {
983986
et.Fatalf("GRPC transportClient: %v", err)
984987
}

0 commit comments

Comments
 (0)