feat: reverse scans public preview (#1711)
This adds a reversed boolean to Query, which lets end users stream rows in reverse order.

Example:
```java
Query query = Query.create("alphabet").range("a", "z").limit(3).reversed(true);
ServerStream<Row> results = client.readRows(query);

for (Row row : results) {
  System.out.println(row.getKey().toStringUtf8());
}
// Prints z, y, x
```
igorbernstein2 committed Jun 27, 2023
1 parent f4f2e2e commit 176360f
Showing 17 changed files with 337 additions and 86 deletions.
11 changes: 11 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
@@ -134,4 +134,15 @@
<method>*</method>
<to>*</to>
</difference>
<!-- Removed methods in an internal class -->
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigtable/data/v2/internal/RowSetUtil</className>
<method>*</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/readrows/RowMerger</className>
<method>*</method>
</difference>
</differences>
@@ -30,7 +30,7 @@ public class RowMergerUtil implements AutoCloseable {

public RowMergerUtil() {
RowBuilder<Row> rowBuilder = new DefaultRowAdapter().createRowBuilder();
merger = new RowMerger<>(rowBuilder);
merger = new RowMerger<>(rowBuilder, false);
}

@Override
@@ -50,80 +50,79 @@ public final class RowSetUtil {
private RowSetUtil() {}

/**
* Splits the provided {@link RowSet} along the provided splitPoint into 2 segments. The right
* segment will contain all keys that are strictly greater than the splitPoint and all {@link
* RowRange}s truncated to start right after the splitPoint. The primary use case is to resume a
* broken ReadRows stream.
* Removes all the keys and range parts that fall on or before the splitPoint.
*
* <p>The direction of "before" is determined by fromStart: for forward scans fromStart is true and
* all keys and range segments that would have been read prior to the splitPoint are removed
* (i.e. everything that sorts lexicographically at or before the split point). For reverse scans,
* fromStart is false and all segments that sort lexicographically at or after the split point are
* removed. The primary use case is to resume a broken ReadRows stream.
*/
@Nonnull
public static Split split(@Nonnull RowSet rowSet, @Nonnull ByteString splitPoint) {
// Edgecase: splitPoint is the leftmost key ("")
if (splitPoint.isEmpty()) {
return Split.of(null, rowSet);
}
public static RowSet erase(RowSet rowSet, ByteString splitPoint, boolean fromStart) {
RowSet.Builder newRowSet = RowSet.newBuilder();

// An empty RowSet represents a full table scan. Make that explicit so that there is a RowRange to
// split.
if (rowSet.getRowKeysList().isEmpty() && rowSet.getRowRangesList().isEmpty()) {
rowSet = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
}

RowSet.Builder leftBuilder = RowSet.newBuilder();
boolean leftIsEmpty = true;
RowSet.Builder rightBuilder = RowSet.newBuilder();
boolean rightIsEmpty = true;

// Handle point lookups
for (ByteString key : rowSet.getRowKeysList()) {
if (ByteStringComparator.INSTANCE.compare(key, splitPoint) <= 0) {
leftBuilder.addRowKeys(key);
leftIsEmpty = false;
if (fromStart) {
// key is right of the split
if (ByteStringComparator.INSTANCE.compare(key, splitPoint) > 0) {
newRowSet.addRowKeys(key);
}
} else {
rightBuilder.addRowKeys(key);
rightIsEmpty = false;
// key is left of the split
if (ByteStringComparator.INSTANCE.compare(key, splitPoint) < 0) {
newRowSet.addRowKeys(key);
}
}
}

for (RowRange range : rowSet.getRowRangesList()) {
StartPoint startPoint = StartPoint.extract(range);
int startCmp =
ComparisonChain.start()
.compare(startPoint.value, splitPoint, ByteStringComparator.INSTANCE)
// when value lies on the split point, only closed start points are on the left
.compareTrueFirst(startPoint.isClosed, true)
.result();

// Range is fully on the right side
if (startCmp > 0) {
rightBuilder.addRowRanges(range);
rightIsEmpty = false;
continue;
// Handle ranges
for (RowRange rowRange : rowSet.getRowRangesList()) {
RowRange newRange = truncateRange(rowRange, splitPoint, fromStart);
if (newRange != null) {
newRowSet.addRowRanges(newRange);
}
}

EndPoint endPoint = EndPoint.extract(range);
int endCmp =
ComparisonChain.start()
// empty (true) end key means rightmost regardless of the split point
.compareFalseFirst(endPoint.value.isEmpty(), false)
.compare(endPoint.value, splitPoint, ByteStringComparator.INSTANCE)
// don't care if the endpoint is open/closed: both will be on the left if the value is
// <=
.result();

if (endCmp <= 0) {
// Range is fully on the left
leftBuilder.addRowRanges(range);
leftIsEmpty = false;
} else {
// Range is split
leftBuilder.addRowRanges(range.toBuilder().setEndKeyClosed(splitPoint));
leftIsEmpty = false;
rightBuilder.addRowRanges(range.toBuilder().setStartKeyOpen(splitPoint));
rightIsEmpty = false;
// Return the new rowset if there is anything left to read
RowSet result = newRowSet.build();
if (result.getRowKeysList().isEmpty() && result.getRowRangesList().isEmpty()) {
return null;
}
return result;
}

private static RowRange truncateRange(RowRange range, ByteString split, boolean fromStart) {
if (fromStart) {
// range end is on or left of the split: skip
if (EndPoint.extract(range).compareTo(new EndPoint(split, true)) <= 0) {
return null;
}
} else {
// range start is on or right of the split: skip
if (StartPoint.extract(range).compareTo(new StartPoint(split, true)) >= 0) {
return null;
}
}
RowRange.Builder newRange = range.toBuilder();

if (fromStart) {
// range start is on or left of the split
if (StartPoint.extract(range).compareTo(new StartPoint(split, true)) <= 0) {
newRange.setStartKeyOpen(split);
}
} else {
// range end is on or right of the split
if (EndPoint.extract(range).compareTo(new EndPoint(split, true)) >= 0) {
newRange.setEndKeyOpen(split);
}
}

return Split.of(
leftIsEmpty ? null : leftBuilder.build(), rightIsEmpty ? null : rightBuilder.build());
return newRange.build();
}

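To make the new erase semantics concrete, here is a minimal standalone sketch (not part of the diff). The keys and ranges are illustrative, and RowSetUtil is an internal class, so application code would not normally call it directly.

```java
import com.google.bigtable.v2.RowRange;
import com.google.bigtable.v2.RowSet;
import com.google.cloud.bigtable.data.v2.internal.RowSetUtil;
import com.google.protobuf.ByteString;

public class EraseSketch {
  public static void main(String[] args) {
    // Original request: scan the closed range [a, z].
    RowSet rows =
        RowSet.newBuilder()
            .addRowRanges(
                RowRange.newBuilder()
                    .setStartKeyClosed(ByteString.copyFromUtf8("a"))
                    .setEndKeyClosed(ByteString.copyFromUtf8("z")))
            .build();

    ByteString lastSeen = ByteString.copyFromUtf8("m");

    // Forward scan broke after reading "m": drop everything at or before it -> (m, z].
    RowSet forwardRemainder = RowSetUtil.erase(rows, lastSeen, /* fromStart= */ true);

    // Reverse scan broke after reading "m": drop everything at or after it -> [a, m).
    RowSet reverseRemainder = RowSetUtil.erase(rows, lastSeen, /* fromStart= */ false);

    System.out.println(forwardRemainder);
    System.out.println(reverseRemainder);
  }
}
```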
@@ -184,6 +184,26 @@ public Query limit(long limit) {
return this;
}

/**
* Return rows in reverse order.
*
* <p>The rows will be streamed in reverse lexicographic order of their keys. The row key ranges
* are still expected to be oriented the same way as for forward scans, i.e. [a,c] where a <= c.
* The row contents are unchanged from a forward scan; only the ordering differs. This is
* particularly useful for getting the last N records before a key:
*
* <pre>{@code
* query
* .range(ByteStringRange.unbounded().endOpen("key"))
* .limit(10)
* .reversed(true)
* }</pre>
*/
public Query reversed(boolean enable) {
builder.setReversed(enable);
return this;
}

/**
* Split this query into multiple queries that can be evenly distributed across Bigtable nodes and
* be run in parallel. This method takes the results from {@link
@@ -379,11 +399,12 @@ public boolean advance(@Nonnull ByteString lastSeenRowKey) {

// Split the row ranges / row keys. Return false if there's nothing
// left on the right of the split point.
RowSetUtil.Split split = RowSetUtil.split(query.builder.getRows(), lastSeenRowKey);
if (split.getRight() == null) {
RowSet remaining =
RowSetUtil.erase(query.builder.getRows(), lastSeenRowKey, !query.builder.getReversed());
if (remaining == null) {
return false;
}
query.builder.setRows(split.getRight());
query.builder.setRows(remaining);
return true;
}
}
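As a usage illustration of the new reversed option (mirroring the Javadoc above), the sketch below reads the ten rows immediately preceding a key in descending key order. The project, instance, and table ids are placeholders.

```java
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.Row;

public class ReverseScanSketch {
  public static void main(String[] args) throws Exception {
    try (BigtableDataClient client = BigtableDataClient.create("my-project", "my-instance")) {
      // Last 10 rows strictly before "key", streamed newest-key-first.
      Query query =
          Query.create("my-table")
              .range(ByteStringRange.unbounded().endOpen("key"))
              .limit(10)
              .reversed(true);

      ServerStream<Row> rows = client.readRows(query);
      for (Row row : rows) {
        System.out.println(row.getKey().toStringUtf8());
      }
    }
  }
}
```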
@@ -732,7 +732,7 @@ private Builder() {
.setTotalTimeout(PRIME_REQUEST_TIMEOUT)
.build());

featureFlags = FeatureFlags.newBuilder();
featureFlags = FeatureFlags.newBuilder().setReverseScans(true);
}

private Builder(EnhancedBigtableStubSettings settings) {
@@ -85,7 +85,8 @@ public ReadRowsRequest getResumeRequest(ReadRowsRequest originalRequest) {
return originalRequest;
}

RowSet remaining = RowSetUtil.split(originalRequest.getRows(), lastKey).getRight();
RowSet remaining =
RowSetUtil.erase(originalRequest.getRows(), lastKey, !originalRequest.getReversed());

// Edge case: retrying a fulfilled request.
// A fulfilled request is one that has had all of its row keys and ranges fulfilled, or if it
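The same erase call drives retry resumption. Below is a simplified sketch of that logic; it ignores the row-limit bookkeeping the real resumption strategy also performs, and the helper name is hypothetical.

```java
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.RowSet;
import com.google.cloud.bigtable.data.v2.internal.RowSetUtil;
import com.google.protobuf.ByteString;

final class ResumeSketch {
  /**
   * Rebuilds a request that continues a broken stream after lastKey. A forward scan erases the
   * already-read prefix; a reverse scan erases the already-read suffix. Returns null when the
   * original request has been fully satisfied.
   */
  static ReadRowsRequest resumeAfter(ReadRowsRequest original, ByteString lastKey) {
    RowSet remaining =
        RowSetUtil.erase(original.getRows(), lastKey, /* fromStart= */ !original.getReversed());
    if (remaining == null) {
      return null; // nothing left to read
    }
    return original.toBuilder().setRows(remaining).build();
  }
}
```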
@@ -61,8 +61,8 @@ public class RowMerger<RowT> implements Reframer<RowT, ReadRowsResponse> {
private final StateMachine<RowT> stateMachine;
private Queue<RowT> mergedRows;

public RowMerger(RowBuilder<RowT> rowBuilder) {
stateMachine = new StateMachine<>(rowBuilder);
public RowMerger(RowBuilder<RowT> rowBuilder, boolean reversed) {
stateMachine = new StateMachine<>(rowBuilder, reversed);
mergedRows = new ArrayDeque<>();
}

@@ -49,7 +49,7 @@ public RowMergingCallable(
public void call(
ReadRowsRequest request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
RowBuilder<RowT> rowBuilder = rowAdapter.createRowBuilder();
RowMerger<RowT> merger = new RowMerger<>(rowBuilder);
RowMerger<RowT> merger = new RowMerger<>(rowBuilder, request.getReversed());
ReframingResponseObserver<ReadRowsResponse, RowT> innerObserver =
new ReframingResponseObserver<>(responseObserver, merger);
inner.call(request, innerObserver, context);
@@ -76,6 +76,7 @@
*/
final class StateMachine<RowT> {
private final RowBuilder<RowT> adapter;
private boolean reversed;
private State currentState;
private ByteString lastCompleteRowKey;

@@ -102,9 +103,11 @@ final class StateMachine<RowT> {
* Initialize a new state machine that's ready for a new row.
*
* @param adapter The adapter that will build the final row.
* @param reversed Whether the rows will be returned in reverse (descending key) order.
*/
StateMachine(RowBuilder<RowT> adapter) {
StateMachine(RowBuilder<RowT> adapter, boolean reversed) {
this.adapter = adapter;
this.reversed = reversed;
reset();
}

@@ -261,9 +264,15 @@ State handleChunk(CellChunk chunk) {
validate(chunk.hasFamilyName(), "AWAITING_NEW_ROW: family missing");
validate(chunk.hasQualifier(), "AWAITING_NEW_ROW: qualifier missing");
if (lastCompleteRowKey != null) {
validate(
ByteStringComparator.INSTANCE.compare(lastCompleteRowKey, chunk.getRowKey()) < 0,
"AWAITING_NEW_ROW: key must be strictly increasing");

int cmp = ByteStringComparator.INSTANCE.compare(lastCompleteRowKey, chunk.getRowKey());
String direction = "increasing";
if (reversed) {
cmp *= -1;
direction = "decreasing";
}

validate(cmp < 0, "AWAITING_NEW_ROW: key must be strictly " + direction);
}

rowKey = chunk.getRowKey();
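The direction-aware key check above can be shown in isolation. This sketch uses protobuf's unsigned lexicographical comparator instead of the library's internal ByteStringComparator, but the sign flip is the same idea.

```java
import com.google.protobuf.ByteString;
import java.util.Comparator;

public class KeyOrderSketch {
  public static void main(String[] args) {
    Comparator<ByteString> comparator = ByteString.unsignedLexicographicalComparator();

    ByteString lastCompleteRowKey = ByteString.copyFromUtf8("m");
    ByteString nextRowKey = ByteString.copyFromUtf8("f");
    boolean reversed = true;

    int cmp = comparator.compare(lastCompleteRowKey, nextRowKey);
    String direction = "increasing";
    if (reversed) {
      cmp *= -1; // a reverse scan must deliver strictly decreasing keys
      direction = "decreasing";
    }
    if (cmp >= 0) {
      throw new IllegalStateException("AWAITING_NEW_ROW: key must be strictly " + direction);
    }
    System.out.println("ordering ok: keys are strictly " + direction);
  }
}
```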
@@ -26,6 +26,7 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.WatchdogProvider;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.FeatureFlags;
import com.google.bigtable.v2.InstanceName;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
@@ -36,8 +37,14 @@
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.common.base.Preconditions;
import com.google.common.io.BaseEncoding;
import io.grpc.Attributes;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerTransportFilter;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
@@ -78,12 +85,24 @@ public class BigtableDataClientFactoryTest {

private final BlockingQueue<Attributes> setUpAttributes = new LinkedBlockingDeque<>();
private final BlockingQueue<Attributes> terminateAttributes = new LinkedBlockingDeque<>();
private final BlockingQueue<Metadata> requestMetadata = new LinkedBlockingDeque<>();

@Before
public void setUp() throws IOException {
service = new FakeBigtableService();
server =
FakeServiceBuilder.create(service)
.intercept(
new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
requestMetadata.add(headers);
return next.startCall(call, headers);
}
})
.addTransportFilter(
new ServerTransportFilter() {
@Override
@@ -276,6 +295,24 @@ public void testCreateWithRefreshingChannel() throws Exception {
assertThat(terminateAttributes).hasSize(poolSize);
}

@Test
public void testFeatureFlags() throws Exception {
try (BigtableDataClientFactory factory = BigtableDataClientFactory.create(defaultSettings);
BigtableDataClient client = factory.createDefault()) {

requestMetadata.clear();
client.mutateRow(RowMutation.create("some-table", "some-key").deleteRow());
}

Metadata metadata = requestMetadata.take();
String encodedValue =
metadata.get(Metadata.Key.of("bigtable-features", Metadata.ASCII_STRING_MARSHALLER));
FeatureFlags featureFlags =
FeatureFlags.parseFrom(BaseEncoding.base64Url().decode(encodedValue));

assertThat(featureFlags.getReverseScans()).isTrue();
}

@Test
public void testBulkMutationFlowControllerConfigured() throws Exception {
BigtableDataSettings settings =
@@ -306,6 +343,7 @@ private static class FakeBigtableService extends BigtableGrpc.BigtableImplBase {
volatile MutateRowRequest lastRequest;
BlockingQueue<ReadRowsRequest> readRowsRequests = new LinkedBlockingDeque<>();
BlockingQueue<PingAndWarmRequest> pingAndWarmRequests = new LinkedBlockingDeque<>();

private ApiFunction<ReadRowsRequest, ReadRowsResponse> readRowsCallback =
new ApiFunction<ReadRowsRequest, ReadRowsResponse>() {
@Override