Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update the accounting of partial batch mutations #2149

Merged
merged 32 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Prev Previous commit
Next Next commit
more PR fixes
  • Loading branch information
ron-gal committed Mar 19, 2024
commit 3386f1d04145cc8c4ac94499b7d6a338fbd7cbae
Expand Up @@ -45,8 +45,8 @@ public ApiFuture<Void> futureCall(BulkMutation request, ApiCallContext context)
return ApiFutures.transform(
future,
result -> {
if (!result.failedMutations.isEmpty()) {
throw MutateRowsException.create(null, result.failedMutations, result.isRetryable);
if (!result.getFailedMutations().isEmpty()) {
throw MutateRowsException.create(null, result.getFailedMutations(), result.getIsRetryable());
}
return null;
},
Expand Down
Expand Up @@ -326,9 +326,9 @@ private MutateRowsAttemptResult handleAttemptSuccess(List<MutateRowsResponse> re

if (!allFailures.isEmpty()) {
boolean isRetryable = builder.getEntriesCount() > 0;
return new MutateRowsAttemptResult(allFailures, isRetryable);
return MutateRowsAttemptResult.create(allFailures, isRetryable);
}
return new MutateRowsAttemptResult();
return MutateRowsAttemptResult.success();
}

/**
Expand Down
Expand Up @@ -16,27 +16,34 @@
package com.google.cloud.bigtable.data.v2.stub.mutaterows;

import com.google.api.core.InternalApi;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;

/**
* This class represents the result of a MutateRows attempt. It contains the list of failed
* mutations, along with an indicator whether these errors are retryable.
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
*/
@InternalApi
public class MutateRowsAttemptResult {
@AutoValue
public abstract class MutateRowsAttemptResult {

public final boolean isRetryable;
public final List<FailedMutation> failedMutations;
public abstract List<FailedMutation> getFailedMutations();

public MutateRowsAttemptResult() {
this.failedMutations = new ArrayList<>();
this.isRetryable = false;
public abstract boolean getIsRetryable();

@InternalApi
@Nonnull
public static MutateRowsAttemptResult create(
List<FailedMutation> failedMutations, boolean isRetryable) {
return new AutoValue_MutateRowsAttemptResult(failedMutations, isRetryable);
}

public MutateRowsAttemptResult(List<FailedMutation> failedMutations, boolean isRetryable) {
this.failedMutations = failedMutations;
this.isRetryable = isRetryable;
@InternalApi
@Nonnull
public static MutateRowsAttemptResult success() {
return new AutoValue_MutateRowsAttemptResult(new ArrayList<>(), false);
}
}
Expand Up @@ -52,7 +52,7 @@ public void splitResponse(
// mutation. It is important to set the correct error on the correct mutation. When the entry is
// later read, it resolves the exception first, and only later it goes to the value set by
// set().
for (FailedMutation mutation : response.failedMutations) {
for (FailedMutation mutation : response.getFailedMutations()) {
entries.get(mutation.getIndex()).getResultFuture().setException(mutation.getError());
}
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
for (BatchEntry<RowMutationEntry, Void> batchResponse : entries) {
Expand Down
Expand Up @@ -39,8 +39,8 @@ public MutateRowsPartialErrorRetryAlgorithm(
public boolean shouldRetry(
Throwable previousThrowable, MutateRowsAttemptResult previousResponse) {
// handle partial retryable failures
if (previousResponse != null && !previousResponse.failedMutations.isEmpty()) {
return previousResponse.isRetryable;
if (previousResponse != null && !previousResponse.getFailedMutations().isEmpty()) {
return previousResponse.getIsRetryable();
}
// business as usual
return retryAlgorithm.shouldRetry(previousThrowable, previousResponse);
Expand All @@ -52,8 +52,8 @@ public boolean shouldRetry(
Throwable previousThrowable,
MutateRowsAttemptResult previousResponse) {
// handle partial retryable failures
if (previousResponse != null && !previousResponse.failedMutations.isEmpty()) {
return previousResponse.isRetryable;
if (previousResponse != null && !previousResponse.getFailedMutations().isEmpty()) {
return previousResponse.getIsRetryable();
}
// business as usual
return retryAlgorithm.shouldRetry(context, previousThrowable, previousResponse);
Expand Down
Expand Up @@ -20,15 +20,12 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.BatcherImpl;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
Expand All @@ -43,13 +40,11 @@
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
Expand Down Expand Up @@ -453,7 +448,7 @@ public Object answer(InvocationOnMock invocation) {
try (Batcher<RowMutationEntry, Void> batcher =
new BatcherImpl<>(
batchingDescriptor,
stub.internalBulkMutateRowsCallable().withDefaultCallContext(defaultContext),
stub.internalBulkMutateRowsCallable().withDefaultCallContext(defaultContext),
BulkMutation.create(TABLE_ID),
settings.getStubSettings().bulkMutateRowsSettings().getBatchingSettings(),
Executors.newSingleThreadScheduledExecutor(),
Expand Down
Expand Up @@ -103,8 +103,8 @@ public void singleEntrySuccessTest() throws Exception {
MutateRowsAttemptResult result = parentFuture.attemptFuture.get();

assertThat(result).isNotNull();
assertThat(result.failedMutations).hasSize(0);
assertThat(result.isRetryable).isFalse();
assertThat(result.getFailedMutations()).hasSize(0);
assertThat(result.getIsRetryable()).isFalse();
// innerCallable received the request
assertThat(innerCallable.lastRequest).isEqualTo(request);
}
Expand All @@ -129,8 +129,8 @@ public void missingEntry() throws Exception {

MutateRowsAttemptResult result = parentFuture.attemptFuture.get();

assertThat(result.failedMutations).hasSize(1);
FailedMutation failedMutation = result.failedMutations.get(0);
assertThat(result.getFailedMutations()).hasSize(1);
FailedMutation failedMutation = result.getFailedMutations().get(0);
assertThat(failedMutation.getIndex()).isEqualTo(1);
assertThat(failedMutation.getError())
.hasMessageThat()
Expand Down Expand Up @@ -196,7 +196,7 @@ public void mixedTest() throws Exception {

// Entry expectations
@SuppressWarnings("ConstantConditions")
List<FailedMutation> failedMutations = result.failedMutations;
List<FailedMutation> failedMutations = result.getFailedMutations();
assertThat(failedMutations).hasSize(2);

assertThat(failedMutations.get(0).getIndex()).isEqualTo(1);
Expand Down Expand Up @@ -260,7 +260,7 @@ public void nextAttemptTest() throws Exception {

// Entry expectations
@SuppressWarnings("ConstantConditions")
List<FailedMutation> failedMutations = result.failedMutations;
List<FailedMutation> failedMutations = result.getFailedMutations();
assertThat(failedMutations).hasSize(1);

assertThat(failedMutations.get(0).getIndex()).isEqualTo(2);
Expand Down
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.mutaterows;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;

import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchEntry;
Expand Down Expand Up @@ -96,13 +97,13 @@ public void splitResponseTest() {
assertThat(batchResponse.get(1).getResultFuture().isDone()).isFalse();

MutateRowsBatchingDescriptor underTest = new MutateRowsBatchingDescriptor();
underTest.splitResponse(new MutateRowsAttemptResult(), batchResponse);
underTest.splitResponse(MutateRowsAttemptResult.success(), batchResponse);
assertThat(batchResponse.get(0).getResultFuture().isDone()).isTrue();
assertThat(batchResponse.get(1).getResultFuture().isDone()).isTrue();
}

@Test
public void splitResponsePartialErrorsTest() throws Exception {
public void splitResponsePartialErrorsTest() {
BatchEntry<RowMutationEntry, Void> batchEntry1 =
BatchEntry.create(
RowMutationEntry.create("key1").deleteRow(), SettableApiFuture.<Void>create());
Expand All @@ -117,7 +118,7 @@ public void splitResponsePartialErrorsTest() throws Exception {

MutateRowsBatchingDescriptor underTest = new MutateRowsBatchingDescriptor();
underTest.splitResponse(
new MutateRowsAttemptResult(
MutateRowsAttemptResult.create(
Arrays.asList(
FailedMutation.create(
0,
Expand All @@ -140,12 +141,9 @@ public void splitResponsePartialErrorsTest() throws Exception {
}
assertThat(unexpectedError).isNull();

Throwable actualError = null;
try {
batchResponse.get(0).getResultFuture().get();
} catch (Throwable t) {
actualError = t.getCause();
}
Throwable actualError =
assertThrows(ExecutionException.class, () -> batchResponse.get(0).getResultFuture().get())
.getCause();

assertThat(actualError).isInstanceOf(InternalException.class);
assertThat(actualError).hasMessageThat().contains("error message");
Expand Down
Expand Up @@ -59,7 +59,7 @@ public void testSuccess() {
MutateRowsErrorConverterUnaryCallable callable =
new MutateRowsErrorConverterUnaryCallable(innerCallable);

innerResult.set(new MutateRowsAttemptResult());
innerResult.set(MutateRowsAttemptResult.success());
callable.call(BulkMutation.create("fake-table"));
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
}

Expand All @@ -69,7 +69,7 @@ public void testFailure() {
new MutateRowsErrorConverterUnaryCallable(innerCallable);

innerResult.set(
new MutateRowsAttemptResult(
MutateRowsAttemptResult.create(
Arrays.asList(
FailedMutation.create(
0,
Expand Down