Skip to content

Commit 895bfbd

Browse files
authored
fix: fix Journaling BlobWriteSessionConfig to properly handle multiple consecutive retries (#3166)
* Clear copy buffer used to load bytes from the recovery file so, we don't have dangling bytes from a previous retry * Do not add to the cumulative crc32c when retrying. The bytes have already been added in the first attempt.
1 parent aa6b7cb commit 895bfbd

File tree

3 files changed

+236
-8
lines changed

3 files changed

+236
-8
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/SyncAndUploadUnbufferedWritableByteChannel.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ private GatheringByteChannel openSync() throws IOException {
149149
return sync;
150150
}
151151

152-
private WriteObjectRequest processSegment(ChunkSegment segment) {
152+
private WriteObjectRequest processSegment(ChunkSegment segment, boolean updateCumulativeCrc32c) {
153153
WriteObjectRequest.Builder builder = writeCtx.newRequestBuilder();
154154
if (!first) {
155155
builder.clearUploadId().clearWriteObjectSpec().clearObjectChecksums();
@@ -162,9 +162,11 @@ private WriteObjectRequest processSegment(ChunkSegment segment) {
162162
int contentSize = b.size();
163163

164164
// update ctx state that tracks overall progress
165-
writeCtx
166-
.getCumulativeCrc32c()
167-
.accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat);
165+
if (updateCumulativeCrc32c) {
166+
writeCtx
167+
.getCumulativeCrc32c()
168+
.accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat);
169+
}
168170
// resolve current offset and set next
169171
long offset = writeCtx.getTotalSentBytes().getAndAdd(contentSize);
170172

@@ -202,6 +204,7 @@ private WriteObjectRequest.Builder finishMessage(WriteObjectRequest.Builder b) {
202204
return b;
203205
}
204206

207+
@SuppressWarnings("ConstantValue")
205208
private void doUpload(boolean closing, ChunkSegment[] segments, long goalSize) {
206209
AtomicBoolean recover = new AtomicBoolean(false);
207210
retrier.run(
@@ -211,9 +214,16 @@ private void doUpload(boolean closing, ChunkSegment[] segments, long goalSize) {
211214
sync.close();
212215
}
213216
boolean shouldRecover = recover.getAndSet(true);
217+
// each ChunkSegment will always have its checksum computed, but if a retry happens, and
218+
// we need to rewind and build a new ChunkSegment, we don't want to add it to the
219+
// cumulativeCrc32c value because that will make it appear as the bytes are duplicated.
220+
// If we send "ABCD", get an error and find only "AB" to have been persisted, we don't
221+
// want to add "CD" to the cumulative crc32c as that would be equivalent to "ABCDCD".
222+
boolean updateCumulativeCrc32c = !shouldRecover;
214223
if (!shouldRecover) {
215224
for (ChunkSegment segment : segments) {
216-
WriteObjectRequest writeObjectRequest = processSegment(segment);
225+
WriteObjectRequest writeObjectRequest =
226+
processSegment(segment, updateCumulativeCrc32c);
217227
stream.onNext(writeObjectRequest);
218228
}
219229

@@ -247,17 +257,22 @@ private void doUpload(boolean closing, ChunkSegment[] segments, long goalSize) {
247257
first = true;
248258
writeCtx.getTotalSentBytes().set(persistedSize);
249259
writeCtx.getConfirmedBytes().set(persistedSize);
250-
writeCtx.getCumulativeCrc32c().set(null); // todo: can we rewind checksum?
260+
// intentionally do not modify the cumulativeCrc32c value
261+
// this will stay in the state in sync with what has been written to disk
262+
// when we recover, checksum the individual message but not the cumulative
251263

252264
try (SeekableByteChannel reader = rf.reader()) {
253265
reader.position(persistedSize);
254266
ByteBuffer buf = copyBuffer.get();
267+
// clear before read, in case an error was thrown before
268+
buf.clear();
255269
while (Buffers.fillFrom(buf, reader) != -1) {
256270
buf.flip();
257271
while (buf.hasRemaining()) {
258272
ChunkSegment[] recoverySegments = chunkSegmenter.segmentBuffer(buf);
259273
for (ChunkSegment segment : recoverySegments) {
260-
WriteObjectRequest writeObjectRequest = processSegment(segment);
274+
WriteObjectRequest writeObjectRequest =
275+
processSegment(segment, updateCumulativeCrc32c);
261276
stream.onNext(writeObjectRequest);
262277
}
263278
}
@@ -280,7 +295,8 @@ private void doUpload(boolean closing, ChunkSegment[] segments, long goalSize) {
280295
}
281296
}
282297
long newWritten = writeCtx.getTotalSentBytes().get();
283-
Preconditions.checkState(newWritten == goalSize, "%s == %s", newWritten, goalSize);
298+
Preconditions.checkState(
299+
newWritten == goalSize, "newWritten == goalSize (%s == %s)", newWritten, goalSize);
284300
return null;
285301
},
286302
Decoder.identity());

google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncAndUploadUnbufferedWritableByteChannelPropertyTest.java

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.storage;
1818

19+
import static com.google.cloud.storage.TestUtils.apiException;
1920
import static com.google.cloud.storage.TestUtils.assertAll;
2021
import static com.google.cloud.storage.TestUtils.defaultRetryingDeps;
2122
import static com.google.cloud.storage.TestUtils.xxd;
@@ -31,17 +32,23 @@
3132
import com.google.api.gax.rpc.ApiExceptions;
3233
import com.google.api.gax.rpc.UnavailableException;
3334
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
35+
import com.google.cloud.storage.Conversions.Decoder;
3436
import com.google.cloud.storage.Retrying.DefaultRetrier;
3537
import com.google.cloud.storage.SyncAndUploadUnbufferedWritableByteChannel.Alg;
3638
import com.google.cloud.storage.SyncAndUploadUnbufferedWritableByteChannel.RequestStream;
3739
import com.google.cloud.storage.SyncAndUploadUnbufferedWritableByteChannel.ResponseStream;
3840
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
3941
import com.google.cloud.storage.UnifiedOpts.Opts;
42+
import com.google.cloud.storage.it.ChecksummedTestContent;
4043
import com.google.common.base.MoreObjects;
4144
import com.google.common.collect.ImmutableList;
45+
import com.google.common.collect.ImmutableSet;
4246
import com.google.common.primitives.Ints;
4347
import com.google.protobuf.ByteString;
48+
import com.google.protobuf.Message;
49+
import com.google.protobuf.TextFormat;
4450
import com.google.storage.v2.Object;
51+
import com.google.storage.v2.ObjectChecksums;
4552
import com.google.storage.v2.QueryWriteStatusRequest;
4653
import com.google.storage.v2.QueryWriteStatusResponse;
4754
import com.google.storage.v2.StartResumableWriteRequest;
@@ -50,10 +57,12 @@
5057
import com.google.storage.v2.StorageGrpc.StorageImplBase;
5158
import com.google.storage.v2.WriteObjectRequest;
5259
import com.google.storage.v2.WriteObjectResponse;
60+
import com.google.storage.v2.WriteObjectSpec;
5361
import io.grpc.Status;
5462
import io.grpc.Status.Code;
5563
import io.grpc.stub.StreamObserver;
5664
import java.io.IOException;
65+
import java.nio.ByteBuffer;
5766
import java.nio.file.FileVisitResult;
5867
import java.nio.file.Files;
5968
import java.nio.file.Path;
@@ -76,6 +85,7 @@
7685
import java.util.concurrent.Executors;
7786
import java.util.concurrent.Future;
7887
import java.util.concurrent.TimeUnit;
88+
import java.util.concurrent.atomic.AtomicInteger;
7989
import java.util.function.UnaryOperator;
8090
import java.util.stream.Collectors;
8191
import net.jqwik.api.Arbitraries;
@@ -372,6 +382,204 @@ void testUploads(@ForAll("scenario") Scenario s) throws Exception {
372382
}
373383
}
374384

385+
@Example
386+
void multipleRetriesAgainstFakeServer() throws Exception {
387+
ChecksummedTestContent content =
388+
ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(17));
389+
390+
String uploadId = UUID.randomUUID().toString();
391+
StartResumableWriteRequest reqStart =
392+
StartResumableWriteRequest.newBuilder()
393+
.setWriteObjectSpec(
394+
WriteObjectSpec.newBuilder()
395+
.setResource(
396+
Object.newBuilder().setBucket("projects/_/buckets/b").setName("o").build())
397+
.build())
398+
.build();
399+
StartResumableWriteResponse resStart =
400+
StartResumableWriteResponse.newBuilder().setUploadId(uploadId).build();
401+
QueryWriteStatusRequest reqQuery =
402+
QueryWriteStatusRequest.newBuilder().setUploadId(uploadId).build();
403+
QueryWriteStatusResponse resQuery =
404+
QueryWriteStatusResponse.newBuilder().setPersistedSize(8).build();
405+
WriteObjectRequest reqWrite0 =
406+
WriteObjectRequest.newBuilder()
407+
.setUploadId(uploadId)
408+
.setWriteOffset(0)
409+
.setChecksummedData(content.slice(0, 2).asChecksummedData())
410+
.build();
411+
WriteObjectRequest reqWrite2 =
412+
WriteObjectRequest.newBuilder()
413+
.setWriteOffset(2)
414+
.setChecksummedData(content.slice(2, 2).asChecksummedData())
415+
.build();
416+
WriteObjectRequest reqWrite4 =
417+
WriteObjectRequest.newBuilder()
418+
.setWriteOffset(4)
419+
.setChecksummedData(content.slice(4, 2).asChecksummedData())
420+
.build();
421+
WriteObjectRequest reqWrite6 =
422+
WriteObjectRequest.newBuilder()
423+
.setWriteOffset(6)
424+
.setChecksummedData(content.slice(6, 2).asChecksummedData())
425+
.build();
426+
WriteObjectRequest reqWrite8 =
427+
WriteObjectRequest.newBuilder()
428+
.setWriteOffset(8)
429+
.setChecksummedData(content.slice(8, 2).asChecksummedData())
430+
.build();
431+
WriteObjectRequest reqWrite8WithUploadId = reqWrite8.toBuilder().setUploadId(uploadId).build();
432+
WriteObjectRequest reqWrite10 =
433+
WriteObjectRequest.newBuilder()
434+
.setWriteOffset(10)
435+
.setChecksummedData(content.slice(10, 2).asChecksummedData())
436+
.build();
437+
WriteObjectRequest reqWrite12 =
438+
WriteObjectRequest.newBuilder()
439+
.setWriteOffset(12)
440+
.setChecksummedData(content.slice(12, 2).asChecksummedData())
441+
.build();
442+
WriteObjectRequest reqWrite14 =
443+
WriteObjectRequest.newBuilder()
444+
.setWriteOffset(14)
445+
.setChecksummedData(content.slice(14, 2).asChecksummedData())
446+
.build();
447+
WriteObjectRequest reqWrite16 =
448+
WriteObjectRequest.newBuilder()
449+
.setWriteOffset(16)
450+
.setChecksummedData(content.slice(16, 1).asChecksummedData())
451+
.build();
452+
WriteObjectRequest reqFinish =
453+
WriteObjectRequest.newBuilder()
454+
.setFinishWrite(true)
455+
.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(content.getCrc32c()).build())
456+
.mergeFrom(reqWrite16)
457+
.build();
458+
WriteObjectResponse resFinish =
459+
WriteObjectResponse.newBuilder()
460+
.setResource(
461+
reqStart.getWriteObjectSpec().getResource().toBuilder()
462+
.setGeneration(1)
463+
.setSize(17)
464+
.setChecksums(
465+
ObjectChecksums.newBuilder()
466+
.setCrc32C(content.getCrc32c())
467+
.setMd5Hash(content.getMd5Bytes())
468+
.build())
469+
.build())
470+
.build();
471+
ImmutableSet<WriteObjectRequest> allReqWrite =
472+
ImmutableSet.of(
473+
reqWrite0,
474+
reqWrite2,
475+
reqWrite4,
476+
reqWrite6,
477+
reqWrite8,
478+
reqWrite10,
479+
reqWrite12,
480+
reqWrite14,
481+
reqWrite16);
482+
483+
AtomicInteger retryCount = new AtomicInteger(0);
484+
StorageImplBase service =
485+
new StorageImplBase() {
486+
@Override
487+
public void startResumableWrite(
488+
StartResumableWriteRequest req, StreamObserver<StartResumableWriteResponse> respond) {
489+
if (req.equals(reqStart)) {
490+
respond.onNext(resStart);
491+
respond.onCompleted();
492+
} else {
493+
unexpected(respond, req);
494+
}
495+
}
496+
497+
@Override
498+
public void queryWriteStatus(
499+
QueryWriteStatusRequest req, StreamObserver<QueryWriteStatusResponse> respond) {
500+
if (req.equals(reqQuery)) {
501+
respond.onNext(resQuery);
502+
respond.onCompleted();
503+
} else {
504+
unexpected(respond, req);
505+
}
506+
}
507+
508+
@Override
509+
public StreamObserver<WriteObjectRequest> writeObject(
510+
StreamObserver<WriteObjectResponse> respond) {
511+
return new StreamObserver<WriteObjectRequest>() {
512+
@Override
513+
public void onNext(WriteObjectRequest value) {
514+
if (value.equals(reqFinish)) {
515+
respond.onNext(resFinish);
516+
respond.onCompleted();
517+
} else if (value.equals(reqWrite10)) {
518+
int i = retryCount.get();
519+
if (i < 2) {
520+
respond.onError(apiException(Code.UNAVAILABLE, "{Unavailable}"));
521+
}
522+
} else if (value.equals(reqWrite8WithUploadId)) {
523+
retryCount.incrementAndGet();
524+
} else if (allReqWrite.contains(value)) {
525+
// do nothing
526+
} else {
527+
unexpected(respond, value);
528+
}
529+
}
530+
531+
@Override
532+
public void onError(Throwable t) {}
533+
534+
@Override
535+
public void onCompleted() {}
536+
};
537+
}
538+
539+
private void unexpected(StreamObserver<?> respond, Message msg) {
540+
respond.onError(
541+
apiException(
542+
Code.UNIMPLEMENTED,
543+
"Unexpected request { " + TextFormat.printer().shortDebugString(msg) + " }"));
544+
}
545+
};
546+
try (FakeServer fakeServer = FakeServer.of(service);
547+
GrpcStorageImpl storage =
548+
(GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) {
549+
550+
BlobInfo info = BlobInfo.newBuilder("b", "o").build();
551+
SettableApiFuture<WriteObjectResponse> resultFuture = SettableApiFuture.create();
552+
BufferHandle recoverBufferHandle = BufferHandle.allocate(2);
553+
SyncAndUploadUnbufferedWritableByteChannel syncAndUpload =
554+
new SyncAndUploadUnbufferedWritableByteChannel(
555+
storage.storageClient.writeObjectCallable(),
556+
storage.storageClient.queryWriteStatusCallable(),
557+
resultFuture,
558+
new ChunkSegmenter(Hasher.enabled(), ByteStringStrategy.copy(), 2, 2),
559+
new DefaultRetrier(UnaryOperator.identity(), storage.getOptions()),
560+
StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler(),
561+
new WriteCtx<>(
562+
new ResumableWrite(
563+
reqStart,
564+
resStart,
565+
id -> reqWrite0.toBuilder().clearWriteObjectSpec().setUploadId(id).build())),
566+
recoveryFileManager.newRecoveryFile(info),
567+
recoverBufferHandle);
568+
try (BufferedWritableByteChannel w =
569+
StorageByteChannels.writable()
570+
.createSynchronized(
571+
new DefaultBufferedWritableByteChannel(recoverBufferHandle, syncAndUpload))) {
572+
w.write(ByteBuffer.wrap(content.getBytes()));
573+
}
574+
575+
Decoder<WriteObjectResponse, BlobInfo> decoder =
576+
Conversions.grpc().blobInfo().compose(WriteObjectResponse::getResource);
577+
BlobInfo actual = decoder.decode(resultFuture.get(3, TimeUnit.SECONDS));
578+
assertThat(actual.getSize()).isEqualTo(content.getBytes().length);
579+
assertThat(actual.getCrc32c()).isEqualTo(content.getCrc32cBase64());
580+
}
581+
}
582+
375583
static List<ByteString> dataFrames(long length, int segmentLength) {
376584
// todo: rethink this
377585
Random rand = new Random(length);

google-cloud-storage/src/test/java/com/google/cloud/storage/it/ChecksummedTestContent.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,10 @@ public ChecksummedData asChecksummedData() {
9696
.build();
9797
}
9898

99+
public ChecksummedTestContent slice(int begin, int length) {
100+
return of(bytes, begin, Math.min(length, bytes.length - begin));
101+
}
102+
99103
@Override
100104
public String toString() {
101105
return MoreObjects.toStringHelper(this)

0 commit comments

Comments
 (0)