|
16 | 16 |
|
17 | 17 | package com.google.cloud.storage;
|
18 | 18 |
|
| 19 | +import static com.google.cloud.storage.TestUtils.apiException; |
19 | 20 | import static com.google.cloud.storage.TestUtils.assertAll;
|
20 | 21 | import static com.google.cloud.storage.TestUtils.defaultRetryingDeps;
|
21 | 22 | import static com.google.cloud.storage.TestUtils.xxd;
|
|
31 | 32 | import com.google.api.gax.rpc.ApiExceptions;
|
32 | 33 | import com.google.api.gax.rpc.UnavailableException;
|
33 | 34 | import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
|
| 35 | +import com.google.cloud.storage.Conversions.Decoder; |
34 | 36 | import com.google.cloud.storage.Retrying.DefaultRetrier;
|
35 | 37 | import com.google.cloud.storage.SyncAndUploadUnbufferedWritableByteChannel.Alg;
|
36 | 38 | import com.google.cloud.storage.SyncAndUploadUnbufferedWritableByteChannel.RequestStream;
|
37 | 39 | import com.google.cloud.storage.SyncAndUploadUnbufferedWritableByteChannel.ResponseStream;
|
38 | 40 | import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
|
39 | 41 | import com.google.cloud.storage.UnifiedOpts.Opts;
|
| 42 | +import com.google.cloud.storage.it.ChecksummedTestContent; |
40 | 43 | import com.google.common.base.MoreObjects;
|
41 | 44 | import com.google.common.collect.ImmutableList;
|
| 45 | +import com.google.common.collect.ImmutableSet; |
42 | 46 | import com.google.common.primitives.Ints;
|
43 | 47 | import com.google.protobuf.ByteString;
|
| 48 | +import com.google.protobuf.Message; |
| 49 | +import com.google.protobuf.TextFormat; |
44 | 50 | import com.google.storage.v2.Object;
|
| 51 | +import com.google.storage.v2.ObjectChecksums; |
45 | 52 | import com.google.storage.v2.QueryWriteStatusRequest;
|
46 | 53 | import com.google.storage.v2.QueryWriteStatusResponse;
|
47 | 54 | import com.google.storage.v2.StartResumableWriteRequest;
|
|
50 | 57 | import com.google.storage.v2.StorageGrpc.StorageImplBase;
|
51 | 58 | import com.google.storage.v2.WriteObjectRequest;
|
52 | 59 | import com.google.storage.v2.WriteObjectResponse;
|
| 60 | +import com.google.storage.v2.WriteObjectSpec; |
53 | 61 | import io.grpc.Status;
|
54 | 62 | import io.grpc.Status.Code;
|
55 | 63 | import io.grpc.stub.StreamObserver;
|
56 | 64 | import java.io.IOException;
|
| 65 | +import java.nio.ByteBuffer; |
57 | 66 | import java.nio.file.FileVisitResult;
|
58 | 67 | import java.nio.file.Files;
|
59 | 68 | import java.nio.file.Path;
|
|
76 | 85 | import java.util.concurrent.Executors;
|
77 | 86 | import java.util.concurrent.Future;
|
78 | 87 | import java.util.concurrent.TimeUnit;
|
| 88 | +import java.util.concurrent.atomic.AtomicInteger; |
79 | 89 | import java.util.function.UnaryOperator;
|
80 | 90 | import java.util.stream.Collectors;
|
81 | 91 | import net.jqwik.api.Arbitraries;
|
@@ -372,6 +382,204 @@ void testUploads(@ForAll("scenario") Scenario s) throws Exception {
|
372 | 382 | }
|
373 | 383 | }
|
374 | 384 |
|
| 385 | + @Example |
| 386 | + void multipleRetriesAgainstFakeServer() throws Exception { |
| 387 | + ChecksummedTestContent content = |
| 388 | + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(17)); |
| 389 | + |
| 390 | + String uploadId = UUID.randomUUID().toString(); |
| 391 | + StartResumableWriteRequest reqStart = |
| 392 | + StartResumableWriteRequest.newBuilder() |
| 393 | + .setWriteObjectSpec( |
| 394 | + WriteObjectSpec.newBuilder() |
| 395 | + .setResource( |
| 396 | + Object.newBuilder().setBucket("projects/_/buckets/b").setName("o").build()) |
| 397 | + .build()) |
| 398 | + .build(); |
| 399 | + StartResumableWriteResponse resStart = |
| 400 | + StartResumableWriteResponse.newBuilder().setUploadId(uploadId).build(); |
| 401 | + QueryWriteStatusRequest reqQuery = |
| 402 | + QueryWriteStatusRequest.newBuilder().setUploadId(uploadId).build(); |
| 403 | + QueryWriteStatusResponse resQuery = |
| 404 | + QueryWriteStatusResponse.newBuilder().setPersistedSize(8).build(); |
| 405 | + WriteObjectRequest reqWrite0 = |
| 406 | + WriteObjectRequest.newBuilder() |
| 407 | + .setUploadId(uploadId) |
| 408 | + .setWriteOffset(0) |
| 409 | + .setChecksummedData(content.slice(0, 2).asChecksummedData()) |
| 410 | + .build(); |
| 411 | + WriteObjectRequest reqWrite2 = |
| 412 | + WriteObjectRequest.newBuilder() |
| 413 | + .setWriteOffset(2) |
| 414 | + .setChecksummedData(content.slice(2, 2).asChecksummedData()) |
| 415 | + .build(); |
| 416 | + WriteObjectRequest reqWrite4 = |
| 417 | + WriteObjectRequest.newBuilder() |
| 418 | + .setWriteOffset(4) |
| 419 | + .setChecksummedData(content.slice(4, 2).asChecksummedData()) |
| 420 | + .build(); |
| 421 | + WriteObjectRequest reqWrite6 = |
| 422 | + WriteObjectRequest.newBuilder() |
| 423 | + .setWriteOffset(6) |
| 424 | + .setChecksummedData(content.slice(6, 2).asChecksummedData()) |
| 425 | + .build(); |
| 426 | + WriteObjectRequest reqWrite8 = |
| 427 | + WriteObjectRequest.newBuilder() |
| 428 | + .setWriteOffset(8) |
| 429 | + .setChecksummedData(content.slice(8, 2).asChecksummedData()) |
| 430 | + .build(); |
| 431 | + WriteObjectRequest reqWrite8WithUploadId = reqWrite8.toBuilder().setUploadId(uploadId).build(); |
| 432 | + WriteObjectRequest reqWrite10 = |
| 433 | + WriteObjectRequest.newBuilder() |
| 434 | + .setWriteOffset(10) |
| 435 | + .setChecksummedData(content.slice(10, 2).asChecksummedData()) |
| 436 | + .build(); |
| 437 | + WriteObjectRequest reqWrite12 = |
| 438 | + WriteObjectRequest.newBuilder() |
| 439 | + .setWriteOffset(12) |
| 440 | + .setChecksummedData(content.slice(12, 2).asChecksummedData()) |
| 441 | + .build(); |
| 442 | + WriteObjectRequest reqWrite14 = |
| 443 | + WriteObjectRequest.newBuilder() |
| 444 | + .setWriteOffset(14) |
| 445 | + .setChecksummedData(content.slice(14, 2).asChecksummedData()) |
| 446 | + .build(); |
| 447 | + WriteObjectRequest reqWrite16 = |
| 448 | + WriteObjectRequest.newBuilder() |
| 449 | + .setWriteOffset(16) |
| 450 | + .setChecksummedData(content.slice(16, 1).asChecksummedData()) |
| 451 | + .build(); |
| 452 | + WriteObjectRequest reqFinish = |
| 453 | + WriteObjectRequest.newBuilder() |
| 454 | + .setFinishWrite(true) |
| 455 | + .setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(content.getCrc32c()).build()) |
| 456 | + .mergeFrom(reqWrite16) |
| 457 | + .build(); |
| 458 | + WriteObjectResponse resFinish = |
| 459 | + WriteObjectResponse.newBuilder() |
| 460 | + .setResource( |
| 461 | + reqStart.getWriteObjectSpec().getResource().toBuilder() |
| 462 | + .setGeneration(1) |
| 463 | + .setSize(17) |
| 464 | + .setChecksums( |
| 465 | + ObjectChecksums.newBuilder() |
| 466 | + .setCrc32C(content.getCrc32c()) |
| 467 | + .setMd5Hash(content.getMd5Bytes()) |
| 468 | + .build()) |
| 469 | + .build()) |
| 470 | + .build(); |
| 471 | + ImmutableSet<WriteObjectRequest> allReqWrite = |
| 472 | + ImmutableSet.of( |
| 473 | + reqWrite0, |
| 474 | + reqWrite2, |
| 475 | + reqWrite4, |
| 476 | + reqWrite6, |
| 477 | + reqWrite8, |
| 478 | + reqWrite10, |
| 479 | + reqWrite12, |
| 480 | + reqWrite14, |
| 481 | + reqWrite16); |
| 482 | + |
| 483 | + AtomicInteger retryCount = new AtomicInteger(0); |
| 484 | + StorageImplBase service = |
| 485 | + new StorageImplBase() { |
| 486 | + @Override |
| 487 | + public void startResumableWrite( |
| 488 | + StartResumableWriteRequest req, StreamObserver<StartResumableWriteResponse> respond) { |
| 489 | + if (req.equals(reqStart)) { |
| 490 | + respond.onNext(resStart); |
| 491 | + respond.onCompleted(); |
| 492 | + } else { |
| 493 | + unexpected(respond, req); |
| 494 | + } |
| 495 | + } |
| 496 | + |
| 497 | + @Override |
| 498 | + public void queryWriteStatus( |
| 499 | + QueryWriteStatusRequest req, StreamObserver<QueryWriteStatusResponse> respond) { |
| 500 | + if (req.equals(reqQuery)) { |
| 501 | + respond.onNext(resQuery); |
| 502 | + respond.onCompleted(); |
| 503 | + } else { |
| 504 | + unexpected(respond, req); |
| 505 | + } |
| 506 | + } |
| 507 | + |
| 508 | + @Override |
| 509 | + public StreamObserver<WriteObjectRequest> writeObject( |
| 510 | + StreamObserver<WriteObjectResponse> respond) { |
| 511 | + return new StreamObserver<WriteObjectRequest>() { |
| 512 | + @Override |
| 513 | + public void onNext(WriteObjectRequest value) { |
| 514 | + if (value.equals(reqFinish)) { |
| 515 | + respond.onNext(resFinish); |
| 516 | + respond.onCompleted(); |
| 517 | + } else if (value.equals(reqWrite10)) { |
| 518 | + int i = retryCount.get(); |
| 519 | + if (i < 2) { |
| 520 | + respond.onError(apiException(Code.UNAVAILABLE, "{Unavailable}")); |
| 521 | + } |
| 522 | + } else if (value.equals(reqWrite8WithUploadId)) { |
| 523 | + retryCount.incrementAndGet(); |
| 524 | + } else if (allReqWrite.contains(value)) { |
| 525 | + // do nothing |
| 526 | + } else { |
| 527 | + unexpected(respond, value); |
| 528 | + } |
| 529 | + } |
| 530 | + |
| 531 | + @Override |
| 532 | + public void onError(Throwable t) {} |
| 533 | + |
| 534 | + @Override |
| 535 | + public void onCompleted() {} |
| 536 | + }; |
| 537 | + } |
| 538 | + |
| 539 | + private void unexpected(StreamObserver<?> respond, Message msg) { |
| 540 | + respond.onError( |
| 541 | + apiException( |
| 542 | + Code.UNIMPLEMENTED, |
| 543 | + "Unexpected request { " + TextFormat.printer().shortDebugString(msg) + " }")); |
| 544 | + } |
| 545 | + }; |
| 546 | + try (FakeServer fakeServer = FakeServer.of(service); |
| 547 | + GrpcStorageImpl storage = |
| 548 | + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { |
| 549 | + |
| 550 | + BlobInfo info = BlobInfo.newBuilder("b", "o").build(); |
| 551 | + SettableApiFuture<WriteObjectResponse> resultFuture = SettableApiFuture.create(); |
| 552 | + BufferHandle recoverBufferHandle = BufferHandle.allocate(2); |
| 553 | + SyncAndUploadUnbufferedWritableByteChannel syncAndUpload = |
| 554 | + new SyncAndUploadUnbufferedWritableByteChannel( |
| 555 | + storage.storageClient.writeObjectCallable(), |
| 556 | + storage.storageClient.queryWriteStatusCallable(), |
| 557 | + resultFuture, |
| 558 | + new ChunkSegmenter(Hasher.enabled(), ByteStringStrategy.copy(), 2, 2), |
| 559 | + new DefaultRetrier(UnaryOperator.identity(), storage.getOptions()), |
| 560 | + StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler(), |
| 561 | + new WriteCtx<>( |
| 562 | + new ResumableWrite( |
| 563 | + reqStart, |
| 564 | + resStart, |
| 565 | + id -> reqWrite0.toBuilder().clearWriteObjectSpec().setUploadId(id).build())), |
| 566 | + recoveryFileManager.newRecoveryFile(info), |
| 567 | + recoverBufferHandle); |
| 568 | + try (BufferedWritableByteChannel w = |
| 569 | + StorageByteChannels.writable() |
| 570 | + .createSynchronized( |
| 571 | + new DefaultBufferedWritableByteChannel(recoverBufferHandle, syncAndUpload))) { |
| 572 | + w.write(ByteBuffer.wrap(content.getBytes())); |
| 573 | + } |
| 574 | + |
| 575 | + Decoder<WriteObjectResponse, BlobInfo> decoder = |
| 576 | + Conversions.grpc().blobInfo().compose(WriteObjectResponse::getResource); |
| 577 | + BlobInfo actual = decoder.decode(resultFuture.get(3, TimeUnit.SECONDS)); |
| 578 | + assertThat(actual.getSize()).isEqualTo(content.getBytes().length); |
| 579 | + assertThat(actual.getCrc32c()).isEqualTo(content.getCrc32cBase64()); |
| 580 | + } |
| 581 | + } |
| 582 | + |
375 | 583 | static List<ByteString> dataFrames(long length, int segmentLength) {
|
376 | 584 | // todo: rethink this
|
377 | 585 | Random rand = new Random(length);
|
|
0 commit comments