Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update the accounting of partial batch mutations #2149

Merged
merged 32 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Prev Previous commit
Next Next commit
Minor changes
  • Loading branch information
ron-gal committed Mar 6, 2024
commit aec1125cfd79a200e23042ddfb5cc65b7eddf6e1
Expand Up @@ -25,7 +25,7 @@
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptErrors;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
import com.google.common.util.concurrent.MoreExecutors;

/**
Expand All @@ -49,26 +49,3 @@ public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext context)
}
}

class MutateRowsErrorConverterUnaryCallable extends UnaryCallable<MutateRowsRequest, Void> {
private final UnaryCallable<MutateRowsRequest, MutateRowsAttemptErrors> innerCallable;

MutateRowsErrorConverterUnaryCallable(
UnaryCallable<MutateRowsRequest, MutateRowsAttemptErrors> callable) {
this.innerCallable = callable;
}

@Override
public ApiFuture<Void> futureCall(MutateRowsRequest request, ApiCallContext context) {
ApiFuture<MutateRowsAttemptErrors> future = innerCallable.futureCall(request, context);
return ApiFutures.transform(
future,
(ApiFunction<MutateRowsAttemptErrors, Void>)
result -> {
if (result != null) {
throw MutateRowsException.create(null, result.failedMutations, result.isRetryable);
}
return null;
},
MoreExecutors.directExecutor());
}
}
Expand Up @@ -102,7 +102,7 @@
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.TracedBatcherUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptErrors;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
Expand Down Expand Up @@ -708,7 +708,7 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withBigtableTracer =
new BigtableTracerStreamingCallable<>(convertException);

BasicResultRetryAlgorithm<MutateRowsAttemptErrors> resultRetryAlgorithm;
BasicResultRetryAlgorithm<MutateRowsAttemptResult> resultRetryAlgorithm;
if (settings.getEnableRetryInfo()) {
resultRetryAlgorithm = new RetryInfoRetryAlgorithm<>();
} else {
Expand All @@ -717,13 +717,13 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
MutateRowsErrorRetryAlgorithm mutateRowsErrorRetryAlgorithm =
new MutateRowsErrorRetryAlgorithm(resultRetryAlgorithm);

RetryAlgorithm<MutateRowsAttemptErrors> retryAlgorithm =
RetryAlgorithm<MutateRowsAttemptResult> retryAlgorithm =
new RetryAlgorithm<>(
mutateRowsErrorRetryAlgorithm,
new ExponentialRetryAlgorithm(
settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock()));

RetryingExecutorWithContext<MutateRowsAttemptErrors> retryingExecutor =
RetryingExecutorWithContext<MutateRowsAttemptResult> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
MutateRowsRetryingCallable mutateRowsRetryingCallable =
new MutateRowsRetryingCallable(
Expand Down
@@ -0,0 +1,34 @@
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
import com.google.common.util.concurrent.MoreExecutors;

public class MutateRowsErrorConverterUnaryCallable extends UnaryCallable<MutateRowsRequest, Void> {

private final UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> innerCallable;

MutateRowsErrorConverterUnaryCallable(
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> callable) {
this.innerCallable = callable;
}

@Override
public ApiFuture<Void> futureCall(MutateRowsRequest request, ApiCallContext context) {
ApiFuture<MutateRowsAttemptResult> future = innerCallable.futureCall(request, context);
return ApiFutures.transform(
future,
result -> {
if (!result.failedMutations.isEmpty()) {
throw MutateRowsException.create(null, result.failedMutations, result.isRetryable);
}
return null;
},
MoreExecutors.directExecutor());
}
}
Expand Up @@ -87,7 +87,7 @@
*
* <p>Package-private for internal use.
*/
class MutateRowsAttemptCallable implements Callable<MutateRowsAttemptErrors> {
class MutateRowsAttemptCallable implements Callable<MutateRowsAttemptResult> {
// Synthetic status for Mutations that didn't get a result (because the whole RPC failed). It will
// be exposed in MutateRowsException's FailedMutations.
private static final StatusCode LOCAL_UNKNOWN_STATUS =
Expand Down Expand Up @@ -116,10 +116,10 @@ public Object getTransportCode() {
@Nonnull private TimedAttemptSettings attemptSettings;

// Parent controller
private RetryingFuture<MutateRowsAttemptErrors> externalFuture;
private RetryingFuture<MutateRowsAttemptResult> externalFuture;

// Simple wrappers for handling result futures
private final ApiFunction<List<MutateRowsResponse>, MutateRowsAttemptErrors>
private final ApiFunction<List<MutateRowsResponse>, MutateRowsAttemptResult>
attemptSuccessfulCallback =
responses -> {
handleAttemptSuccess(responses);
Expand Down Expand Up @@ -148,7 +148,7 @@ public Object getTransportCode() {
permanentFailures = Lists.newArrayList();
}

public void setExternalFuture(RetryingFuture<MutateRowsAttemptErrors> externalFuture) {
public void setExternalFuture(RetryingFuture<MutateRowsAttemptResult> externalFuture) {
this.externalFuture = externalFuture;
}

Expand All @@ -161,7 +161,7 @@ public void setExternalFuture(RetryingFuture<MutateRowsAttemptErrors> externalFu
* return of this method should just be ignored.
*/
@Override
public MutateRowsAttemptErrors call() {
public MutateRowsAttemptResult call() {
try {
// externalFuture is set from MutateRowsRetryingCallable before invoking this method. It
// shouldn't be null unless the code changed
Expand Down Expand Up @@ -203,7 +203,7 @@ public MutateRowsAttemptErrors call() {

// Inspect the results and either propagate the success, or prepare to retry the failed
// mutations
ApiFuture<MutateRowsAttemptErrors> transformed =
ApiFuture<MutateRowsAttemptResult> transformed =
ApiFutures.transform(catching, attemptSuccessfulCallback, MoreExecutors.directExecutor());

// Notify the parent of the attempt
Expand Down Expand Up @@ -262,7 +262,7 @@ private void handleAttemptError(Throwable rpcError) {
* {@link MutateRowsException}. If no errors exist, then the attempt future is successfully
* completed. We don't currently handle RetryInfo on entry level failures.
*/
private MutateRowsAttemptErrors handleAttemptSuccess(List<MutateRowsResponse> responses) {
private MutateRowsAttemptResult handleAttemptSuccess(List<MutateRowsResponse> responses) {
List<FailedMutation> allFailures = Lists.newArrayList(permanentFailures);
MutateRowsRequest lastRequest = currentRequest;

Expand Down Expand Up @@ -321,9 +321,9 @@ private MutateRowsAttemptErrors handleAttemptSuccess(List<MutateRowsResponse> re

if (!allFailures.isEmpty()) {
boolean isRetryable = builder.getEntriesCount() > 0;
return new MutateRowsAttemptErrors(allFailures, isRetryable);
return new MutateRowsAttemptResult(allFailures, isRetryable);
}
return null;
return new MutateRowsAttemptResult();
}

/**
Expand Down
Expand Up @@ -16,14 +16,18 @@
package com.google.cloud.bigtable.data.v2.stub.mutaterows;

import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation;
import java.util.ArrayList;
import java.util.List;

public class MutateRowsAttemptErrors {
public class MutateRowsAttemptResult {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please mark this as @InternalApi and add javadoc explaining the purpose of this class

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I would highly recommend to use AutoValue for value classes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about AutoValue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


public boolean isRetryable;
public List<FailedMutation> failedMutations;

public MutateRowsAttemptErrors(List<FailedMutation> failedMutations, boolean isRetryable) {
public MutateRowsAttemptResult(){
this.failedMutations = new ArrayList<>();
this.isRetryable = false;
}
public MutateRowsAttemptResult(List<FailedMutation> failedMutations, boolean isRetryable) {
this.failedMutations = failedMutations;
this.isRetryable = isRetryable;
}
Expand Down
Expand Up @@ -42,17 +42,17 @@
*/
@InternalApi
public class MutateRowsRetryingCallable
extends UnaryCallable<MutateRowsRequest, MutateRowsAttemptErrors> {
extends UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> {
private final ApiCallContext callContextPrototype;
private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable;
private final RetryingExecutorWithContext<MutateRowsAttemptErrors> executor;
private final RetryingExecutorWithContext<MutateRowsAttemptResult> executor;
private final ImmutableSet<Code> retryCodes;
private final RetryAlgorithm retryAlgorithm;

public MutateRowsRetryingCallable(
@Nonnull ApiCallContext callContextPrototype,
@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable,
@Nonnull RetryingExecutorWithContext<MutateRowsAttemptErrors> executor,
@Nonnull RetryingExecutorWithContext<MutateRowsAttemptResult> executor,
@Nonnull Set<StatusCode.Code> retryCodes,
@Nonnull RetryAlgorithm retryAlgorithm) {
this.callContextPrototype = Preconditions.checkNotNull(callContextPrototype);
Expand All @@ -63,20 +63,15 @@ public MutateRowsRetryingCallable(
}

@Override
public RetryingFuture<MutateRowsAttemptErrors> futureCall(
public RetryingFuture<MutateRowsAttemptResult> futureCall(
MutateRowsRequest request, ApiCallContext inputContext) {
ApiCallContext context = callContextPrototype.nullToSelf(inputContext);
MutateRowsAttemptCallable retryCallable =
new MutateRowsAttemptCallable(callable.all(), request, context, retryCodes, retryAlgorithm);

RetryingFuture<MutateRowsAttemptErrors> retryingFuture =
RetryingFuture<MutateRowsAttemptResult> retryingFuture =
executor.createFuture(retryCallable, context);
retryCallable.setExternalFuture(retryingFuture);
MutateRowsAttemptErrors mutateRowsAttemptErrors = retryCallable.call();
if (mutateRowsAttemptErrors != null) {
throw MutateRowsException.create(
null, mutateRowsAttemptErrors.failedMutations, mutateRowsAttemptErrors.isRetryable);
}

return retryingFuture;
}
Expand Down
Expand Up @@ -19,23 +19,23 @@
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
import com.google.api.gax.retrying.RetryingContext;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptErrors;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
import org.checkerframework.checker.nullness.qual.Nullable;

@InternalApi
public class MutateRowsErrorRetryAlgorithm
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
extends BasicResultRetryAlgorithm<MutateRowsAttemptErrors> {
BasicResultRetryAlgorithm<MutateRowsAttemptErrors> retryAlgorithm;
extends BasicResultRetryAlgorithm<MutateRowsAttemptResult> {
BasicResultRetryAlgorithm<MutateRowsAttemptResult> retryAlgorithm;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private & final

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


public MutateRowsErrorRetryAlgorithm(
BasicResultRetryAlgorithm<MutateRowsAttemptErrors> retryAlgorithm) {
BasicResultRetryAlgorithm<MutateRowsAttemptResult> retryAlgorithm) {
this.retryAlgorithm = retryAlgorithm;
}

@Override
public TimedAttemptSettings createNextAttempt(
Throwable previousThrowable,
MutateRowsAttemptErrors previousResponse,
MutateRowsAttemptResult previousResponse,
TimedAttemptSettings previousSettings) {
return retryAlgorithm.createNextAttempt(previousThrowable, previousResponse, previousSettings);
}
Expand All @@ -44,15 +44,15 @@ public TimedAttemptSettings createNextAttempt(
public TimedAttemptSettings createNextAttempt(
RetryingContext context,
Throwable previousThrowable,
MutateRowsAttemptErrors previousResponse,
MutateRowsAttemptResult previousResponse,
TimedAttemptSettings previousSettings) {
return retryAlgorithm.createNextAttempt(
context, previousThrowable, previousResponse, previousSettings);
}

@Override
public boolean shouldRetry(
Throwable previousThrowable, MutateRowsAttemptErrors previousResponse) {
Throwable previousThrowable, MutateRowsAttemptResult previousResponse) {
if (retryAlgorithm.shouldRetry(previousThrowable, previousResponse)) {
return true;
}
Expand All @@ -63,7 +63,7 @@ public boolean shouldRetry(
public boolean shouldRetry(
@Nullable RetryingContext context,
Throwable previousThrowable,
MutateRowsAttemptErrors previousResponse) {
MutateRowsAttemptResult previousResponse) {
if (retryAlgorithm.shouldRetry(context, previousThrowable, previousResponse)) {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
Expand Down
Expand Up @@ -411,9 +411,9 @@ public ApiFuture<List<MutateRowsResponse>> futureCall(
}
}

static class MockRetryingFuture extends AbstractApiFuture<MutateRowsAttemptErrors>
implements RetryingFuture<MutateRowsAttemptErrors> {
ApiFuture<MutateRowsAttemptErrors> attemptFuture;
static class MockRetryingFuture extends AbstractApiFuture<MutateRowsAttemptResult>
implements RetryingFuture<MutateRowsAttemptResult> {
ApiFuture<MutateRowsAttemptResult> attemptFuture;

TimedAttemptSettings timedAttemptSettings;

Expand All @@ -434,7 +434,7 @@ static class MockRetryingFuture extends AbstractApiFuture<MutateRowsAttemptError
}

@Override
public void setAttemptFuture(ApiFuture<MutateRowsAttemptErrors> attemptFuture) {
public void setAttemptFuture(ApiFuture<MutateRowsAttemptResult> attemptFuture) {
this.attemptFuture = attemptFuture;
}

Expand All @@ -444,17 +444,17 @@ public TimedAttemptSettings getAttemptSettings() {
}

@Override
public Callable<MutateRowsAttemptErrors> getCallable() {
public Callable<MutateRowsAttemptResult> getCallable() {
throw new UnsupportedOperationException("not used");
}

@Override
public ApiFuture<MutateRowsAttemptErrors> peekAttemptResult() {
public ApiFuture<MutateRowsAttemptResult> peekAttemptResult() {
throw new UnsupportedOperationException("not used");
}

@Override
public ApiFuture<MutateRowsAttemptErrors> getAttemptResult() {
public ApiFuture<MutateRowsAttemptResult> getAttemptResult() {
throw new UnsupportedOperationException("not used");
}
}
Expand Down