Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(storage): remove protobuf's copy of data on unmarshalling #9526

Merged
merged 13 commits into from
Mar 19, 2024
Next Next commit
draft(storage): read optimization proof of concept
removes unmarshalling copy of all except first chunk off wire
  • Loading branch information
BrennaEpp committed Feb 21, 2024
commit 58f5314530f7ce25dcba8a36f5c28e6af7e748c2
115 changes: 108 additions & 7 deletions storage/grpc_client.go
Expand Up @@ -34,9 +34,13 @@ import (
"google.golang.org/api/option/internaloption"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protowire"
fieldmaskpb "google.golang.org/protobuf/types/known/fieldmaskpb"

"github.com/golang/protobuf/proto"
)

const (
Expand Down Expand Up @@ -902,12 +906,53 @@ func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjec
return r, nil
}

// bytesCodec is a grpc codec which permits sending/receiving messages as either
// protobuf messages, or as raw []bytes.
//
// It embeds encoding.Codec, while Marshal, Unmarshal and Name are all declared
// directly on bytesCodec. NewRangeReader constructs it as bytesCodec{}, i.e.
// without setting the embedded Codec.
type bytesCodec struct {
encoding.Codec
}

func (bytesCodec) Marshal(v any) ([]byte, error) {
switch v := v.(type) {
case []byte:
tritone marked this conversation as resolved.
Show resolved Hide resolved
return v, nil
case proto.Message:
return proto.Marshal(v)
default:
return nil, fmt.Errorf("can not marshal type %T", v)
}
}

// Unmarshal decodes data into v.
//
// When v is a *[]byte, *v is set to data itself. When v is a proto.Message,
// data is decoded with proto.Unmarshal. Any other type results in an error
// naming the offending type.
func (bytesCodec) Unmarshal(data []byte, v any) error {
switch v := v.(type) {
case *[]byte:
// No copy is made here: *v aliases data directly.
// NOTE(review): this is only safe as long as gRPC does not recycle the
// data []byte after Unmarshal returns (e.g. through a buffer pool); if
// it can, a copy is required here. Confirm against the gRPC version and
// dial options in use.
*v = data
return nil
case proto.Message:
return proto.Unmarshal(data, v)
default:
return fmt.Errorf("can not unmarshal type %T", v)
}
}

// Name returns the codec name, which is intentionally the empty string.
func (bytesCodec) Name() string {
	// A non-empty name would make gRPC set the content-subtype of the call
	// to that value, which results in errors.
	const contentSubtype = ""
	return contentSubtype
}

func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.NewRangeReader")
defer func() { trace.EndSpan(ctx, err) }()

s := callSettings(c.settings, opts...)

s.gax = append(s.gax[:len(s.gax)], gax.WithGRPCOptions(
grpc.ForceCodec(bytesCodec{}),
))

if s.userProject != "" {
ctx = setUserProjectMetadata(ctx, s.userProject)
}
Expand Down Expand Up @@ -957,6 +1002,11 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
return err
}

// This receive still goes through protobuf unmarshaling.
// Subsequent receives in Read calls will skip protobuf unmarshaling
// and directly read the content from the gRPC []byte response.
//
// We could also use a custom decoder here.
msg, err = stream.Recv()
// These types of errors show up on the Recv call, rather than the
// initialization of the stream via ReadObject above.
Expand Down Expand Up @@ -1406,6 +1456,7 @@ type gRPCReader struct {
stream storagepb.Storage_ReadObjectClient
reopen func(seen int64) (*readStreamResponse, context.CancelFunc, error)
leftovers []byte
databuf []byte
cancel context.CancelFunc
settings *settings
}
Expand Down Expand Up @@ -1436,7 +1487,7 @@ func (r *gRPCReader) Read(p []byte) (int, error) {
}

// Attempt to Recv the next message on the stream.
msg, err := r.recv()
content, err := r.recv()
if err != nil {
return 0, err
}
Expand All @@ -1448,7 +1499,6 @@ func (r *gRPCReader) Read(p []byte) (int, error) {
// present in the response here.
// TODO: Figure out if we need to support decompressive transcoding
// https://cloud.google.com/storage/docs/transcoding.
content := msg.GetChecksummedData().GetContent()
n = copy(p[n:], content)
leftover := len(content) - n
if leftover > 0 {
Expand All @@ -1471,7 +1521,7 @@ func (r *gRPCReader) Close() error {
return nil
}

// recv attempts to Recv the next message on the stream. In the event
// recv attempts to Recv the next chunk of content on the stream. In the event
tritone marked this conversation as resolved.
Show resolved Hide resolved
// that a retryable error is encountered, the stream will be closed, reopened,
// and Recv again. This will attempt to Recv until one of the following is true:
//
Expand All @@ -1481,8 +1531,9 @@ func (r *gRPCReader) Close() error {
//
// The last error received is the one that is returned, which could be from
// an attempt to reopen the stream.
func (r *gRPCReader) recv() (*storagepb.ReadObjectResponse, error) {
msg, err := r.stream.Recv()
func (r *gRPCReader) recv() ([]byte, error) {
err := r.stream.RecvMsg(&r.databuf)

var shouldRetry = ShouldRetry
if r.settings.retry != nil && r.settings.retry.shouldRetry != nil {
shouldRetry = r.settings.retry.shouldRetry
Expand All @@ -1492,10 +1543,60 @@ func (r *gRPCReader) recv() (*storagepb.ReadObjectResponse, error) {
// reopen the stream, but will backoff if further attempts are necessary.
// Reopening the stream Recvs the first message, so if retrying is
// successful, the next logical chunk will be returned.
msg, err = r.reopenStream()
msg, err := r.reopenStream()
tritone marked this conversation as resolved.
Show resolved Hide resolved
return msg.GetChecksummedData().GetContent(), err
}

return msg, err
if err != nil {
return nil, err
}

return readObjectResponseContent(r.databuf)
}

// readObjectResponseContent returns the checksummed_data.content field of a
// ReadObjectResponse message.
func readObjectResponseContent(b []byte) ([]byte, error) {
const (
readObjectResponse_checksummedData = protowire.Number(1)
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved
checksummedData_content = protowire.Number(1)
)
checksummedData := readProtoBytes(b, readObjectResponse_checksummedData)
content := readProtoBytes(checksummedData, checksummedData_content)
if content == nil {
return nil, errors.New("invalid ReadObjectResponse")
}
return content, nil
}

// readProtoBytes scans the wire-encoded message b for the first field whose
// number is num and whose wire type is bytes, and returns that field's
// contents. It returns nil if no such field is found or if b cannot be parsed.
//
// Field concatenation, where the contents of a single field are split across
// multiple protobuf tags, is not handled. Encoded data containing split
// fields of this form is technically permissible, but uncommon.
func readProtoBytes(b []byte, num protowire.Number) []byte {
	rest := b
	for len(rest) > 0 {
		fieldNum, wireType, tagLen := protowire.ConsumeTag(rest)
		if tagLen < 0 {
			return nil
		}
		rest = rest[tagLen:]

		if fieldNum != num || wireType != protowire.BytesType {
			// Not the field we are after: step over its value.
			valLen := protowire.ConsumeFieldValue(fieldNum, wireType, rest)
			if valLen < 0 {
				return nil
			}
			rest = rest[valLen:]
			continue
		}

		payload, payloadLen := protowire.ConsumeBytes(rest)
		if payloadLen < 0 {
			return nil
		}
		return payload
	}
	return nil
}

// reopenStream "closes" the existing stream and attempts to reopen a stream and
Expand Down
6 changes: 5 additions & 1 deletion storage/integration_test.go
Expand Up @@ -1023,7 +1023,8 @@ func TestIntegration_ObjectReadChunksGRPC(t *testing.T) {
multiTransportTest(skipHTTP("gRPC implementation specific test"), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
h := testHelper{t}
// Use a larger blob to test chunking logic. This is a little over 5MB.
content := bytes.Repeat([]byte("a"), 5<<20)
content := make([]byte, 5<<20)
rand.New(rand.NewSource(0)).Read(content)
tritone marked this conversation as resolved.
Show resolved Hide resolved

// Upload test data.
obj := client.Bucket(bucket).Object(uidSpaceObjects.New())
Expand Down Expand Up @@ -1066,6 +1067,9 @@ func TestIntegration_ObjectReadChunksGRPC(t *testing.T) {
if rem := r.Remain(); rem != 0 {
t.Errorf("got %v bytes remaining, want 0", rem)
}
if !bytes.Equal(buf, content) {
t.Errorf("content mismatch")
}
})
}

Expand Down