Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update the accounting of partial batch mutations #2149

Merged
merged 32 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Next Next commit
Fixed retrying of partial errors
  • Loading branch information
ron-gal committed Mar 6, 2024
commit 7de3aba70aed2dc5ac224b6049801de2314999b9
Expand Up @@ -17,10 +17,16 @@

import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.COOKIES_HOLDER_KEY;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptErrors;
import com.google.common.util.concurrent.MoreExecutors;

/**
* The cookie holder will act as operation scoped storage for all retry attempts. Each attempt's
Expand All @@ -42,3 +48,27 @@ public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext context)
grpcCallContext.getCallOptions().withOption(COOKIES_HOLDER_KEY, new CookiesHolder())));
}
}

class MutateRowsErrorConverterUnaryCallable extends UnaryCallable<MutateRowsRequest, Void> {
private final UnaryCallable<MutateRowsRequest, MutateRowsAttemptErrors> innerCallable;

MutateRowsErrorConverterUnaryCallable(
UnaryCallable<MutateRowsRequest, MutateRowsAttemptErrors> callable) {
this.innerCallable = callable;
}

@Override
public ApiFuture<Void> futureCall(MutateRowsRequest request, ApiCallContext context) {
ApiFuture<MutateRowsAttemptErrors> future = innerCallable.futureCall(request, context);
return ApiFutures.transform(
future,
(ApiFunction<MutateRowsAttemptErrors, Void>)
result -> {
if (result != null) {
throw MutateRowsException.create(null, result.failedMutations, result.isRetryable);
}
return null;
},
MoreExecutors.directExecutor());
}
}
Expand Up @@ -102,6 +102,7 @@
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.TracedBatcherUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptErrors;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
Expand All @@ -112,6 +113,7 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.MutateRowsErrorRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -706,28 +708,34 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withBigtableTracer =
new BigtableTracerStreamingCallable<>(convertException);

BasicResultRetryAlgorithm<Void> resultRetryAlgorithm;
BasicResultRetryAlgorithm<MutateRowsAttemptErrors> resultRetryAlgorithm;
if (settings.getEnableRetryInfo()) {
resultRetryAlgorithm = new RetryInfoRetryAlgorithm<>();
} else {
resultRetryAlgorithm = new ApiResultRetryAlgorithm<>();
}
MutateRowsErrorRetryAlgorithm mutateRowsErrorRetryAlgorithm =
new MutateRowsErrorRetryAlgorithm(resultRetryAlgorithm);
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

RetryAlgorithm<Void> retryAlgorithm =
RetryAlgorithm<MutateRowsAttemptErrors> retryAlgorithm =
new RetryAlgorithm<>(
resultRetryAlgorithm,
mutateRowsErrorRetryAlgorithm,
new ExponentialRetryAlgorithm(
settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock()));

RetryingExecutorWithContext<Void> retryingExecutor =
RetryingExecutorWithContext<MutateRowsAttemptErrors> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

return new MutateRowsRetryingCallable(
clientContext.getDefaultCallContext(),
withBigtableTracer,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes(),
retryAlgorithm);
MutateRowsRetryingCallable mutateRowsRetryingCallable =
new MutateRowsRetryingCallable(
clientContext.getDefaultCallContext(),
withBigtableTracer,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes(),
retryAlgorithm);

UnaryCallable<MutateRowsRequest, Void> errorsConverter =
new MutateRowsErrorConverterUnaryCallable(mutateRowsRetryingCallable);
return errorsConverter;
}

/**
Expand Down
Expand Up @@ -87,7 +87,7 @@
*
* <p>Package-private for internal use.
*/
class MutateRowsAttemptCallable implements Callable<Void> {
class MutateRowsAttemptCallable implements Callable<MutateRowsAttemptErrors> {
// Synthetic status for Mutations that didn't get a result (because the whole RPC failed). It will
// be exposed in MutateRowsException's FailedMutations.
private static final StatusCode LOCAL_UNKNOWN_STATUS =
Expand Down Expand Up @@ -116,25 +116,20 @@ public Object getTransportCode() {
@Nonnull private TimedAttemptSettings attemptSettings;

// Parent controller
private RetryingFuture<Void> externalFuture;
private RetryingFuture<MutateRowsAttemptErrors> externalFuture;

// Simple wrappers for handling result futures
private final ApiFunction<List<MutateRowsResponse>, Void> attemptSuccessfulCallback =
new ApiFunction<List<MutateRowsResponse>, Void>() {
@Override
public Void apply(List<MutateRowsResponse> responses) {
handleAttemptSuccess(responses);
return null;
}
};
private final ApiFunction<List<MutateRowsResponse>, MutateRowsAttemptErrors>
attemptSuccessfulCallback =
responses -> {
handleAttemptSuccess(responses);
return null;
};

private final ApiFunction<Throwable, List<MutateRowsResponse>> attemptFailedCallback =
new ApiFunction<Throwable, List<MutateRowsResponse>>() {
@Override
public List<MutateRowsResponse> apply(Throwable throwable) {
handleAttemptError(throwable);
return null;
}
throwable -> {
handleAttemptError(throwable);
return null;
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
};

MutateRowsAttemptCallable(
Expand All @@ -148,12 +143,12 @@ public List<MutateRowsResponse> apply(Throwable throwable) {
this.callContext = Preconditions.checkNotNull(callContext, "callContext");
this.retryableCodes = Preconditions.checkNotNull(retryableCodes, "retryableCodes");
this.retryAlgorithm = retryAlgorithm;
this.attemptSettings = retryAlgorithm.createFirstAttempt();
this.attemptSettings = retryAlgorithm.createFirstAttempt(null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused why the arg was introduced here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createFirstAttempt() is deprecated, it suggests to use the overload the accepts a parameter, and the default value for it it null 🤷‍♂️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted, as to not mix functional changes with random changes


permanentFailures = Lists.newArrayList();
}

public void setExternalFuture(RetryingFuture<Void> externalFuture) {
public void setExternalFuture(RetryingFuture<MutateRowsAttemptErrors> externalFuture) {
this.externalFuture = externalFuture;
}

Expand All @@ -166,7 +161,7 @@ public void setExternalFuture(RetryingFuture<Void> externalFuture) {
* return of this method should just be ignored.
*/
@Override
public Void call() {
public MutateRowsAttemptErrors call() {
try {
// externalFuture is set from MutateRowsRetryingCallable before invoking this method. It
// shouldn't be null unless the code changed
Expand All @@ -192,7 +187,7 @@ public Void call() {
}

// Handle concurrent cancellation
externalFuture.setAttemptFuture(new NonCancellableFuture<Void>());
externalFuture.setAttemptFuture(new NonCancellableFuture<>());
if (externalFuture.isDone()) {
return null;
}
Expand All @@ -208,13 +203,13 @@ public Void call() {

// Inspect the results and either propagate the success, or prepare to retry the failed
// mutations
ApiFuture<Void> transformed =
ApiFuture<MutateRowsAttemptErrors> transformed =
ApiFutures.transform(catching, attemptSuccessfulCallback, MoreExecutors.directExecutor());

// Notify the parent of the attempt
externalFuture.setAttemptFuture(transformed);
} catch (Throwable e) {
externalFuture.setAttemptFuture(ApiFutures.<Void>immediateFailedFuture(e));
externalFuture.setAttemptFuture(ApiFutures.immediateFailedFuture(e));
}

return null;
Expand Down Expand Up @@ -267,7 +262,7 @@ private void handleAttemptError(Throwable rpcError) {
* {@link MutateRowsException}. If no errors exist, then the attempt future is successfully
* completed. We don't currently handle RetryInfo on entry level failures.
*/
private void handleAttemptSuccess(List<MutateRowsResponse> responses) {
private MutateRowsAttemptErrors handleAttemptSuccess(List<MutateRowsResponse> responses) {
List<FailedMutation> allFailures = Lists.newArrayList(permanentFailures);
MutateRowsRequest lastRequest = currentRequest;

Expand Down Expand Up @@ -326,8 +321,9 @@ private void handleAttemptSuccess(List<MutateRowsResponse> responses) {

if (!allFailures.isEmpty()) {
boolean isRetryable = builder.getEntriesCount() > 0;
throw MutateRowsException.create(null, allFailures, isRetryable);
return new MutateRowsAttemptErrors(allFailures, isRetryable);
}
return null;
}

/**
Expand Down
@@ -0,0 +1,30 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.mutaterows;

import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation;
import java.util.List;

public class MutateRowsAttemptErrors {

public boolean isRetryable;
public List<FailedMutation> failedMutations;

public MutateRowsAttemptErrors(List<FailedMutation> failedMutations, boolean isRetryable) {
this.failedMutations = failedMutations;
this.isRetryable = isRetryable;
}
}
Expand Up @@ -26,6 +26,7 @@
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
Expand All @@ -40,17 +41,18 @@
* @see MutateRowsAttemptCallable for more details.
*/
@InternalApi
public class MutateRowsRetryingCallable extends UnaryCallable<MutateRowsRequest, Void> {
public class MutateRowsRetryingCallable
extends UnaryCallable<MutateRowsRequest, MutateRowsAttemptErrors> {
private final ApiCallContext callContextPrototype;
private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable;
private final RetryingExecutorWithContext<Void> executor;
private final RetryingExecutorWithContext<MutateRowsAttemptErrors> executor;
private final ImmutableSet<Code> retryCodes;
private final RetryAlgorithm retryAlgorithm;

public MutateRowsRetryingCallable(
@Nonnull ApiCallContext callContextPrototype,
@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable,
@Nonnull RetryingExecutorWithContext<Void> executor,
@Nonnull RetryingExecutorWithContext<MutateRowsAttemptErrors> executor,
@Nonnull Set<StatusCode.Code> retryCodes,
@Nonnull RetryAlgorithm retryAlgorithm) {
this.callContextPrototype = Preconditions.checkNotNull(callContextPrototype);
Expand All @@ -61,14 +63,20 @@ public MutateRowsRetryingCallable(
}

@Override
public RetryingFuture<Void> futureCall(MutateRowsRequest request, ApiCallContext inputContext) {
public RetryingFuture<MutateRowsAttemptErrors> futureCall(
MutateRowsRequest request, ApiCallContext inputContext) {
ApiCallContext context = callContextPrototype.nullToSelf(inputContext);
MutateRowsAttemptCallable retryCallable =
new MutateRowsAttemptCallable(callable.all(), request, context, retryCodes, retryAlgorithm);

RetryingFuture<Void> retryingFuture = executor.createFuture(retryCallable, context);
RetryingFuture<MutateRowsAttemptErrors> retryingFuture =
executor.createFuture(retryCallable, context);
retryCallable.setExternalFuture(retryingFuture);
retryCallable.call();
MutateRowsAttemptErrors mutateRowsAttemptErrors = retryCallable.call();
if (mutateRowsAttemptErrors != null) {
throw MutateRowsException.create(
null, mutateRowsAttemptErrors.failedMutations, mutateRowsAttemptErrors.isRetryable);
}

return retryingFuture;
}
Expand Down
@@ -0,0 +1,75 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.gaxx.retrying;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
import com.google.api.gax.retrying.RetryingContext;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptErrors;
import org.checkerframework.checker.nullness.qual.Nullable;

@InternalApi
public class MutateRowsErrorRetryAlgorithm
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
extends BasicResultRetryAlgorithm<MutateRowsAttemptErrors> {
BasicResultRetryAlgorithm<MutateRowsAttemptErrors> retryAlgorithm;

public MutateRowsErrorRetryAlgorithm(
BasicResultRetryAlgorithm<MutateRowsAttemptErrors> retryAlgorithm) {
this.retryAlgorithm = retryAlgorithm;
}

@Override
public TimedAttemptSettings createNextAttempt(
Throwable previousThrowable,
MutateRowsAttemptErrors previousResponse,
TimedAttemptSettings previousSettings) {
return retryAlgorithm.createNextAttempt(previousThrowable, previousResponse, previousSettings);
}

@Override
public TimedAttemptSettings createNextAttempt(
RetryingContext context,
Throwable previousThrowable,
MutateRowsAttemptErrors previousResponse,
TimedAttemptSettings previousSettings) {
return retryAlgorithm.createNextAttempt(
context, previousThrowable, previousResponse, previousSettings);
}

@Override
public boolean shouldRetry(
Throwable previousThrowable, MutateRowsAttemptErrors previousResponse) {
if (retryAlgorithm.shouldRetry(previousThrowable, previousResponse)) {
return true;
}
return shouldRetry(null, previousThrowable, previousResponse);
}

@Override
public boolean shouldRetry(
@Nullable RetryingContext context,
Throwable previousThrowable,
MutateRowsAttemptErrors previousResponse) {
if (retryAlgorithm.shouldRetry(context, previousThrowable, previousResponse)) {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
if (previousResponse == null) {
return false;
}
return previousResponse.isRetryable;
}
}