Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(storage): remove protobuf's copy of data on unmarshalling #9526

Merged
merged 13 commits into from
Mar 19, 2024
Prev Previous commit
Next Next commit
add test + fix empty struct decoding
  • Loading branch information
BrennaEpp committed Mar 15, 2024
commit d1a9f00fae2ce179c80a0e860860c74c4e46d084
51 changes: 29 additions & 22 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1592,63 +1592,70 @@ func readObjectResponseContent(b []byte) ([]byte, error) {
// This is used on the first recv of an object as it may contain all fields of
// ReadObjectResponse.
tritone marked this conversation as resolved.
Show resolved Hide resolved
func readFullObjectResponse(b []byte) (*storagepb.ReadObjectResponse, error) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to note that this function is essentially identical to proto.Unmarshal, except it aliases the data in the input []byte. If we ever add a feature to Unmarshal that does that, this function can be dropped.

checksummedData, err := readProtoBytes(b, checksummedDataField)
var checksummedData *storagepb.ChecksummedData

// Extract object content.
fieldContent, err := readProtoBytes(b, checksummedDataField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData: %v", err)
}
content, err := readProtoBytes(checksummedData, checksummedDataContentField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData.Content: %v", err)
}
// Only fill the contents if the checksummedData field was found.
if fieldContent != nil {
content, err := readProtoBytes(fieldContent, checksummedDataContentField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData.Content: %v", err)
}
crc32c, err := readProtoFixed32(fieldContent, checksummedDataCRC32CField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData.Crc32C: %v", err)
}

crc32c, err := readProtoFixed32(checksummedData, checksummedDataCRC32CField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData.Crc32C: %v", err)
checksummedData = &storagepb.ChecksummedData{
Content: content,
Crc32C: crc32c,
}
}

// Unmarshal remaining fields.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're unmarshaling all the fields in the message, so this will be a bit more efficient if you loop once over the fields rather than looking up each individually:

off := 0
for off < len(b) {
  num, typ, n := protowire.ConsumeTag(b[off:])
  if n < 0 {
    return nil, protowire.ParseError(n)
  }
  off += n
  switch {
  case num == checksummedDataField && typ == protowire.BytesType:
    // unmarshal the checksummed_data field
  case num == objectChecksumsField && typ == protowire.BytesType:
    // unmarshal the object_checksums field
  case num == contentRangeField && typ == protowire.BytesType:
    // unmarshal the content_range field
  case num == metadataField && typ == protowire.BytesType:
    // unmarshal the metadata field
  default:
    n = protowire.ConsumeFieldValue(num, typ, b[off:])
    if n < 0 {
      return nil, protowire.ParseError(n)
    }
    off += n
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion!

var checksums *storagepb.ObjectChecksums
bytes, err := readProtoBytes(b, objectChecksumsField)
fieldContent, err = readProtoBytes(b, objectChecksumsField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse.ObjectChecksums: %v", err)
}
// If the field is not empty, unmarshal its contents
if len(bytes) > 0 {
// Only unmarshal the contents if the field was found.
if fieldContent != nil {
checksums = &storagepb.ObjectChecksums{}
if err := proto.Unmarshal(bytes, checksums); err != nil {
if err := proto.Unmarshal(fieldContent, checksums); err != nil {
return nil, err
}
}

var contentRange *storagepb.ContentRange
bytes, err = readProtoBytes(b, contentRangeField)
fieldContent, err = readProtoBytes(b, contentRangeField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse.ContentRange: %v", "err")
}
if len(bytes) > 0 {
if fieldContent != nil {
contentRange = &storagepb.ContentRange{}
if err := proto.Unmarshal(bytes, contentRange); err != nil {
if err := proto.Unmarshal(fieldContent, contentRange); err != nil {
return nil, err
}
}

var metadata *storagepb.Object
bytes, err = readProtoBytes(b, metadataField)
fieldContent, err = readProtoBytes(b, metadataField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse.Metadata: %v", err)
}
if len(bytes) > 0 {
if fieldContent != nil {
metadata = &storagepb.Object{}
if err := proto.Unmarshal(bytes, metadata); err != nil {
if err := proto.Unmarshal(fieldContent, metadata); err != nil {
return nil, err
}
}

msg := &storagepb.ReadObjectResponse{
ChecksummedData: &storagepb.ChecksummedData{
Content: content,
Crc32C: crc32c,
},
ChecksummedData: checksummedData,
ObjectChecksums: checksums,
ContentRange: contentRange,
Metadata: metadata,
Expand Down
147 changes: 147 additions & 0 deletions storage/grpc_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
"crypto/md5"
"hash/crc32"
"math/rand"
"testing"
"time"

"cloud.google.com/go/storage/internal/apiv2/storagepb"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/testing/protocmp"
)

func TestBytesCodec(t *testing.T) {
// Generate some random content.
content := make([]byte, 1<<10+1) // 1 kib + 1 byte
rand.New(rand.NewSource(0)).Read(content)

// Calculate full content hashes.
crc32c := crc32.Checksum(content, crc32.MakeTable(crc32.Castagnoli))
hasher := md5.New()
if _, err := hasher.Write(content); err != nil {
t.Errorf("hasher.Write: %v", err)
}
md5 := hasher.Sum(nil)

trueBool := true
metadata := &storagepb.Object{
Name: "object-name",
Bucket: "bucket-name",
Etag: "etag",
Generation: 100,
Metageneration: 907,
StorageClass: "Standard",
Size: 1025,
ContentEncoding: "none",
ContentDisposition: "inline",
CacheControl: "public, max-age=3600",
Acl: []*storagepb.ObjectAccessControl{{
Role: "role",
Id: "id",
Entity: "allUsers",
Etag: "tag",
Email: "[email protected]",
}},
ContentLanguage: "mi, en",
DeleteTime: toProtoTimestamp(time.Now()),
ContentType: "application/octet-stream",
CreateTime: toProtoTimestamp(time.Now()),
ComponentCount: 1,
Checksums: &storagepb.ObjectChecksums{
Crc32C: &crc32c,
Md5Hash: md5,
},
TemporaryHold: true,
Metadata: map[string]string{
"a-key": "a-value",
},
EventBasedHold: &trueBool,
Owner: &storagepb.Owner{
Entity: "user-1",
EntityId: "1",
},
CustomerEncryption: &storagepb.CustomerEncryption{
EncryptionAlgorithm: "alg",
KeySha256Bytes: []byte("bytes"),
},
HardDeleteTime: toProtoTimestamp(time.Now()),
}

for _, test := range []struct {
desc string
resp *storagepb.ReadObjectResponse
}{
{
desc: "filled object response",
resp: &storagepb.ReadObjectResponse{
ChecksummedData: &storagepb.ChecksummedData{
Content: content,
Crc32C: &crc32c,
},
ObjectChecksums: &storagepb.ObjectChecksums{
Crc32C: &crc32c,
Md5Hash: md5,
},
ContentRange: &storagepb.ContentRange{
Start: 0,
End: 1025,
CompleteLength: 1025,
},
Metadata: metadata,
},
},
{
desc: "empty object response",
resp: &storagepb.ReadObjectResponse{},
},
{
desc: "partially empty",
resp: &storagepb.ReadObjectResponse{
ChecksummedData: &storagepb.ChecksummedData{},
ObjectChecksums: &storagepb.ObjectChecksums{Md5Hash: md5},
Metadata: &storagepb.Object{},
},
},
} {
t.Run(test.desc, func(t *testing.T) {
// Encode the response.
encodedResp, err := proto.Marshal(test.resp)
if err != nil {
t.Fatalf("proto.Marshal: %v", err)
}

// Unmarshal and decode response using custom decoding.
encodedBytes := &[]byte{}
if err := bytesCodec.Unmarshal(bytesCodec{}, encodedResp, encodedBytes); err != nil {
tritone marked this conversation as resolved.
Show resolved Hide resolved
t.Fatalf("unmarshal: %v", err)
}

got, err := readFullObjectResponse(*encodedBytes)
if err != nil {
t.Fatalf("readFullObjectResponse: %v", err)
}

// Compare the result with the original ReadObjectResponse.
if diff := cmp.Diff(got, test.resp, protocmp.Transform()); diff != "" {
t.Errorf("cmp.Diff got(-),want(+):\n%s", diff)
}
})
}
}