Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Custom Part Metadata Decorator to ParallelCompositeUploadConfig #2434

Merged
merged 24 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Prev Previous commit
Next Next commit
removing unnecessary tag
  • Loading branch information
sydney-munro committed Mar 15, 2024
commit 8b64aa67d7772f448f6943040ed3b2f279f6b4f6
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.security.SecureRandom;
import java.time.Clock;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Base64;
import java.util.Base64.Encoder;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -126,21 +127,21 @@ public final class ParallelCompositeUploadBlobWriteSessionConfig extends BlobWri
private final BufferAllocationStrategy bufferAllocationStrategy;
private final PartNamingStrategy partNamingStrategy;
private final PartCleanupStrategy partCleanupStrategy;
private final PartCustomTimeStrategy partCustomTimeStrategy;
private final PartMetadataFieldSupplier partMetadataFieldSupplier;

private ParallelCompositeUploadBlobWriteSessionConfig(
int maxPartsPerCompose,
ExecutorSupplier executorSupplier,
BufferAllocationStrategy bufferAllocationStrategy,
PartNamingStrategy partNamingStrategy,
PartCleanupStrategy partCleanupStrategy,
PartCustomTimeStrategy partCustomTimeStrategy) {
PartMetadataFieldSupplier partMetadataFieldSupplier) {
this.maxPartsPerCompose = maxPartsPerCompose;
this.executorSupplier = executorSupplier;
this.bufferAllocationStrategy = bufferAllocationStrategy;
this.partNamingStrategy = partNamingStrategy;
this.partCleanupStrategy = partCleanupStrategy;
this.partCustomTimeStrategy = partCustomTimeStrategy;
this.partMetadataFieldSupplier = partMetadataFieldSupplier;
}

@InternalApi
Expand All @@ -155,7 +156,7 @@ ParallelCompositeUploadBlobWriteSessionConfig withMaxPartsPerCompose(int maxPart
bufferAllocationStrategy,
partNamingStrategy,
partCleanupStrategy,
partCustomTimeStrategy);
partMetadataFieldSupplier);
}

/**
Expand All @@ -176,7 +177,7 @@ public ParallelCompositeUploadBlobWriteSessionConfig withExecutorSupplier(
bufferAllocationStrategy,
partNamingStrategy,
partCleanupStrategy,
partCustomTimeStrategy);
partMetadataFieldSupplier);
}

/**
Expand All @@ -198,7 +199,7 @@ public ParallelCompositeUploadBlobWriteSessionConfig withBufferAllocationStrateg
bufferAllocationStrategy,
partNamingStrategy,
partCleanupStrategy,
partCustomTimeStrategy);
partMetadataFieldSupplier);
}

/**
Expand All @@ -219,7 +220,7 @@ public ParallelCompositeUploadBlobWriteSessionConfig withPartNamingStrategy(
bufferAllocationStrategy,
partNamingStrategy,
partCleanupStrategy,
partCustomTimeStrategy);
partMetadataFieldSupplier);
}

/**
Expand All @@ -240,20 +241,20 @@ public ParallelCompositeUploadBlobWriteSessionConfig withPartCleanupStrategy(
bufferAllocationStrategy,
partNamingStrategy,
partCleanupStrategy,
partCustomTimeStrategy);
partMetadataFieldSupplier);
}

@BetaApi
BenWhitehead marked this conversation as resolved.
Show resolved Hide resolved
public ParallelCompositeUploadBlobWriteSessionConfig withPartCustomTimeStrategy(
PartCustomTimeStrategy partCustomTimeStrategy) {
checkNotNull(partCustomTimeStrategy, "partCustomTimeStrategy must be non null");
public ParallelCompositeUploadBlobWriteSessionConfig withPartMetadataFieldSupplier(
PartMetadataFieldSupplier partMetadataFieldSupplier) {
checkNotNull(partMetadataFieldSupplier, "partMetadataFieldSupplier must be non null");
return new ParallelCompositeUploadBlobWriteSessionConfig(
maxPartsPerCompose,
executorSupplier,
bufferAllocationStrategy,
partNamingStrategy,
partCleanupStrategy,
partCustomTimeStrategy);
partMetadataFieldSupplier);
}

@BetaApi
Expand All @@ -264,7 +265,7 @@ static ParallelCompositeUploadBlobWriteSessionConfig withDefaults() {
BufferAllocationStrategy.simple(ByteSizeConstants._16MiB),
PartNamingStrategy.noPrefix(),
PartCleanupStrategy.always(),
PartCustomTimeStrategy.noCustomTime());
PartMetadataFieldSupplier.noOp());
}

@InternalApi
Expand Down Expand Up @@ -658,54 +659,41 @@ protected String fmtFields(String randomKey, String ultimateObjectName, String p
* CustomTime Metadata Field. This will be a time set a duration in the future which will serve to
* aid in part cleanup via OLM Rules.
*
* @see #withPartCustomTimeStrategy(PartCustomTimeStrategy)
* @since 2.35.1</> This new api is in preview and is subject to breaking changes.
* @see #withPartMetadataFieldSupplier(PartMetadataFieldSupplier)
* @since 2.35.1 This new api is in preview and is subject to breaking changes.
sydney-munro marked this conversation as resolved.
Show resolved Hide resolved
*/
@BetaApi
@Immutable
public abstract static class PartCustomTimeStrategy implements Serializable {
private final boolean isSetCustomTime;
public abstract static class PartMetadataFieldSupplier implements Serializable {
sydney-munro marked this conversation as resolved.
Show resolved Hide resolved

private PartCustomTimeStrategy(boolean isSetCustomTime) {
this.isSetCustomTime = isSetCustomTime;
}

abstract Duration getTimeInFuture();

public boolean isSetCustomTime() {
return isSetCustomTime;
}
public abstract BlobInfo.Builder modifyPartFields(BlobInfo.Builder builder);
sydney-munro marked this conversation as resolved.
Show resolved Hide resolved

@BetaApi
public static CustomTimeSet setCustomTime(Duration timeInFuture) {
return new CustomTimeSet(timeInFuture);
public static CustomTimeInFuture setCustomTimeInFuture(Duration timeInFuture) {
return new CustomTimeInFuture(timeInFuture);
}

@BetaApi
public static NoCustomTime noCustomTime() {
return new NoCustomTime();
public static NoOp noOp() {
return new NoOp();
}

static final class CustomTimeSet extends PartCustomTimeStrategy {
private final Duration timeInFuture;
static final class CustomTimeInFuture extends PartMetadataFieldSupplier {
private Duration timeInFuture;

Duration getTimeInFuture() {
return timeInFuture;
}

CustomTimeSet(Duration timeInFuture) {
super(true);
CustomTimeInFuture(Duration timeInFuture) {
this.timeInFuture = timeInFuture;
sydney-munro marked this conversation as resolved.
Show resolved Hide resolved
}
}

static final class NoCustomTime extends PartCustomTimeStrategy {
NoCustomTime() {
super(false);
public BlobInfo.Builder modifyPartFields(BlobInfo.Builder builder) {
OffsetDateTime futureTime = OffsetDateTime.now().plus(timeInFuture);
return builder.setCustomTimeOffsetDateTime(futureTime);
}
}

Duration getTimeInFuture() {
throw new IllegalStateException("There is no time in future for NoCustomTime Strategy");
static final class NoOp extends PartMetadataFieldSupplier {
public BlobInfo.Builder modifyPartFields(BlobInfo.Builder builder) {
return builder;
}
}
}
Expand Down Expand Up @@ -839,7 +827,7 @@ public ApiFuture<BufferedWritableByteChannel> openAsync() {
partNamingStrategy,
partCleanupStrategy,
maxPartsPerCompose,
partCustomTimeStrategy,
partMetadataFieldSupplier,
result,
storageInternal,
info,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
import com.google.cloud.storage.MetadataField.PartRange;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCustomTimeStrategy;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartMetadataFieldSupplier;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy;
import com.google.cloud.storage.Storage.ComposeRequest;
import com.google.cloud.storage.UnifiedOpts.Crc32cMatch;
Expand All @@ -57,8 +57,6 @@
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -114,7 +112,7 @@ final class ParallelCompositeUploadWritableByteChannel implements BufferedWritab
private final PartNamingStrategy partNamingStrategy;
private final PartCleanupStrategy partCleanupStrategy;
private final int maxElementsPerCompact;
private final PartCustomTimeStrategy partCustomTimeStrategy;
private final PartMetadataFieldSupplier partMetadataFieldSupplier;
private final SettableApiFuture<BlobInfo> finalObject;
private final StorageInternal storage;
private final BlobInfo ultimateObject;
Expand All @@ -139,7 +137,7 @@ final class ParallelCompositeUploadWritableByteChannel implements BufferedWritab
PartNamingStrategy partNamingStrategy,
PartCleanupStrategy partCleanupStrategy,
int maxElementsPerCompact,
PartCustomTimeStrategy partCustomTimeStrategy,
PartMetadataFieldSupplier partMetadataFieldSupplier,
SettableApiFuture<BlobInfo> finalObject,
StorageInternal storage,
BlobInfo ultimateObject,
Expand All @@ -149,7 +147,7 @@ final class ParallelCompositeUploadWritableByteChannel implements BufferedWritab
this.partNamingStrategy = partNamingStrategy;
this.partCleanupStrategy = partCleanupStrategy;
this.maxElementsPerCompact = maxElementsPerCompact;
this.partCustomTimeStrategy = partCustomTimeStrategy;
this.partMetadataFieldSupplier = partMetadataFieldSupplier;
this.finalObject = finalObject;
this.storage = storage;
this.ultimateObject = ultimateObject;
Expand Down Expand Up @@ -433,11 +431,7 @@ private BlobInfo definePart(BlobInfo ultimateObject, PartRange partRange, long o
PART_INDEX.appendTo(partRange, builder);
OBJECT_OFFSET.appendTo(offset, builder);
b.setMetadata(builder.build());
if (partCustomTimeStrategy.isSetCustomTime()) {
Duration timeInFuture = partCustomTimeStrategy.getTimeInFuture();
OffsetDateTime now = OffsetDateTime.now();
b.setCustomTimeOffsetDateTime(now.plus(timeInFuture));
}
b = partMetadataFieldSupplier.modifyPartFields(b);
return b.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import static com.google.common.truth.Truth.assertWithMessage;

import com.google.cloud.storage.MetadataField.PartRange;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCustomTimeStrategy;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy;
import com.google.common.truth.StringSubject;
import java.time.Duration;
import org.junit.Test;

public final class ParallelCompositeUploadBlobWriteSessionConfigTest {
Expand Down Expand Up @@ -89,21 +87,6 @@ public void partNameStrategy_objectNamePrefix() throws Exception {
() -> assertThat(fmt).startsWith("a/b/obj"));
BenWhitehead marked this conversation as resolved.
Show resolved Hide resolved
}

@Test(expected = IllegalStateException.class)
public void partCustomTimeStrategy_noCustomTime() {
PartCustomTimeStrategy strategy = PartCustomTimeStrategy.noCustomTime();
assertThat(strategy.isSetCustomTime()).isFalse();
strategy.getTimeInFuture();
}

@Test
public void partCustomTimeStrategy_customTimeSet() {
Duration timeInFuture = Duration.ofSeconds(30);
PartCustomTimeStrategy strategy = PartCustomTimeStrategy.setCustomTime(timeInFuture);
assertThat(strategy.isSetCustomTime()).isTrue();
assertThat(strategy.getTimeInFuture()).isEqualTo(timeInFuture);
}

private static StringSubject assertField(String fmt, int idx) {
String[] split = fmt.split(";");
String s = split[idx];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
import com.google.cloud.storage.MetadataField.PartRange;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCustomTimeStrategy;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartMetadataFieldSupplier;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy;
import com.google.cloud.storage.ParallelCompositeUploadWritableByteChannel.BufferHandleReleaser;
import com.google.cloud.storage.Storage.ComposeRequest;
Expand Down Expand Up @@ -81,7 +81,7 @@ public final class ParallelCompositeUploadWritableByteChannelTest {
private SettableApiFuture<BlobInfo> finalObject;
private FakeStorageInternal storageInternal;
private SimplisticPartNamingStrategy partNamingStrategy;
private PartCustomTimeStrategy partCustomTimeStrategy;
private PartMetadataFieldSupplier partMetadataFieldSupplier;
private int bufferCapacity;

@Before
Expand All @@ -93,7 +93,7 @@ public void setUp() throws Exception {
finalObject = SettableApiFuture.create();
partNamingStrategy = new SimplisticPartNamingStrategy("prefix");
storageInternal = new FakeStorageInternal();
partCustomTimeStrategy = PartCustomTimeStrategy.noCustomTime();
partMetadataFieldSupplier = PartMetadataFieldSupplier.noOp();
}

@Test
Expand Down Expand Up @@ -205,7 +205,7 @@ public void cleanup_success_disabled() throws Exception {
partNamingStrategy,
PartCleanupStrategy.never(),
maxElementsPerCompact,
partCustomTimeStrategy,
partMetadataFieldSupplier,
finalObject,
storageInternal,
info,
Expand Down Expand Up @@ -245,7 +245,7 @@ public void writeDoesNotFlushIfItIsnNotFull() throws Exception {
partNamingStrategy,
PartCleanupStrategy.never(),
maxElementsPerCompact,
partCustomTimeStrategy,
partMetadataFieldSupplier,
finalObject,
storageInternal,
info,
Expand Down Expand Up @@ -345,7 +345,7 @@ public void partsRetainMetadata() throws Exception {
partNamingStrategy,
PartCleanupStrategy.never(),
3,
partCustomTimeStrategy,
partMetadataFieldSupplier,
finalObject,
new FakeStorageInternal() {
@Override
Expand Down Expand Up @@ -435,7 +435,7 @@ public void creatingAnEmptyObjectWhichFailsIsSetAsResultFailureAndThrowFromClose
partNamingStrategy,
PartCleanupStrategy.always(),
3,
partCustomTimeStrategy,
partMetadataFieldSupplier,
finalObject,
new FakeStorageInternal() {
@Override
Expand Down Expand Up @@ -469,7 +469,7 @@ public void badServerCrc32cResultsInException() throws Exception {
partNamingStrategy,
PartCleanupStrategy.always(),
3,
partCustomTimeStrategy,
partMetadataFieldSupplier,
finalObject,
new FakeStorageInternal() {
@Override
Expand Down Expand Up @@ -576,7 +576,7 @@ public BlobInfo internalDirectUpload(
partNamingStrategy,
PartCleanupStrategy.never(),
32,
partCustomTimeStrategy,
partMetadataFieldSupplier,
finalObject,
storageInternal,
info,
Expand Down Expand Up @@ -656,7 +656,7 @@ public void errorContextIsPopulated() throws Exception {
partNamingStrategy,
PartCleanupStrategy.never(),
3,
partCustomTimeStrategy,
partMetadataFieldSupplier,
finalObject,
new FakeStorageInternal() {
@Override
Expand Down Expand Up @@ -739,7 +739,7 @@ public BlobInfo internalObjectGet(BlobId blobId, Opts<ObjectSourceOpt> opts) {
partNamingStrategy,
PartCleanupStrategy.always(),
10,
partCustomTimeStrategy,
partMetadataFieldSupplier,
finalObject,
storageInternal,
info,
Expand Down Expand Up @@ -768,7 +768,7 @@ private ParallelCompositeUploadWritableByteChannel defaultPcu(int maxElementsPer
partNamingStrategy,
PartCleanupStrategy.always(),
maxElementsPerCompact,
partCustomTimeStrategy,
partMetadataFieldSupplier,
finalObject,
storageInternal,
info,
Expand Down