Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update the accounting of partial batch mutations #2149

Merged
merged 32 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -28,6 +28,7 @@
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import java.util.Set;
Expand Down Expand Up @@ -57,11 +58,12 @@
* @see RetrySettings for retry configuration.
*/
@BetaApi("This surface is likely to change as the batching surface evolves.")
public final class BigtableBatchingCallSettings extends UnaryCallSettings<BulkMutation, Void> {
public final class BigtableBatchingCallSettings
extends UnaryCallSettings<BulkMutation, MutateRowsAttemptResult> {

// This settings is just a simple wrapper for BatchingCallSettings to allow us to add
// additional functionality.
private final BatchingCallSettings<RowMutationEntry, Void, BulkMutation, Void>
private final BatchingCallSettings<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
batchingCallSettings;
private final boolean isLatencyBasedThrottlingEnabled;
private final Long targetRpcLatencyMs;
Expand Down Expand Up @@ -89,7 +91,8 @@ public BatchingSettings getBatchingSettings() {
}

/** Returns an adapter that packs and unpacks batching elements. */
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> getBatchingDescriptor() {
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
getBatchingDescriptor() {
return batchingCallSettings.getBatchingDescriptor();
}

Expand Down Expand Up @@ -120,7 +123,8 @@ public boolean isServerInitiatedFlowControlEnabled() {
}

static Builder newBuilder(
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor) {
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
batchingDescriptor) {
return new Builder(batchingDescriptor);
}

Expand Down Expand Up @@ -148,9 +152,11 @@ public String toString() {
* A base builder class for {@link BigtableBatchingCallSettings}. See the class documentation of
* {@link BigtableBatchingCallSettings} for a description of the different values that can be set.
*/
public static class Builder extends UnaryCallSettings.Builder<BulkMutation, Void> {
public static class Builder
extends UnaryCallSettings.Builder<BulkMutation, MutateRowsAttemptResult> {

private BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor;
private BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
batchingDescriptor;
private BatchingSettings batchingSettings;
private boolean isLatencyBasedThrottlingEnabled;
private Long targetRpcLatencyMs;
Expand All @@ -160,7 +166,8 @@ public static class Builder extends UnaryCallSettings.Builder<BulkMutation, Void

private Builder(
@Nonnull
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor) {
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
batchingDescriptor) {
this.batchingDescriptor =
Preconditions.checkNotNull(batchingDescriptor, "batching descriptor can't be null");
}
Expand Down
Expand Up @@ -102,6 +102,7 @@
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.TracedBatcherUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
Expand All @@ -112,6 +113,7 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.MutateRowsErrorRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -165,7 +167,7 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final UnaryCallable<Query, List<Row>> bulkReadRowsCallable;
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
private final UnaryCallable<RowMutation, Void> mutateRowCallable;
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
private final UnaryCallable<BulkMutation, MutateRowsAttemptResult> bulkMutateRowsCallable;
private final UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable;
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;
private final UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable;
Expand Down Expand Up @@ -672,20 +674,16 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
*
* @see MutateRowsRetryingCallable for more details
*/
private UnaryCallable<MutateRowsRequest, Void> createMutateRowsBaseCallable() {
private UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> createMutateRowsBaseCallable() {
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<MutateRowsRequest, MutateRowsResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getMutateRowsMethod())
.setParamsExtractor(
new RequestParamsExtractor<MutateRowsRequest>() {
@Override
public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
return ImmutableMap.of(
mutateRowsRequest ->
ImmutableMap.of(
"table_name", mutateRowsRequest.getTableName(),
"app_profile_id", mutateRowsRequest.getAppProfileId());
}
})
"app_profile_id", mutateRowsRequest.getAppProfileId()))
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
.build(),
settings.bulkMutateRowsSettings().getRetryableCodes());

Expand All @@ -706,22 +704,23 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withBigtableTracer =
new BigtableTracerStreamingCallable<>(convertException);

BasicResultRetryAlgorithm<Void> resultRetryAlgorithm;
BasicResultRetryAlgorithm<MutateRowsAttemptResult> resultRetryAlgorithm;
if (settings.getEnableRetryInfo()) {
resultRetryAlgorithm = new RetryInfoRetryAlgorithm<>();
} else {
resultRetryAlgorithm = new ApiResultRetryAlgorithm<>();
}
MutateRowsErrorRetryAlgorithm mutateRowsErrorRetryAlgorithm =
new MutateRowsErrorRetryAlgorithm(resultRetryAlgorithm);
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

RetryAlgorithm<Void> retryAlgorithm =
RetryAlgorithm<MutateRowsAttemptResult> retryAlgorithm =
new RetryAlgorithm<>(
resultRetryAlgorithm,
mutateRowsErrorRetryAlgorithm,
new ExponentialRetryAlgorithm(
settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock()));

RetryingExecutorWithContext<Void> retryingExecutor =
RetryingExecutorWithContext<MutateRowsAttemptResult> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

return new MutateRowsRetryingCallable(
clientContext.getDefaultCallContext(),
withBigtableTracer,
Expand All @@ -745,16 +744,17 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
* <li>Add tracing & metrics.
* </ul>
*/
private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
UnaryCallable<MutateRowsRequest, Void> baseCallable = createMutateRowsBaseCallable();
private UnaryCallable<BulkMutation, MutateRowsAttemptResult> createBulkMutateRowsCallable() {
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> baseCallable =
createMutateRowsBaseCallable();
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

UnaryCallable<MutateRowsRequest, Void> withCookie = baseCallable;
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> withCookie = baseCallable;

if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesUnaryCallable<>(baseCallable);
}

UnaryCallable<MutateRowsRequest, Void> flowControlCallable = null;
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> flowControlCallable = null;
if (settings.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled()) {
flowControlCallable =
new DynamicFlowControlCallable(
Expand All @@ -764,16 +764,16 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
settings.bulkMutateRowsSettings().getTargetRpcLatencyMs(),
FLOW_CONTROL_ADJUSTING_INTERVAL_MS);
}
UnaryCallable<BulkMutation, Void> userFacing =
UnaryCallable<BulkMutation, MutateRowsAttemptResult> userFacing =
new BulkMutateRowsUserFacingCallable(
flowControlCallable != null ? flowControlCallable : withCookie, requestContext);

SpanName spanName = getSpanName("MutateRows");

UnaryCallable<BulkMutation, Void> tracedBatcherUnaryCallable =
UnaryCallable<BulkMutation, MutateRowsAttemptResult> tracedBatcherUnaryCallable =
new TracedBatcherUnaryCallable<>(userFacing);

UnaryCallable<BulkMutation, Void> traced =
UnaryCallable<BulkMutation, MutateRowsAttemptResult> traced =
new TracedUnaryCallable<>(
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);

Expand Down Expand Up @@ -1171,11 +1171,13 @@ public UnaryCallable<RowMutation, Void> mutateRowCallable() {
}

/**
* Returns the callable chain created in {@link #createBulkMutateRowsCallable()} ()} during stub
* Returns the callable chain created in {@link #createBulkMutateRowsCallable()} during stub
* construction.
*/
public UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable() {
return bulkMutateRowsCallable;
UnaryCallable<BulkMutation, Void> errorsConverter =
new MutateRowsErrorConverterUnaryCallable(bulkMutateRowsCallable);
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
return errorsConverter;
}

/**
Expand Down
@@ -0,0 +1,49 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
import com.google.common.util.concurrent.MoreExecutors;

public class MutateRowsErrorConverterUnaryCallable extends UnaryCallable<BulkMutation, Void> {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

private final UnaryCallable<BulkMutation, MutateRowsAttemptResult> innerCallable;

MutateRowsErrorConverterUnaryCallable(
UnaryCallable<BulkMutation, MutateRowsAttemptResult> callable) {
this.innerCallable = callable;
}

@Override
public ApiFuture<Void> futureCall(BulkMutation request, ApiCallContext context) {
ApiFuture<MutateRowsAttemptResult> future = innerCallable.futureCall(request, context);
return ApiFutures.transform(
future,
result -> {
if (!result.failedMutations.isEmpty()) {
throw MutateRowsException.create(null, result.failedMutations, result.isRetryable);
}
return null;
},
MoreExecutors.directExecutor());
}
}
Expand Up @@ -30,18 +30,22 @@
* applications.
*/
@InternalApi
public final class BulkMutateRowsUserFacingCallable extends UnaryCallable<BulkMutation, Void> {
private final UnaryCallable<MutateRowsRequest, Void> innerCallable;
public final class BulkMutateRowsUserFacingCallable
extends UnaryCallable<BulkMutation, MutateRowsAttemptResult> {

private final UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> innerCallable;
private final RequestContext requestContext;

public BulkMutateRowsUserFacingCallable(
UnaryCallable<MutateRowsRequest, Void> innerCallable, RequestContext requestContext) {
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> innerCallable,
RequestContext requestContext) {
this.innerCallable = innerCallable;
this.requestContext = requestContext;
}

@Override
public ApiFuture<Void> futureCall(BulkMutation request, ApiCallContext context) {
public ApiFuture<MutateRowsAttemptResult> futureCall(
BulkMutation request, ApiCallContext context) {
return innerCallable.futureCall(request.toProto(requestContext), context);
}
}
Expand Up @@ -87,7 +87,7 @@
*
* <p>Package-private for internal use.
*/
class MutateRowsAttemptCallable implements Callable<Void> {
class MutateRowsAttemptCallable implements Callable<MutateRowsAttemptResult> {
// Synthetic status for Mutations that didn't get a result (because the whole RPC failed). It will
// be exposed in MutateRowsException's FailedMutations.
private static final StatusCode LOCAL_UNKNOWN_STATUS =
Expand Down Expand Up @@ -116,25 +116,16 @@ public Object getTransportCode() {
@Nonnull private TimedAttemptSettings attemptSettings;

// Parent controller
private RetryingFuture<Void> externalFuture;
private RetryingFuture<MutateRowsAttemptResult> externalFuture;

// Simple wrappers for handling result futures
private final ApiFunction<List<MutateRowsResponse>, Void> attemptSuccessfulCallback =
new ApiFunction<List<MutateRowsResponse>, Void>() {
@Override
public Void apply(List<MutateRowsResponse> responses) {
handleAttemptSuccess(responses);
return null;
}
};
private final ApiFunction<List<MutateRowsResponse>, MutateRowsAttemptResult>
attemptSuccessfulCallback = responses -> handleAttemptSuccess(responses);
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

private final ApiFunction<Throwable, List<MutateRowsResponse>> attemptFailedCallback =
new ApiFunction<Throwable, List<MutateRowsResponse>>() {
@Override
public List<MutateRowsResponse> apply(Throwable throwable) {
handleAttemptError(throwable);
return null;
}
throwable -> {
handleAttemptError(throwable);
return null;
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
};

MutateRowsAttemptCallable(
Expand All @@ -148,12 +139,12 @@ public List<MutateRowsResponse> apply(Throwable throwable) {
this.callContext = Preconditions.checkNotNull(callContext, "callContext");
this.retryableCodes = Preconditions.checkNotNull(retryableCodes, "retryableCodes");
this.retryAlgorithm = retryAlgorithm;
this.attemptSettings = retryAlgorithm.createFirstAttempt();
this.attemptSettings = retryAlgorithm.createFirstAttempt(null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused why the arg was introduced here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createFirstAttempt() is deprecated, it suggests to use the overload the accepts a parameter, and the default value for it it null 🤷‍♂️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted, as to not mix functional changes with random changes


permanentFailures = Lists.newArrayList();
}

public void setExternalFuture(RetryingFuture<Void> externalFuture) {
public void setExternalFuture(RetryingFuture<MutateRowsAttemptResult> externalFuture) {
this.externalFuture = externalFuture;
}

Expand All @@ -166,7 +157,7 @@ public void setExternalFuture(RetryingFuture<Void> externalFuture) {
* return of this method should just be ignored.
*/
@Override
public Void call() {
public MutateRowsAttemptResult call() {
try {
// externalFuture is set from MutateRowsRetryingCallable before invoking this method. It
// shouldn't be null unless the code changed
Expand All @@ -192,7 +183,7 @@ public Void call() {
}

// Handle concurrent cancellation
externalFuture.setAttemptFuture(new NonCancellableFuture<Void>());
externalFuture.setAttemptFuture(new NonCancellableFuture<>());
if (externalFuture.isDone()) {
return null;
}
Expand All @@ -208,13 +199,13 @@ public Void call() {

// Inspect the results and either propagate the success, or prepare to retry the failed
// mutations
ApiFuture<Void> transformed =
ApiFuture<MutateRowsAttemptResult> transformed =
ApiFutures.transform(catching, attemptSuccessfulCallback, MoreExecutors.directExecutor());

// Notify the parent of the attempt
externalFuture.setAttemptFuture(transformed);
} catch (Throwable e) {
externalFuture.setAttemptFuture(ApiFutures.<Void>immediateFailedFuture(e));
externalFuture.setAttemptFuture(ApiFutures.immediateFailedFuture(e));
}

return null;
Expand Down Expand Up @@ -267,7 +258,7 @@ private void handleAttemptError(Throwable rpcError) {
* {@link MutateRowsException}. If no errors exist, then the attempt future is successfully
* completed. We don't currently handle RetryInfo on entry level failures.
*/
private void handleAttemptSuccess(List<MutateRowsResponse> responses) {
private MutateRowsAttemptResult handleAttemptSuccess(List<MutateRowsResponse> responses) {
List<FailedMutation> allFailures = Lists.newArrayList(permanentFailures);
MutateRowsRequest lastRequest = currentRequest;

Expand Down Expand Up @@ -326,8 +317,9 @@ private void handleAttemptSuccess(List<MutateRowsResponse> responses) {

if (!allFailures.isEmpty()) {
boolean isRetryable = builder.getEntriesCount() > 0;
throw MutateRowsException.create(null, allFailures, isRetryable);
return new MutateRowsAttemptResult(allFailures, isRetryable);
}
return new MutateRowsAttemptResult();
}

/**
Expand Down