
perf(storage): remove protobuf's copy of data on unmarshalling #9526

Merged
merged 13 commits on Mar 19, 2024
2 changes: 1 addition & 1 deletion storage/go.mod
@@ -8,6 +8,7 @@ require (
cloud.google.com/go v0.112.0
cloud.google.com/go/compute/metadata v0.2.3
cloud.google.com/go/iam v1.1.6
github.com/golang/protobuf v1.5.3
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/googleapis/gax-go/v2 v2.12.0
@@ -26,7 +27,6 @@ require (
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/martian/v3 v3.3.2 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
209 changes: 201 additions & 8 deletions storage/grpc_client.go
@@ -27,15 +27,18 @@
"cloud.google.com/go/internal/trace"
gapic "cloud.google.com/go/storage/internal/apiv2"
"cloud.google.com/go/storage/internal/apiv2/storagepb"
"github.com/golang/protobuf/proto"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protowire"
fieldmaskpb "google.golang.org/protobuf/types/known/fieldmaskpb"
)

@@ -902,12 +905,53 @@
return r, nil
}

// bytesCodec is a grpc codec which permits sending/receiving messages as either
// protobuf messages, or as raw []bytes.
type bytesCodec struct {
encoding.Codec
}

func (bytesCodec) Marshal(v any) ([]byte, error) {
switch v := v.(type) {
case []byte:
return v, nil
case proto.Message:
return proto.Marshal(v)
default:
return nil, fmt.Errorf("can not marshal type %T", v)
}
}

func (bytesCodec) Unmarshal(data []byte, v any) error {
switch v := v.(type) {
case *[]byte:
// If gRPC could recycle the data []byte after unmarshaling (through
// buffer pools), we would need to make a copy here.
*v = data
return nil
case proto.Message:
return proto.Unmarshal(data, v)
default:
return fmt.Errorf("can not unmarshal type %T", v)
}
}

func (bytesCodec) Name() string {
// If this isn't "", then gRPC sets the content-subtype of the call to this
// value and we get errors.
return ""
}
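
Not part of the diff: a minimal test-style sketch of the two codec paths, assuming the bytesCodec type above and using wrapperspb as a stand-in message type. The test name and imports ("testing", "google.golang.org/protobuf/types/known/wrapperspb") are illustrative, not from the PR.

// Sketch only (not part of the PR): exercising bytesCodec in isolation.
func TestBytesCodecPaths(t *testing.T) {
  var c bytesCodec

  // Sending: a proto.Message is wire-encoded as usual.
  wire, err := c.Marshal(wrapperspb.String("hello"))
  if err != nil {
    t.Fatal(err)
  }

  // Receiving into *[]byte: the buffer is aliased, not decoded or copied.
  var raw []byte
  if err := c.Unmarshal(wire, &raw); err != nil {
    t.Fatal(err)
  }
  if &raw[0] != &wire[0] {
    t.Error("expected raw to alias wire without a copy")
  }

  // Receiving into a proto.Message: a normal protobuf decode.
  var s wrapperspb.StringValue
  if err := c.Unmarshal(wire, &s); err != nil {
    t.Fatal(err)
  }
  if got := s.GetValue(); got != "hello" {
    t.Errorf("got %q, want %q", got, "hello")
  }
}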

func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.NewRangeReader")
defer func() { trace.EndSpan(ctx, err) }()

s := callSettings(c.settings, opts...)

s.gax = append(s.gax, gax.WithGRPCOptions(
grpc.ForceCodec(bytesCodec{}),
))

if s.userProject != "" {
ctx = setUserProjectMetadata(ctx, s.userProject)
}
@@ -923,6 +967,8 @@
req.Generation = params.gen
}

var databuf []byte

// Define a function that initiates a Read with offset and length, assuming
// we have already read seen bytes.
reopen := func(seen int64) (*readStreamResponse, context.CancelFunc, error) {
@@ -957,12 +1003,23 @@
return err
}

// Receive the message as a wire-encoded message so we can use a
// custom decoder to avoid an extra copy at the protobuf layer.
err := stream.RecvMsg(&databuf)
// These types of errors show up on the Recv call, rather than the
// initialization of the stream via ReadObject above.
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
return ErrObjectNotExist
}
if err != nil {
return err
}
// Use a custom decoder that uses protobuf unmarshalling for all
// fields except the checksummed data content.
// Subsequent receives in Read calls will skip all protobuf
// unmarshalling and directly read the content from the gRPC []byte
// response, since only the first call will contain other fields.
msg, err = readFullObjectResponse(databuf)

return err
}, s.retry, s.idempotent)
@@ -1008,6 +1065,7 @@
leftovers: msg.GetChecksummedData().GetContent(),
settings: s,
zeroRange: params.length == 0,
databuf: databuf,
},
}

@@ -1406,6 +1464,7 @@
stream storagepb.Storage_ReadObjectClient
reopen func(seen int64) (*readStreamResponse, context.CancelFunc, error)
leftovers []byte
databuf []byte
cancel context.CancelFunc
settings *settings
}
@@ -1436,7 +1495,7 @@
}

// Attempt to Recv the next message on the stream.
content, err := r.recv()
if err != nil {
return 0, err
}
@@ -1448,7 +1507,6 @@
// present in the response here.
// TODO: Figure out if we need to support decompressive transcoding
// https://cloud.google.com/storage/docs/transcoding.
n = copy(p[n:], content)
leftover := len(content) - n
if leftover > 0 {
@@ -1471,7 +1529,7 @@
return nil
}

// recv attempts to Recv the next chunk of content on the stream. In the event
// that a retryable error is encountered, the stream will be closed, reopened,
// and Recv again. This will attempt to Recv until one of the following is true:
//
@@ -1481,8 +1539,9 @@
//
// The last error received is the one that is returned, which could be from
// an attempt to reopen the stream.
func (r *gRPCReader) recv() ([]byte, error) {
err := r.stream.RecvMsg(&r.databuf)

var shouldRetry = ShouldRetry
if r.settings.retry != nil && r.settings.retry.shouldRetry != nil {
shouldRetry = r.settings.retry.shouldRetry
@@ -1492,10 +1551,144 @@
// reopen the stream, but will backoff if further attempts are necessary.
// Reopening the stream Recvs the first message, so if retrying is
// successful, the next logical chunk will be returned.
msg, err := r.reopenStream()
return msg.GetChecksummedData().GetContent(), err
}

if err != nil {
return nil, err
}

return readObjectResponseContent(r.databuf)
}

// readObjectResponseContent returns the checksummed_data.content field of a
// ReadObjectResponse message, or an error if the message is invalid.
// This can be used on recvs of objects after the first recv, since only the
// first message will contain non-data fields.
func readObjectResponseContent(b []byte) ([]byte, error) {
const (
readObjectResponse_checksummedData = protowire.Number(1)
[Check failure (GitHub Actions / vet) on line 1571 in storage/grpc_client.go: don't use underscores in Go names; const readObjectResponse_checksummedData should be readObjectResponseChecksummedData]
checksummedData_content = protowire.Number(1)
[Check failure (GitHub Actions / vet) on line 1572 in storage/grpc_client.go: don't use underscores in Go names; const checksummedData_content should be checksummedDataContent]
)

checksummedData, err := readProtoBytes(b, readObjectResponse_checksummedData)
if err != nil {
return b, fmt.Errorf("invalid ReadObjectResponse: %v", err)
}
content, err := readProtoBytes(checksummedData, checksummedData_content)
if err != nil {
return content, fmt.Errorf("invalid ReadObjectResponse: %v", err)
}

return content, nil
}

// readFullObjectResponse returns the ReadObjectResponse encoded in the
// wire-encoded message buffer b, or an error if the message is invalid.
// This is used on the first recv of an object as it may contain all fields of
// ReadObjectResponse.
tritone marked this conversation as resolved.
Show resolved Hide resolved
func readFullObjectResponse(b []byte) (*storagepb.ReadObjectResponse, error) {
Review comment: Might want to note that this function is essentially identical to proto.Unmarshal, except it aliases the data in the input []byte. If we ever add a feature to Unmarshal that does that, this function can be dropped.

const (
checksummedDataField = protowire.Number(1)
checksummedDataContentField = protowire.Number(1)
checksummedDataCRC32CField = protowire.Number(2)
objectChecksumsField = protowire.Number(2)
contentRangeField = protowire.Number(3)
metadataField = protowire.Number(4)
)

checksummedData, err := readProtoBytes(b, checksummedDataField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse: %v", err)
}
content, err := readProtoBytes(checksummedData, checksummedDataContentField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse: %v", err)
}

// TODO: add unmarshalling for crc32c here
//crc32c := readProtoBytes(checksummedData, checksummedDataCRC32CField)

// Unmarshal remaining fields.
Review comment: You're unmarshaling all the fields in the message, so this will be a bit more efficient if you loop once over the fields rather than looking up each individually:

off := 0
for off < len(b) {
  num, typ, n := protowire.ConsumeTag(b[off:])
  if n < 0 {
    return nil, protowire.ParseError(n)
  }
  off += n
  switch {
  case num == checksummedDataField && typ == protowire.BytesType:
    // unmarshal the checksummed_data field
  case num == objectChecksumsField && typ == protowire.BytesType:
    // unmarshal the object_checksums field
  case num == contentRangeField && typ == protowire.BytesType:
    // unmarshal the content_range field
  case num == metadataField && typ == protowire.BytesType:
    // unmarshal the metadata field
  default:
    n = protowire.ConsumeFieldValue(num, typ, b[off:])
    if n < 0 {
      return nil, protowire.ParseError(n)
    }
    off += n
  }
}

Reply from the contributor author: Thanks for the suggestion!

var checksums *storagepb.ObjectChecksums
bytes, err := readProtoBytes(b, objectChecksumsField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse: %v", err)
}
// If the field is not empty, unmarshal its contents
if len(bytes) > 0 {
checksums = &storagepb.ObjectChecksums{}
if err := proto.Unmarshal(bytes, checksums); err != nil {
return nil, err
}
}

var contentRange *storagepb.ContentRange
bytes, err = readProtoBytes(b, contentRangeField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse: %v", err)
}
if len(bytes) > 0 {
contentRange = &storagepb.ContentRange{}
if err := proto.Unmarshal(bytes, contentRange); err != nil {
return nil, err
}
}

var metadata *storagepb.Object
bytes, err = readProtoBytes(b, metadataField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse: %v", err)
}
if len(bytes) > 0 {
metadata = &storagepb.Object{}
if err := proto.Unmarshal(bytes, metadata); err != nil {
return nil, err
}
}

msg := &storagepb.ReadObjectResponse{
ChecksummedData: &storagepb.ChecksummedData{
Content: content,
},
ObjectChecksums: checksums,
ContentRange: contentRange,
Metadata: metadata,
}

return msg, nil
}

// readProtoBytes returns the contents of the protobuf field with number num
// and type bytes from a wire-encoded message. If the field cannot be found,
// the returned slice will be nil and no error will be returned.
//
// It does not handle field concatenation, in which the contents of a single field
// are split across multiple protobuf tags. Encoded data containing split fields
// of this form is technically permissible, but uncommon.
func readProtoBytes(b []byte, num protowire.Number) ([]byte, error) {
off := 0
for off < len(b) {
gotNum, gotTyp, n := protowire.ConsumeTag(b[off:])
if n < 0 {
return nil, protowire.ParseError(n)
}
off += n
if gotNum == num && gotTyp == protowire.BytesType {
b, n := protowire.ConsumeBytes(b[off:])
if n < 0 {
return nil, protowire.ParseError(n)
}
return b, nil
}
n = protowire.ConsumeFieldValue(gotNum, gotTyp, b[off:])
if n < 0 {
return nil, protowire.ParseError(n)
}
off += n
}
return nil, nil
}
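
Not part of the diff: a test-style sketch of readObjectResponseContent and readProtoBytes against a hand-encoded buffer built with protowire's Append helpers; the test name and assertions are illustrative only.

// Sketch only (not part of the PR). Builds the nested shape the helpers
// expect: field 1 (checksummed_data) wrapping field 1 (content).
func TestReadObjectResponseContent_Aliases(t *testing.T) {
  content := []byte("hello world")

  var checksummedData []byte
  checksummedData = protowire.AppendTag(checksummedData, 1, protowire.BytesType)
  checksummedData = protowire.AppendBytes(checksummedData, content)

  var msg []byte
  msg = protowire.AppendTag(msg, 1, protowire.BytesType)
  msg = protowire.AppendBytes(msg, checksummedData)

  got, err := readObjectResponseContent(msg)
  if err != nil {
    t.Fatal(err)
  }
  if string(got) != string(content) {
    t.Errorf("got %q, want %q", got, content)
  }
  // The returned slice points into msg; no copy was made.
  if &got[0] != &msg[len(msg)-len(content)] {
    t.Error("expected content to alias the input buffer")
  }
}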

// reopenStream "closes" the existing stream and attempts to reopen a stream and
6 changes: 5 additions & 1 deletion storage/integration_test.go
@@ -1023,7 +1023,8 @@ func TestIntegration_ObjectReadChunksGRPC(t *testing.T) {
multiTransportTest(skipHTTP("gRPC implementation specific test"), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
h := testHelper{t}
// Use a larger blob to test chunking logic. This is a little over 5MB.
content := make([]byte, 5<<20)
rand.New(rand.NewSource(0)).Read(content)

// Upload test data.
obj := client.Bucket(bucket).Object(uidSpaceObjects.New())
@@ -1066,6 +1067,9 @@ func TestIntegration_ObjectReadChunksGRPC(t *testing.T) {
if rem := r.Remain(); rem != 0 {
t.Errorf("got %v bytes remaining, want 0", rem)
}
if !bytes.Equal(buf, content) {
t.Errorf("content mismatch")
}
})
}

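
For orientation only, a hedged usage sketch (not from the PR): reading an object through the gRPC transport is what drives the RecvMsg and readFullObjectResponse path above. The bucket and object names are placeholders.

package main

import (
  "context"
  "fmt"
  "io"

  "cloud.google.com/go/storage"
)

func main() {
  ctx := context.Background()

  // NewGRPCClient selects the gRPC transport, where the custom codec and
  // wire-level decoding in this PR apply.
  client, err := storage.NewGRPCClient(ctx)
  if err != nil {
    panic(err)
  }
  defer client.Close()

  r, err := client.Bucket("my-bucket").Object("my-object").NewReader(ctx)
  if err != nil {
    panic(err)
  }
  defer r.Close()

  data, err := io.ReadAll(r)
  if err != nil {
    panic(err)
  }
  fmt.Printf("read %d bytes\n", len(data))
}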