Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update the accounting of partial batch mutations #2149

Merged
merged 32 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Prev Previous commit
Next Next commit
fix callable order
  • Loading branch information
ron-gal committed Mar 12, 2024
commit bc486e58e032e6887040226e8f9c80b34392afb4
Expand Up @@ -28,6 +28,7 @@
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import java.util.Set;
Expand Down Expand Up @@ -57,11 +58,12 @@
* @see RetrySettings for retry configuration.
*/
@BetaApi("This surface is likely to change as the batching surface evolves.")
public final class BigtableBatchingCallSettings extends UnaryCallSettings<BulkMutation, Void> {
public final class BigtableBatchingCallSettings
extends UnaryCallSettings<BulkMutation, MutateRowsAttemptResult> {

// This settings is just a simple wrapper for BatchingCallSettings to allow us to add
// additional functionality.
private final BatchingCallSettings<RowMutationEntry, Void, BulkMutation, Void>
private final BatchingCallSettings<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
batchingCallSettings;
private final boolean isLatencyBasedThrottlingEnabled;
private final Long targetRpcLatencyMs;
Expand Down Expand Up @@ -89,7 +91,8 @@ public BatchingSettings getBatchingSettings() {
}

/** Returns an adapter that packs and unpacks batching elements. */
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> getBatchingDescriptor() {
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
getBatchingDescriptor() {
return batchingCallSettings.getBatchingDescriptor();
}

Expand Down Expand Up @@ -120,7 +123,8 @@ public boolean isServerInitiatedFlowControlEnabled() {
}

static Builder newBuilder(
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor) {
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
batchingDescriptor) {
return new Builder(batchingDescriptor);
}

Expand Down Expand Up @@ -148,9 +152,11 @@ public String toString() {
* A base builder class for {@link BigtableBatchingCallSettings}. See the class documentation of
* {@link BigtableBatchingCallSettings} for a description of the different values that can be set.
*/
public static class Builder extends UnaryCallSettings.Builder<BulkMutation, Void> {
public static class Builder
extends UnaryCallSettings.Builder<BulkMutation, MutateRowsAttemptResult> {

private BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor;
private BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
batchingDescriptor;
private BatchingSettings batchingSettings;
private boolean isLatencyBasedThrottlingEnabled;
private Long targetRpcLatencyMs;
Expand All @@ -160,7 +166,8 @@ public static class Builder extends UnaryCallSettings.Builder<BulkMutation, Void

private Builder(
@Nonnull
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor) {
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
batchingDescriptor) {
this.batchingDescriptor =
Preconditions.checkNotNull(batchingDescriptor, "batching descriptor can't be null");
}
Expand Down
Expand Up @@ -167,7 +167,7 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final UnaryCallable<Query, List<Row>> bulkReadRowsCallable;
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
private final UnaryCallable<RowMutation, Void> mutateRowCallable;
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
private final UnaryCallable<BulkMutation, MutateRowsAttemptResult> bulkMutateRowsCallable;
private final UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable;
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;
private final UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable;
Expand Down Expand Up @@ -674,7 +674,7 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
*
* @see MutateRowsRetryingCallable for more details
*/
private UnaryCallable<MutateRowsRequest, Void> createMutateRowsBaseCallable() {
private UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> createMutateRowsBaseCallable() {
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<MutateRowsRequest, MutateRowsResponse>newBuilder()
Expand Down Expand Up @@ -721,17 +721,12 @@ private UnaryCallable<MutateRowsRequest, Void> createMutateRowsBaseCallable() {

RetryingExecutorWithContext<MutateRowsAttemptResult> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
MutateRowsRetryingCallable mutateRowsRetryingCallable =
new MutateRowsRetryingCallable(
clientContext.getDefaultCallContext(),
withBigtableTracer,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes(),
retryAlgorithm);

UnaryCallable<MutateRowsRequest, Void> errorsConverter =
new MutateRowsErrorConverterUnaryCallable(mutateRowsRetryingCallable);
return errorsConverter;
return new MutateRowsRetryingCallable(
clientContext.getDefaultCallContext(),
withBigtableTracer,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes(),
retryAlgorithm);
}

/**
Expand All @@ -749,16 +744,17 @@ private UnaryCallable<MutateRowsRequest, Void> createMutateRowsBaseCallable() {
* <li>Add tracing & metrics.
* </ul>
*/
private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
UnaryCallable<MutateRowsRequest, Void> baseCallable = createMutateRowsBaseCallable();
private UnaryCallable<BulkMutation, MutateRowsAttemptResult> createBulkMutateRowsCallable() {
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> baseCallable =
createMutateRowsBaseCallable();
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

UnaryCallable<MutateRowsRequest, Void> withCookie = baseCallable;
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> withCookie = baseCallable;

if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesUnaryCallable<>(baseCallable);
}

UnaryCallable<MutateRowsRequest, Void> flowControlCallable = null;
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> flowControlCallable = null;
if (settings.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled()) {
flowControlCallable =
new DynamicFlowControlCallable(
Expand All @@ -768,16 +764,16 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
settings.bulkMutateRowsSettings().getTargetRpcLatencyMs(),
FLOW_CONTROL_ADJUSTING_INTERVAL_MS);
}
UnaryCallable<BulkMutation, Void> userFacing =
UnaryCallable<BulkMutation, MutateRowsAttemptResult> userFacing =
new BulkMutateRowsUserFacingCallable(
flowControlCallable != null ? flowControlCallable : withCookie, requestContext);

SpanName spanName = getSpanName("MutateRows");

UnaryCallable<BulkMutation, Void> tracedBatcherUnaryCallable =
UnaryCallable<BulkMutation, MutateRowsAttemptResult> tracedBatcherUnaryCallable =
new TracedBatcherUnaryCallable<>(userFacing);

UnaryCallable<BulkMutation, Void> traced =
UnaryCallable<BulkMutation, MutateRowsAttemptResult> traced =
new TracedUnaryCallable<>(
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);

Expand Down Expand Up @@ -1175,11 +1171,13 @@ public UnaryCallable<RowMutation, Void> mutateRowCallable() {
}

/**
* Returns the callable chain created in {@link #createBulkMutateRowsCallable()} ()} during stub
* Returns the callable chain created in {@link #createBulkMutateRowsCallable()} during stub
* construction.
*/
public UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable() {
return bulkMutateRowsCallable;
UnaryCallable<BulkMutation, Void> errorsConverter =
new MutateRowsErrorConverterUnaryCallable(bulkMutateRowsCallable);
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
return errorsConverter;
}

/**
Expand Down
Expand Up @@ -19,22 +19,22 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
import com.google.common.util.concurrent.MoreExecutors;

public class MutateRowsErrorConverterUnaryCallable extends UnaryCallable<MutateRowsRequest, Void> {
public class MutateRowsErrorConverterUnaryCallable extends UnaryCallable<BulkMutation, Void> {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

private final UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> innerCallable;
private final UnaryCallable<BulkMutation, MutateRowsAttemptResult> innerCallable;

MutateRowsErrorConverterUnaryCallable(
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> callable) {
UnaryCallable<BulkMutation, MutateRowsAttemptResult> callable) {
this.innerCallable = callable;
}

@Override
public ApiFuture<Void> futureCall(MutateRowsRequest request, ApiCallContext context) {
public ApiFuture<Void> futureCall(BulkMutation request, ApiCallContext context) {
ApiFuture<MutateRowsAttemptResult> future = innerCallable.futureCall(request, context);
return ApiFutures.transform(
future,
Expand Down
Expand Up @@ -30,18 +30,22 @@
* applications.
*/
@InternalApi
public final class BulkMutateRowsUserFacingCallable extends UnaryCallable<BulkMutation, Void> {
private final UnaryCallable<MutateRowsRequest, Void> innerCallable;
public final class BulkMutateRowsUserFacingCallable
extends UnaryCallable<BulkMutation, MutateRowsAttemptResult> {

private final UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> innerCallable;
private final RequestContext requestContext;

public BulkMutateRowsUserFacingCallable(
UnaryCallable<MutateRowsRequest, Void> innerCallable, RequestContext requestContext) {
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> innerCallable,
RequestContext requestContext) {
this.innerCallable = innerCallable;
this.requestContext = requestContext;
}

@Override
public ApiFuture<Void> futureCall(BulkMutation request, ApiCallContext context) {
public ApiFuture<MutateRowsAttemptResult> futureCall(
BulkMutation request, ApiCallContext context) {
return innerCallable.futureCall(request.toProto(requestContext), context);
}
}
Expand Up @@ -37,7 +37,7 @@
*/
@InternalApi("For internal use only")
public class MutateRowsBatchingDescriptor
implements BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> {
implements BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult> {

@Override
public BatchingRequestBuilder<RowMutationEntry, BulkMutation> newRequestBuilder(
Expand All @@ -46,7 +46,11 @@ public BatchingRequestBuilder<RowMutationEntry, BulkMutation> newRequestBuilder(
}

@Override
public void splitResponse(Void response, List<BatchEntry<RowMutationEntry, Void>> entries) {
public void splitResponse(
MutateRowsAttemptResult response, List<BatchEntry<RowMutationEntry, Void>> entries) {
for (FailedMutation mutation : response.failedMutations) {
entries.get(mutation.getIndex()).getResultFuture().setException(mutation.getError());
}
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
for (BatchEntry<RowMutationEntry, Void> batchResponse : entries) {
batchResponse.getResultFuture().set(null);
}
Expand Down
Expand Up @@ -20,12 +20,15 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.BatcherImpl;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
Expand All @@ -40,11 +43,13 @@
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
Expand Down Expand Up @@ -448,7 +453,8 @@ public Object answer(InvocationOnMock invocation) {
try (Batcher<RowMutationEntry, Void> batcher =
new BatcherImpl<>(
batchingDescriptor,
stub.bulkMutateRowsCallable().withDefaultCallContext(defaultContext),
new ReverseErrorsConverter(
stub.bulkMutateRowsCallable().withDefaultCallContext(defaultContext)),
BulkMutation.create(TABLE_ID),
settings.getStubSettings().bulkMutateRowsSettings().getBatchingSettings(),
Executors.newSingleThreadScheduledExecutor(),
Expand All @@ -474,4 +480,22 @@ public Object answer(InvocationOnMock invocation) {
private static <T> StreamObserver<T> anyObserver(Class<T> returnType) {
return (StreamObserver<T>) any(returnType);
}

private class ReverseErrorsConverter
extends UnaryCallable<BulkMutation, MutateRowsAttemptResult> {

private final UnaryCallable<BulkMutation, Void> innerCallable;

ReverseErrorsConverter(UnaryCallable<BulkMutation, Void> callable) {
this.innerCallable = callable;
}

@Override
public ApiFuture<MutateRowsAttemptResult> futureCall(
BulkMutation request, ApiCallContext context) {
ApiFuture<Void> future = innerCallable.futureCall(request, context);
return ApiFutures.transform(
future, result -> new MutateRowsAttemptResult(), MoreExecutors.directExecutor());
}
}
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Up @@ -92,7 +92,7 @@ public void splitResponseTest() {
assertThat(batchResponse.get(1).getResultFuture().isDone()).isFalse();

MutateRowsBatchingDescriptor underTest = new MutateRowsBatchingDescriptor();
underTest.splitResponse(null, batchResponse);
underTest.splitResponse(new MutateRowsAttemptResult(), batchResponse);
assertThat(batchResponse.get(0).getResultFuture().isDone()).isTrue();
assertThat(batchResponse.get(1).getResultFuture().isDone()).isTrue();
}
Expand Down