Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(storage): remove protobuf's copy of data on unmarshalling #9526

Merged
merged 13 commits into from
Mar 19, 2024
Prev Previous commit
Next Next commit
loop over bytes instead of reusing function
  • Loading branch information
BrennaEpp committed Mar 19, 2024
commit 07e6b4f78cd0d4e57640eae278608231dc56b3fe
198 changes: 108 additions & 90 deletions storage/grpc_client.go
Expand Up @@ -1587,78 +1587,123 @@ func readObjectResponseContent(b []byte) ([]byte, error) {
return content, nil
}

// readFullObjectResponse returns the ReadObjectResponse encoded in the
// readFullObjectResponse returns the ReadObjectResponse that is encoded in the
// wire-encoded message buffer b, or an error if the message is invalid.
// This is used on the first recv of an object as it may contain all fields of
// ReadObjectResponse.
// This must be used on the first recv of an object as it may contain all fields
// of ReadObjectResponse, and we use or pass on those fields to the user.
// This function is essentially identical to proto.Unmarshal, except it aliases
// the data in the input []byte. If the proto library adds a feature to
// Unmarshal that does that, this function can be dropped.
func readFullObjectResponse(b []byte) (*storagepb.ReadObjectResponse, error) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to note that this function is essentially identical to proto.Unmarshal, except it aliases the data in the input []byte. If we ever add a feature to Unmarshal that does that, this function can be dropped.

var checksummedData *storagepb.ChecksummedData
msg := &storagepb.ReadObjectResponse{}

// Extract object content.
fieldContent, err := readProtoBytes(b, checksummedDataField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData: %v", err)
}
// Only fill the contents if the checksummedData field was found.
if fieldContent != nil {
content, err := readProtoBytes(fieldContent, checksummedDataContentField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData.Content: %v", err)
}
crc32c, err := readProtoFixed32(fieldContent, checksummedDataCRC32CField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData.Crc32C: %v", err)
// Loop over the entire message, extracting fields as we go. This does not
// handle field concatenation, in which the contents of a single field
// are split across multiple protobuf tags.
off := 0
for off < len(b) {
// Consume the next tag. This will tell us which field is next in the
// buffer, its type, and how much space it takes up.
fieldNum, fieldType, fieldLength := protowire.ConsumeTag(b[off:])
if fieldLength < 0 {
return nil, protowire.ParseError(fieldLength)
}
off += fieldLength

checksummedData = &storagepb.ChecksummedData{
Content: content,
Crc32C: crc32c,
}
}
// Unmarshal the field according to its type. Only fields that are not
// nil will be present.
switch {
case fieldNum == checksummedDataField && fieldType == protowire.BytesType:
// The ChecksummedData field was found. Initialize the struct.
msg.ChecksummedData = &storagepb.ChecksummedData{}

// Unmarshal remaining fields.
var checksums *storagepb.ObjectChecksums
fieldContent, err = readProtoBytes(b, objectChecksumsField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse.ObjectChecksums: %v", err)
}
// Only unmarshal the contents if the field was found.
if fieldContent != nil {
checksums = &storagepb.ObjectChecksums{}
if err := proto.Unmarshal(fieldContent, checksums); err != nil {
return nil, err
}
}
// Get the bytes corresponding to the checksummed data.
fieldContent, n := protowire.ConsumeBytes(b[off:])
if n < 0 {
return nil, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData: %v", protowire.ParseError(n))
}
off += n

// Get the nested fields. We need to do this manually as it contains
// the object content bytes.
contentOff := 0
for contentOff < len(fieldContent) {
gotNum, gotTyp, n := protowire.ConsumeTag(fieldContent[contentOff:])
if n < 0 {
return nil, protowire.ParseError(n)
}
contentOff += n

switch {
case gotNum == checksummedDataContentField && gotTyp == protowire.BytesType:
// Get the content bytes.
bytes, n := protowire.ConsumeBytes(fieldContent[contentOff:])
if n < 0 {
return nil, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData.Content: %v", protowire.ParseError(n))
}
msg.ChecksummedData.Content = bytes
contentOff += n
case gotNum == checksummedDataCRC32CField && gotTyp == protowire.Fixed32Type:
v, n := protowire.ConsumeFixed32(fieldContent[contentOff:])
if n < 0 {
return nil, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData.Crc32C: %v", protowire.ParseError(n))
}
msg.ChecksummedData.Crc32C = &v
contentOff += n
default:
n = protowire.ConsumeFieldValue(gotNum, gotTyp, fieldContent[contentOff:])
if n < 0 {
return nil, protowire.ParseError(n)
}
contentOff += n
}
}
case fieldNum == objectChecksumsField && fieldType == protowire.BytesType:
// The field was found. Initialize the struct.
msg.ObjectChecksums = &storagepb.ObjectChecksums{}

var contentRange *storagepb.ContentRange
fieldContent, err = readProtoBytes(b, contentRangeField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse.ContentRange: %v", "err")
}
if fieldContent != nil {
contentRange = &storagepb.ContentRange{}
if err := proto.Unmarshal(fieldContent, contentRange); err != nil {
return nil, err
}
}
// Get the bytes corresponding to the checksums.
bytes, n := protowire.ConsumeBytes(b[off:])
if n < 0 {
return nil, fmt.Errorf("invalid ReadObjectResponse.ObjectChecksums: %v", protowire.ParseError(n))
}
off += n

var metadata *storagepb.Object
fieldContent, err = readProtoBytes(b, metadataField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse.Metadata: %v", err)
}
if fieldContent != nil {
metadata = &storagepb.Object{}
if err := proto.Unmarshal(fieldContent, metadata); err != nil {
return nil, err
}
}
// Unmarshal.
if err := proto.Unmarshal(bytes, msg.ObjectChecksums); err != nil {
return nil, err
}
case fieldNum == contentRangeField && fieldType == protowire.BytesType:
msg.ContentRange = &storagepb.ContentRange{}

msg := &storagepb.ReadObjectResponse{
ChecksummedData: checksummedData,
ObjectChecksums: checksums,
ContentRange: contentRange,
Metadata: metadata,
bytes, n := protowire.ConsumeBytes(b[off:])
if n < 0 {
return nil, fmt.Errorf("invalid ReadObjectResponse.ContentRange: %v", protowire.ParseError(n))
}
off += n

if err := proto.Unmarshal(bytes, msg.ContentRange); err != nil {
return nil, err
}
case fieldNum == metadataField && fieldType == protowire.BytesType:
msg.Metadata = &storagepb.Object{}

bytes, n := protowire.ConsumeBytes(b[off:])
if n < 0 {
return nil, fmt.Errorf("invalid ReadObjectResponse.Metadata: %v", protowire.ParseError(n))
}
off += n

if err := proto.Unmarshal(bytes, msg.Metadata); err != nil {
return nil, err
}
default:
fieldLength = protowire.ConsumeFieldValue(fieldNum, fieldType, b[off:])
if fieldLength < 0 {
return nil, fmt.Errorf("default: %v", protowire.ParseError(fieldLength))
}
off += fieldLength
}
}

return msg, nil
Expand Down Expand Up @@ -1695,33 +1740,6 @@ func readProtoBytes(b []byte, num protowire.Number) ([]byte, error) {
return nil, nil
}

// readProtoFixed32 scans the wire-encoded message b for the first field whose
// number is num and whose wire type is fixed32, and returns a pointer to the
// decoded value. A nil pointer together with a nil error means that no such
// field was present. A non-nil error means that b could not be parsed.
func readProtoFixed32(b []byte, num protowire.Number) (*uint32, error) {
	for rest := b; len(rest) > 0; {
		// Decode the tag to learn which field comes next and how it is encoded.
		fieldNum, wireType, tagLen := protowire.ConsumeTag(rest)
		if tagLen < 0 {
			return nil, protowire.ParseError(tagLen)
		}
		rest = rest[tagLen:]

		if fieldNum != num || wireType != protowire.Fixed32Type {
			// Not the field we are looking for: step over its value.
			skipLen := protowire.ConsumeFieldValue(fieldNum, wireType, rest)
			if skipLen < 0 {
				return nil, protowire.ParseError(skipLen)
			}
			rest = rest[skipLen:]
			continue
		}

		// Found it: decode the value and stop scanning.
		val, valLen := protowire.ConsumeFixed32(rest)
		if valLen < 0 {
			return nil, protowire.ParseError(valLen)
		}
		return &val, nil
	}
	return nil, nil
}

// reopenStream "closes" the existing stream and attempts to reopen a stream and
// sets the Reader's stream and cancelStream properties in the process.
func (r *gRPCReader) reopenStream() (*storagepb.ReadObjectResponse, error) {
Expand Down