Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(storage): remove protobuf's copy of data on unmarshalling #9526

Merged
merged 13 commits into from
Mar 19, 2024
Prev Previous commit
Next Next commit
add custom decoding for the first msg
  • Loading branch information
BrennaEpp committed Mar 8, 2024
commit d592214bd470402b8e4aacb6afdb130d578064ad
129 changes: 111 additions & 18 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,8 @@
req.Generation = params.gen
}

var databuf []byte

// Define a function that initiates a Read with offset and length, assuming
// we have already read seen bytes.
reopen := func(seen int64) (*readStreamResponse, context.CancelFunc, error) {
Expand Down Expand Up @@ -1001,17 +1003,23 @@
return err
}

// This receive still goes through protobuf unmarshaling.
// Subsequent receives in Read calls will skip protobuf unmarshaling
// and directly read the content from the gRPC []byte response.
//
// We could also use a custom decoder here.
msg, err = stream.Recv()
// Receive the message as a wire-encoded message so we can use a
// custom decoder to avoid an extra copy at the protobuf layer.
err := stream.RecvMsg(&databuf)
// These types of errors show up on the Recv call, rather than the
// initialization of the stream via ReadObject above.
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
return ErrObjectNotExist
}
if err != nil {
return err
}
// Use a custom decoder that uses protobuf unmarshalling for all
// fields except the checksummed data content.
// Subsequent receives in Read calls will skip all protobuf
// unmarshalling and directly read the content from the gRPC []byte
// response, since only the first call will contain other fields.
msg, err = readFullObjectResponse(databuf)

return err
}, s.retry, s.idempotent)
Expand Down Expand Up @@ -1057,6 +1065,7 @@
leftovers: msg.GetChecksummedData().GetContent(),
settings: s,
zeroRange: params.length == 0,
databuf: databuf,
},
}

Expand Down Expand Up @@ -1554,48 +1563,132 @@
}

// readObjectResponseContent returns the checksummed_data.content field of a
// ReadObjectResponse message.
// ReadObjectResponse message, or an error if the message is invalid.
// This can be used on recvs of objects after the first recv, since only the
// first message will contain non-data fields.
func readObjectResponseContent(b []byte) ([]byte, error) {
const (
readObjectResponse_checksummedData = protowire.Number(1)

Check failure on line 1571 in storage/grpc_client.go

View workflow job for this annotation

GitHub Actions / vet

don't use underscores in Go names; const readObjectResponse_checksummedData should be readObjectResponseChecksummedData
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved
checksummedData_content = protowire.Number(1)

Check failure on line 1572 in storage/grpc_client.go

View workflow job for this annotation

GitHub Actions / vet

don't use underscores in Go names; const checksummedData_content should be checksummedDataContent
)
checksummedData := readProtoBytes(b, readObjectResponse_checksummedData)
content := readProtoBytes(checksummedData, checksummedData_content)
if content == nil {
return nil, errors.New("invalid ReadObjectResponse")

checksummedData, err := readProtoBytes(b, readObjectResponse_checksummedData)
if err != nil {
return b, fmt.Errorf("invalid ReadObjectResponse: %v", err)
}
content, err := readProtoBytes(checksummedData, checksummedData_content)
if err != nil {
return content, fmt.Errorf("invalid ReadObjectResponse: %v", err)
}

return content, nil
}

// readFullObjectResponse returns the ReadObjectResponse encoded in the
// wire-encoded message buffer b, or an error if the message is invalid.
// This is used on the first recv of an object as it may contain all fields of
// ReadObjectResponse.
tritone marked this conversation as resolved.
Show resolved Hide resolved
func readFullObjectResponse(b []byte) (*storagepb.ReadObjectResponse, error) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to note that this function is essentially identical to proto.Unmarshal, except it aliases the data in the input []byte. If we ever add a feature to Unmarshal that does that, this function can be dropped.

const (
checksummedDataField = protowire.Number(1)
checksummedDataContentField = protowire.Number(1)
checksummedDataCRC32CField = protowire.Number(2)
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved
objectChecksumsField = protowire.Number(2)
contentRangeField = protowire.Number(3)
metadataField = protowire.Number(4)
)

checksummedData, err := readProtoBytes(b, checksummedDataField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse: %v", err)
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved
}
content, err := readProtoBytes(checksummedData, checksummedDataContentField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse: %v", err)
}

// TODO: add unmarshalling for crc32c here
//crc32c := readProtoBytes(checksummedData, checksummedDataCRC32CField)
tritone marked this conversation as resolved.
Show resolved Hide resolved

// Unmarshal remaining fields.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're unmarshaling all the fields in the message, so this will be a bit more efficient if you loop once over the fields rather than looking up each individually:

off := 0
for off < len(b) {
  num, typ, n := protowire.ConsumeTag(b[off:])
  if n < 0 {
    return nil, protowire.ParseError(n)
  }
  off += n
  switch {
  case num == checksummedDataField && typ == protowire.BytesType:
    // unmarshal the checksummed_data field
  case num == objectChecksumsField && typ == protowire.BytesType:
    // unmarshal the object_checksums field
  case num == contentRangeField && typ == protowire.BytesType:
    // unmarshal the content_range field
  case num == metadataField && typ == protowire.BytesType:
    // unmarshal the metadata field
  default:
    n = protowire.ConsumeFieldValue(num, typ, b[off:])
    if n < 0 {
      return nil, protowire.ParseError(n)
    }
    off += n
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion!

var checksums *storagepb.ObjectChecksums
bytes, err := readProtoBytes(b, objectChecksumsField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse: %v", err)
}
// If the field is not empty, unmarshal its contents
if len(bytes) > 0 {
checksums = &storagepb.ObjectChecksums{}
if err := proto.Unmarshal(bytes, checksums); err != nil {
return nil, err
}
}

var contentRange *storagepb.ContentRange
bytes, err = readProtoBytes(b, contentRangeField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse: %v", "err")
}
if len(bytes) > 0 {
contentRange = &storagepb.ContentRange{}
if err := proto.Unmarshal(bytes, contentRange); err != nil {
return nil, err
}
}

var metadata *storagepb.Object
bytes, err = readProtoBytes(b, metadataField)
if err != nil {
return nil, fmt.Errorf("invalid ReadObjectResponse: %v", err)
}
if len(bytes) > 0 {
metadata = &storagepb.Object{}
if err := proto.Unmarshal(bytes, metadata); err != nil {
return nil, err
}
}

msg := &storagepb.ReadObjectResponse{
ChecksummedData: &storagepb.ChecksummedData{
Content: content,
},
ObjectChecksums: checksums,
ContentRange: contentRange,
Metadata: metadata,
}

return msg, nil
}

// readProtoBytes returns the contents of the protobuf field with number num
// and type bytes from a wire-encoded message, or nil if the field can not be found.
// and type bytes from a wire-encoded message. If the field cannot be found,
// the returned slice will be nil and no error will be returned.
//
// It does not handle field concatenation, in which the contents of a single field
// are split across multiple protobuf tags. Encoded data containing split fields
// of this form is technically permissable, but uncommon.
func readProtoBytes(b []byte, num protowire.Number) []byte {
func readProtoBytes(b []byte, num protowire.Number) ([]byte, error) {
off := 0
for off < len(b) {
gotNum, gotTyp, n := protowire.ConsumeTag(b[off:])
if n < 0 {
return nil
return nil, protowire.ParseError(n)
}
off += n
if gotNum == num && gotTyp == protowire.BytesType {
b, n := protowire.ConsumeBytes(b[off:])
if n < 0 {
return nil
return nil, protowire.ParseError(n)
}
return b
return b, nil
}
n = protowire.ConsumeFieldValue(gotNum, gotTyp, b[off:])
if n < 0 {
return nil
return nil, protowire.ParseError(n)
}
off += n
}
return nil
return nil, nil
}

// reopenStream "closes" the existing stream and attempts to reopen a stream and
Expand Down