Skip to content

Commit

Permalink
fix: add a ReadFirstRow callable to set future in onComplete (#1326)
Browse files Browse the repository at this point in the history
* fix: add a ReadFirstRow callable to set future in onComplete

* use ReadRowsFirst callable instead

* don't use atomic

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
mutianf and gcf-owl-bot[bot] committed Aug 2, 2022
1 parent 644aeb3 commit cb539b5
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsFirstCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
Expand Down Expand Up @@ -366,10 +367,16 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
.build(),
rowAdapter);

UnaryCallable<Query, RowT> readRowCallable =
new ReadRowsUserCallable<>(readRowsCallable, requestContext).first();
ReadRowsUserCallable<RowT> readRowCallable =
new ReadRowsUserCallable<>(readRowsCallable, requestContext);

ServerStreamingCallable<Query, RowT> traced =
new TracedServerStreamingCallable<>(
readRowCallable, clientContext.getTracerFactory(), getSpanName("ReadRow"));

ReadRowsFirstCallable<RowT> firstRow = new ReadRowsFirstCallable<>(traced);

return createUserFacingUnaryCallable("ReadRow", readRowCallable);
return firstRow.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,64 @@
package com.google.cloud.bigtable.data.v2.stub.readrows;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StateCheckingResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.bigtable.data.v2.models.Query;

/**
* Enhancement for `readRowsCallable().first()` to gracefully limit the row count instead of
* cancelling the RPC
*/
class ReadRowsFirstCallable<RowT> extends UnaryCallable<Query, RowT> {
private final UnaryCallable<Query, RowT> inner;
@InternalApi
public class ReadRowsFirstCallable<RowT> extends UnaryCallable<Query, RowT> {

ReadRowsFirstCallable(UnaryCallable<Query, RowT> inner) {
private final ServerStreamingCallable<Query, RowT> inner;

public ReadRowsFirstCallable(ServerStreamingCallable<Query, RowT> inner) {
this.inner = inner;
}

@Override
public ApiFuture<RowT> futureCall(Query query, ApiCallContext context) {
return inner.futureCall(query.limit(1), context);
ReadRowsFirstResponseObserver<RowT> observer = new ReadRowsFirstResponseObserver<>();
this.inner.call(query.limit(1), observer, context);
return observer.getFuture();
}

private class ReadRowsFirstResponseObserver<RowT> extends StateCheckingResponseObserver<RowT> {
private StreamController innerController;
private RowT firstRow;
private SettableApiFuture<RowT> settableFuture = SettableApiFuture.create();

@Override
protected void onStartImpl(StreamController streamController) {
this.innerController = streamController;
}

@Override
protected void onResponseImpl(RowT response) {
if (firstRow == null) {
this.firstRow = response;
}
}

@Override
protected void onErrorImpl(Throwable throwable) {
settableFuture.setException(throwable);
}

@Override
protected void onCompleteImpl() {
settableFuture.set(firstRow);
}

protected ApiFuture<RowT> getFuture() {
return settableFuture;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.cloud.bigtable.data.v2.models.Query;
Expand All @@ -34,27 +33,16 @@
public class ReadRowsUserCallable<RowT> extends ServerStreamingCallable<Query, RowT> {
private final ServerStreamingCallable<ReadRowsRequest, RowT> inner;
private final RequestContext requestContext;
private final ReadRowsFirstCallable<RowT> firstCallable;

public ReadRowsUserCallable(
ServerStreamingCallable<ReadRowsRequest, RowT> inner, RequestContext requestContext) {
this.inner = inner;
this.requestContext = requestContext;

this.firstCallable = new ReadRowsFirstCallable<>(super.first());
}

@Override
public void call(Query request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
ReadRowsRequest innerRequest = request.toProto(requestContext);
inner.call(innerRequest, responseObserver, context);
}

// Optimization: since the server supports row limits, override the first callable.
// This way unnecessary data doesn't need to be buffered and the number of CANCELLED request
// statuses is minimized
@Override
public UnaryCallable<Query, RowT> first() {
return firstCallable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
*/
package com.google.cloud.bigtable.data.v2.stub.readrows;

import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.UnaryCallable;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;

import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
Expand All @@ -38,36 +43,33 @@ public class ReadRowsFirstCallableTest {

private static final RequestContext REQUEST_CONTEXT =
RequestContext.create("fake-project", "fake-instance", "fake-profile");
private UnaryCallable<Query, Row> innerCallable;

private ServerStreamingCallable<Query, Row> innerCallable;
private ArgumentCaptor<Query> innerQuery;
private SettableApiFuture<Row> innerResult;

@SuppressWarnings("unchecked")
@Before
public void setUp() {
innerCallable = Mockito.mock(UnaryCallable.class);
innerCallable = Mockito.mock(ServerStreamingCallable.class);
innerQuery = ArgumentCaptor.forClass(Query.class);
innerResult = SettableApiFuture.create();
Mockito.when(innerCallable.futureCall(innerQuery.capture(), Mockito.any()))
.thenReturn(innerResult);
}

@Test
public void testLimitAdded() {
ReadRowsFirstCallable<Row> callable = new ReadRowsFirstCallable<>(innerCallable);
innerResult.set(null);
callable.call(Query.create("fake-table"));

callable.futureCall(Query.create("fake-table"), GrpcCallContext.createDefault());
verify(innerCallable)
.call(innerQuery.capture(), any(ResponseObserver.class), any(ApiCallContext.class));
Truth.assertThat(innerQuery.getValue().toProto(REQUEST_CONTEXT))
.isEqualTo(Query.create("fake-table").limit(1).toProto(REQUEST_CONTEXT));
}

@Test
public void testLimitChanged() {
ReadRowsFirstCallable<Row> callable = new ReadRowsFirstCallable<>(innerCallable);
innerResult.set(null);
callable.call(Query.create("fake-table").limit(1_000));

callable.futureCall(Query.create("fake-table").limit(10), GrpcCallContext.createDefault());
verify(innerCallable)
.call(innerQuery.capture(), any(ResponseObserver.class), any(ApiCallContext.class));
Truth.assertThat(innerQuery.getValue().toProto(REQUEST_CONTEXT))
.isEqualTo(Query.create("fake-table").limit(1).toProto(REQUEST_CONTEXT));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,4 @@ public void testRequestConverted() {

Truth.assertThat(innerCallable.getActualRequest()).isEqualTo(query.toProto(REQUEST_CONTEXT));
}

@Test
public void testFirstIsLimited() {
ServerStreamingStashCallable<ReadRowsRequest, Row> innerCallable =
new ServerStreamingStashCallable<>();
ReadRowsUserCallable<Row> callable = new ReadRowsUserCallable<>(innerCallable, REQUEST_CONTEXT);
Query query = Query.create("fake-table");

callable.first().call(query);

Truth.assertThat(innerCallable.getActualRequest())
.isEqualTo(query.limit(1).toProto(REQUEST_CONTEXT));
}
}

0 comments on commit cb539b5

Please sign in to comment.