Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update the accounting of partial batch mutations #2149

Merged
merged 32 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -28,6 +28,7 @@
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import java.util.Set;
Expand Down Expand Up @@ -57,11 +58,12 @@
* @see RetrySettings for retry configuration.
*/
@BetaApi("This surface is likely to change as the batching surface evolves.")
public final class BigtableBatchingCallSettings extends UnaryCallSettings<BulkMutation, Void> {
public final class BigtableBatchingCallSettings
extends UnaryCallSettings<BulkMutation, MutateRowsAttemptResult> {

// This settings is just a simple wrapper for BatchingCallSettings to allow us to add
// additional functionality.
private final BatchingCallSettings<RowMutationEntry, Void, BulkMutation, Void>
private final BatchingCallSettings<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
batchingCallSettings;
private final boolean isLatencyBasedThrottlingEnabled;
private final Long targetRpcLatencyMs;
Expand Down Expand Up @@ -89,7 +91,8 @@ public BatchingSettings getBatchingSettings() {
}

/** Returns an adapter that packs and unpacks batching elements. */
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> getBatchingDescriptor() {
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
getBatchingDescriptor() {
return batchingCallSettings.getBatchingDescriptor();
}

Expand Down Expand Up @@ -120,7 +123,8 @@ public boolean isServerInitiatedFlowControlEnabled() {
}

static Builder newBuilder(
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor) {
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
batchingDescriptor) {
return new Builder(batchingDescriptor);
}

Expand Down Expand Up @@ -148,9 +152,11 @@ public String toString() {
* A base builder class for {@link BigtableBatchingCallSettings}. See the class documentation of
* {@link BigtableBatchingCallSettings} for a description of the different values that can be set.
*/
public static class Builder extends UnaryCallSettings.Builder<BulkMutation, Void> {
public static class Builder
extends UnaryCallSettings.Builder<BulkMutation, MutateRowsAttemptResult> {

private BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor;
private BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
batchingDescriptor;
private BatchingSettings batchingSettings;
private boolean isLatencyBasedThrottlingEnabled;
private Long targetRpcLatencyMs;
Expand All @@ -160,7 +166,8 @@ public static class Builder extends UnaryCallSettings.Builder<BulkMutation, Void

private Builder(
@Nonnull
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor) {
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
batchingDescriptor) {
this.batchingDescriptor =
Preconditions.checkNotNull(batchingDescriptor, "batching descriptor can't be null");
}
Expand Down
Expand Up @@ -102,6 +102,7 @@
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.TracedBatcherUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
Expand All @@ -112,6 +113,7 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.MutateRowsPartialErrorRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -165,7 +167,8 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final UnaryCallable<Query, List<Row>> bulkReadRowsCallable;
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
private final UnaryCallable<RowMutation, Void> mutateRowCallable;
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
private final UnaryCallable<BulkMutation, MutateRowsAttemptResult> bulkMutateRowsCallable;
private final UnaryCallable<BulkMutation, Void> externalBulkMutateRowsCallable;
private final UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable;
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;
private final UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable;
Expand Down Expand Up @@ -368,7 +371,9 @@ public EnhancedBigtableStub(
bulkReadRowsCallable = createBulkReadRowsCallable(new DefaultRowAdapter());
sampleRowKeysCallable = createSampleRowKeysCallable();
mutateRowCallable = createMutateRowCallable();
bulkMutateRowsCallable = createBulkMutateRowsCallable();
bulkMutateRowsCallable = createMutateRowsBaseCallable();
externalBulkMutateRowsCallable =
new MutateRowsErrorConverterUnaryCallable(bulkMutateRowsCallable);
checkAndMutateRowCallable = createCheckAndMutateRowCallable();
readModifyWriteRowCallable = createReadModifyWriteRowCallable();
generateInitialChangeStreamPartitionsCallable =
Expand Down Expand Up @@ -665,14 +670,23 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
}

/**
* Internal helper to create the base MutateRows callable chain. The chain is responsible for
* retrying individual entry in case of error.
* Creates a callable chain to handle MutatesRows RPCs. This is meant to be used for manual
* batching. The chain will:
*
* <p>NOTE: the caller is responsible for adding tracing & metrics.
* <ul>
* <li>Convert a {@link BulkMutation} into a {@link MutateRowsRequest}.
* <li>Process the response and schedule retries. At the end of each attempt, entries that have
* been applied, are filtered from the next attempt. Also, any entries that failed with a
* nontransient error, are filtered from the next attempt. This will continue until there
* are no more entries or there are no more retry attempts left.
* <li>Wrap batch failures in a {@link
* com.google.cloud.bigtable.data.v2.models.MutateRowsException}.
* <li>Add tracing & metrics.
* </ul>
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
*
* @see MutateRowsRetryingCallable for more details
* This callable returns an internal type {@link MutateRowsAttemptResult}.
*/
private UnaryCallable<MutateRowsRequest, Void> createMutateRowsBaseCallable() {
private UnaryCallable<BulkMutation, MutateRowsAttemptResult> createMutateRowsBaseCallable() {
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<MutateRowsRequest, MutateRowsResponse>newBuilder()
Expand Down Expand Up @@ -706,55 +720,38 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withBigtableTracer =
new BigtableTracerStreamingCallable<>(convertException);

BasicResultRetryAlgorithm<Void> resultRetryAlgorithm;
BasicResultRetryAlgorithm<MutateRowsAttemptResult> resultRetryAlgorithm;
if (settings.getEnableRetryInfo()) {
resultRetryAlgorithm = new RetryInfoRetryAlgorithm<>();
} else {
resultRetryAlgorithm = new ApiResultRetryAlgorithm<>();
}
MutateRowsPartialErrorRetryAlgorithm mutateRowsPartialErrorRetryAlgorithm =
new MutateRowsPartialErrorRetryAlgorithm(resultRetryAlgorithm);

RetryAlgorithm<Void> retryAlgorithm =
RetryAlgorithm<MutateRowsAttemptResult> retryAlgorithm =
new RetryAlgorithm<>(
resultRetryAlgorithm,
mutateRowsPartialErrorRetryAlgorithm,
new ExponentialRetryAlgorithm(
settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock()));

RetryingExecutorWithContext<Void> retryingExecutor =
RetryingExecutorWithContext<MutateRowsAttemptResult> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> baseCallable =
new MutateRowsRetryingCallable(
clientContext.getDefaultCallContext(),
withBigtableTracer,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes(),
retryAlgorithm);

return new MutateRowsRetryingCallable(
clientContext.getDefaultCallContext(),
withBigtableTracer,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes(),
retryAlgorithm);
}

/**
* Creates a callable chain to handle MutatesRows RPCs. This is meant to be used for manual
* batching. The chain will:
*
* <ul>
* <li>Convert a {@link BulkMutation} into a {@link MutateRowsRequest}.
* <li>Process the response and schedule retries. At the end of each attempt, entries that have
* been applied, are filtered from the next attempt. Also, any entries that failed with a
* nontransient error, are filtered from the next attempt. This will continue until there
* are no more entries or there are no more retry attempts left.
* <li>Wrap batch failures in a {@link
* com.google.cloud.bigtable.data.v2.models.MutateRowsException}.
* <li>Add tracing & metrics.
* </ul>
*/
private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
UnaryCallable<MutateRowsRequest, Void> baseCallable = createMutateRowsBaseCallable();

UnaryCallable<MutateRowsRequest, Void> withCookie = baseCallable;
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> withCookie = baseCallable;

if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesUnaryCallable<>(baseCallable);
}

UnaryCallable<MutateRowsRequest, Void> flowControlCallable = null;
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> flowControlCallable = null;
if (settings.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled()) {
flowControlCallable =
new DynamicFlowControlCallable(
Expand All @@ -764,16 +761,16 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
settings.bulkMutateRowsSettings().getTargetRpcLatencyMs(),
FLOW_CONTROL_ADJUSTING_INTERVAL_MS);
}
UnaryCallable<BulkMutation, Void> userFacing =
UnaryCallable<BulkMutation, MutateRowsAttemptResult> userFacing =
new BulkMutateRowsUserFacingCallable(
flowControlCallable != null ? flowControlCallable : withCookie, requestContext);

SpanName spanName = getSpanName("MutateRows");

UnaryCallable<BulkMutation, Void> tracedBatcherUnaryCallable =
UnaryCallable<BulkMutation, MutateRowsAttemptResult> tracedBatcherUnaryCallable =
new TracedBatcherUnaryCallable<>(userFacing);

UnaryCallable<BulkMutation, Void> traced =
UnaryCallable<BulkMutation, MutateRowsAttemptResult> traced =
new TracedUnaryCallable<>(
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);

Expand Down Expand Up @@ -1171,11 +1168,11 @@ public UnaryCallable<RowMutation, Void> mutateRowCallable() {
}

/**
* Returns the callable chain created in {@link #createBulkMutateRowsCallable()} ()} during stub
* Returns the callable chain created in {@link #createMutateRowsBaseCallable()} during stub
* construction.
*/
public UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable() {
return bulkMutateRowsCallable;
return externalBulkMutateRowsCallable;
}

/**
Expand Down
@@ -0,0 +1,55 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
import com.google.common.util.concurrent.MoreExecutors;

/**
* This callable converts partial batch failures into an exception. This is necessary to make sure
* that the caller properly handles issues and avoids possible data loss on partial failures
*/
@InternalApi
public class MutateRowsErrorConverterUnaryCallable extends UnaryCallable<BulkMutation, Void> {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

private final UnaryCallable<BulkMutation, MutateRowsAttemptResult> innerCallable;

public MutateRowsErrorConverterUnaryCallable(
UnaryCallable<BulkMutation, MutateRowsAttemptResult> callable) {
this.innerCallable = callable;
}

@Override
public ApiFuture<Void> futureCall(BulkMutation request, ApiCallContext context) {
ApiFuture<MutateRowsAttemptResult> future = innerCallable.futureCall(request, context);
return ApiFutures.transform(
future,
result -> {
if (!result.failedMutations.isEmpty()) {
throw MutateRowsException.create(null, result.failedMutations, result.isRetryable);
}
return null;
},
MoreExecutors.directExecutor());
}
}
Expand Up @@ -30,18 +30,22 @@
* applications.
*/
@InternalApi
public final class BulkMutateRowsUserFacingCallable extends UnaryCallable<BulkMutation, Void> {
private final UnaryCallable<MutateRowsRequest, Void> innerCallable;
public final class BulkMutateRowsUserFacingCallable
extends UnaryCallable<BulkMutation, MutateRowsAttemptResult> {

private final UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> innerCallable;
private final RequestContext requestContext;

public BulkMutateRowsUserFacingCallable(
UnaryCallable<MutateRowsRequest, Void> innerCallable, RequestContext requestContext) {
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> innerCallable,
RequestContext requestContext) {
this.innerCallable = innerCallable;
this.requestContext = requestContext;
}

@Override
public ApiFuture<Void> futureCall(BulkMutation request, ApiCallContext context) {
public ApiFuture<MutateRowsAttemptResult> futureCall(
BulkMutation request, ApiCallContext context) {
return innerCallable.futureCall(request.toProto(requestContext), context);
}
}