Skip to content

Commit 7057629

Browse files
authored
fix: fix grpc ReadObject memory leak introduced in 2.51.0 (#3080)
1 parent 3274cd8 commit 7057629

File tree

2 files changed

+125
-33
lines changed

2 files changed

+125
-33
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -163,41 +163,42 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
163163
readObjectObserver.request();
164164

165165
ReadObjectResponse resp = (ReadObjectResponse) take;
166-
ResponseContentLifecycleHandle<ReadObjectResponse> handle =
167-
read.getResponseContentLifecycleManager().get(resp);
168-
ReadObjectResponseChildRef ref = ReadObjectResponseChildRef.from(handle);
169-
if (resp.hasMetadata()) {
170-
Object respMetadata = resp.getMetadata();
171-
if (metadata == null) {
172-
metadata = respMetadata;
173-
} else if (metadata.getGeneration() != respMetadata.getGeneration()) {
174-
throw closeWithError(
175-
String.format(
176-
Locale.US,
177-
"Mismatch Generation between subsequent reads. Expected %d but received %d",
178-
metadata.getGeneration(),
179-
respMetadata.getGeneration()));
166+
try (ResponseContentLifecycleHandle<ReadObjectResponse> handle =
167+
read.getResponseContentLifecycleManager().get(resp)) {
168+
ReadObjectResponseChildRef ref = ReadObjectResponseChildRef.from(handle);
169+
if (resp.hasMetadata()) {
170+
Object respMetadata = resp.getMetadata();
171+
if (metadata == null) {
172+
metadata = respMetadata;
173+
} else if (metadata.getGeneration() != respMetadata.getGeneration()) {
174+
throw closeWithError(
175+
String.format(
176+
Locale.US,
177+
"Mismatch Generation between subsequent reads. Expected %d but received %d",
178+
metadata.getGeneration(),
179+
respMetadata.getGeneration()));
180+
}
180181
}
181-
}
182-
ChecksummedData checksummedData = resp.getChecksummedData();
183-
ByteString content = checksummedData.getContent();
184-
int contentSize = content.size();
185-
// Very important to know whether a crc32c value is set. Without checking, protobuf will
186-
// happily return 0, which is a valid crc32c value.
187-
if (checksummedData.hasCrc32C()) {
188-
Crc32cLengthKnown expected = Crc32cValue.of(checksummedData.getCrc32C(), contentSize);
189-
try {
190-
hasher.validate(expected, content);
191-
} catch (IOException e) {
192-
close();
193-
throw e;
182+
ChecksummedData checksummedData = resp.getChecksummedData();
183+
ByteString content = checksummedData.getContent();
184+
int contentSize = content.size();
185+
// Very important to know whether a crc32c value is set. Without checking, protobuf will
186+
// happily return 0, which is a valid crc32c value.
187+
if (checksummedData.hasCrc32C()) {
188+
Crc32cLengthKnown expected = Crc32cValue.of(checksummedData.getCrc32C(), contentSize);
189+
try {
190+
hasher.validate(expected, content);
191+
} catch (IOException e) {
192+
close();
193+
throw e;
194+
}
195+
}
196+
ref.copy(c, dsts, offset, length);
197+
if (ref.hasRemaining()) {
198+
leftovers = ref;
199+
} else {
200+
ref.close();
194201
}
195-
}
196-
ref.copy(c, dsts, offset, length);
197-
if (ref.hasRemaining()) {
198-
leftovers = ref;
199-
} else {
200-
ref.close();
201202
}
202203
}
203204
long read = c.read();
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import static com.google.cloud.storage.TestUtils.xxd;
20+
import static com.google.common.truth.Truth.assertThat;
21+
22+
import com.google.api.core.SettableApiFuture;
23+
import com.google.api.gax.rpc.ApiCallContext;
24+
import com.google.api.gax.rpc.ResponseObserver;
25+
import com.google.api.gax.rpc.ServerStreamingCallable;
26+
import com.google.api.gax.rpc.StreamController;
27+
import com.google.cloud.storage.GrpcUtils.ZeroCopyServerStreamingCallable;
28+
import com.google.cloud.storage.Retrying.Retrier;
29+
import com.google.cloud.storage.it.ChecksummedTestContent;
30+
import com.google.storage.v2.ReadObjectRequest;
31+
import com.google.storage.v2.ReadObjectResponse;
32+
import java.io.IOException;
33+
import java.nio.ByteBuffer;
34+
import java.util.concurrent.atomic.AtomicBoolean;
35+
import org.junit.Test;
36+
37+
public final class GapicUnbufferedReadableByteChannelTest {
38+
39+
@Test
40+
public void ensureResponseAreClosed() throws IOException {
41+
ChecksummedTestContent testContent =
42+
ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10));
43+
44+
AtomicBoolean close = new AtomicBoolean(false);
45+
46+
ResponseContentLifecycleManager<ReadObjectResponse> manager =
47+
resp -> ResponseContentLifecycleHandle.create(resp, () -> close.compareAndSet(false, true));
48+
49+
try (GapicUnbufferedReadableByteChannel c =
50+
new GapicUnbufferedReadableByteChannel(
51+
SettableApiFuture.create(),
52+
new ZeroCopyServerStreamingCallable<>(
53+
new ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse>() {
54+
@Override
55+
public void call(
56+
ReadObjectRequest request,
57+
ResponseObserver<ReadObjectResponse> respond,
58+
ApiCallContext context) {
59+
respond.onStart(new NullStreamController());
60+
respond.onResponse(
61+
ReadObjectResponse.newBuilder()
62+
.setChecksummedData(testContent.asChecksummedData())
63+
.build());
64+
respond.onComplete();
65+
}
66+
},
67+
manager),
68+
ReadObjectRequest.getDefaultInstance(),
69+
Hasher.noop(),
70+
Retrier.attemptOnce(),
71+
Retrying.neverRetry())) {
72+
73+
ByteBuffer buffer = ByteBuffer.allocate(15);
74+
c.read(buffer);
75+
assertThat(xxd(buffer)).isEqualTo(xxd(testContent.getBytes()));
76+
assertThat(close.get()).isTrue();
77+
}
78+
}
79+
80+
private static class NullStreamController implements StreamController {
81+
82+
@Override
83+
public void cancel() {}
84+
85+
@Override
86+
public void disableAutoInboundFlowControl() {}
87+
88+
@Override
89+
public void request(int count) {}
90+
}
91+
}

0 commit comments

Comments
 (0)