Skip to content

Commit

Permalink
fix: Change types for Cloud Bigtable Changestream methods (#1639)
Browse files Browse the repository at this point in the history
Use org.threeten.bp.Instant for below fields:
- startTime
- endTime
- commitTimestamp
- estimatedLowWatermark

Use bigtable.common.Status for CloseStream::getStatus()

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](
https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
tengzhonger committed Feb 17, 2023
1 parent 612854f commit 908d70f
Show file tree
Hide file tree
Showing 18 changed files with 193 additions and 64 deletions.
37 changes: 36 additions & 1 deletion google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,41 @@
<differenceType>7006</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/Heartbeat</className>
<method>*getEstimatedLowWatermark*</method>
<to>long</to>
<to>org.threeten.bp.Instant</to>
</difference>
<!-- change method return type is ok because CloseStream is InternalApi -->
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/CloseStream</className>
<method>*getStatus*</method>
<to>com.google.cloud.bigtable.common.Status</to>
</difference>
<!-- change method return type is ok because ChangeStreamMutation is InternalApi -->
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation</className>
<method>*getCommitTimestamp*</method>
<to>org.threeten.bp.Instant</to>
</difference>
<!-- change method return type is ok because ChangeStreamMutation is InternalApi -->
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation</className>
<method>*getEstimatedLowWatermark*</method>
<to>org.threeten.bp.Instant</to>
</difference>
<!-- change method argument type is ok because ChangeStreamRecordAdapter is InternalApi -->
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter$ChangeStreamRecordBuilder</className>
<method>*</method>
<to>*</to>
</difference>
<!-- change method argument type is ok because ReadChangeStreamQuery is InternalApi -->
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/ReadChangeStreamQuery</className>
<method>*</method>
<to>*</to>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
package com.google.cloud.bigtable.common;

import com.google.common.base.Objects;
import java.io.Serializable;

/**
* The `Status` type defines a logical error model. Each `Status` message contains an error code and
* a error message.
*
* <p>This primarily wraps the protobuf {@link com.google.rpc.Status}.
*/
public final class Status {
public final class Status implements Serializable {
private static final long serialVersionUID = -5512896228725308380L;

public enum Code {
OK(com.google.rpc.Code.OK),
CANCELLED(com.google.rpc.Code.CANCELLED),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.protobuf.ByteString;
import java.io.Serializable;
import javax.annotation.Nonnull;
import org.threeten.bp.Instant;

/**
* A ChangeStreamMutation represents a list of mods(represented by List<{@link Entry}>) targeted at
Expand Down Expand Up @@ -72,7 +73,7 @@ public enum MutationType {
static Builder createUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
long commitTimestamp,
Instant commitTimestamp,
int tieBreaker) {
return builder()
.setRowKey(rowKey)
Expand All @@ -88,7 +89,7 @@ static Builder createUserMutation(
* mutation.
*/
static Builder createGcMutation(
@Nonnull ByteString rowKey, long commitTimestamp, int tieBreaker) {
@Nonnull ByteString rowKey, Instant commitTimestamp, int tieBreaker) {
return builder()
.setRowKey(rowKey)
.setType(MutationType.GARBAGE_COLLECTION)
Expand All @@ -110,7 +111,7 @@ static Builder createGcMutation(
public abstract String getSourceClusterId();

/** Get the commit timestamp of the current mutation. */
public abstract long getCommitTimestamp();
public abstract Instant getCommitTimestamp();

/**
* Get the tie breaker of the current mutation. This is used to resolve conflicts when multiple
Expand All @@ -123,7 +124,7 @@ static Builder createGcMutation(
public abstract String getToken();

/** Get the low watermark of the current mutation. */
public abstract long getEstimatedLowWatermark();
public abstract Instant getEstimatedLowWatermark();

/** Get the list of mods of the current mutation. */
@Nonnull
Expand All @@ -144,15 +145,15 @@ abstract static class Builder {

abstract Builder setSourceClusterId(@Nonnull String sourceClusterId);

abstract Builder setCommitTimestamp(long commitTimestamp);
abstract Builder setCommitTimestamp(Instant commitTimestamp);

abstract Builder setTieBreaker(int tieBreaker);

abstract ImmutableList.Builder<Entry> entriesBuilder();

abstract Builder setToken(@Nonnull String token);

abstract Builder setEstimatedLowWatermark(long estimatedLowWatermark);
abstract Builder setEstimatedLowWatermark(Instant estimatedLowWatermark);

Builder setCell(
@Nonnull String familyName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.protobuf.ByteString;
import javax.annotation.Nonnull;
import org.threeten.bp.Instant;

/**
* An extension point that allows end users to plug in a custom implementation of logical change
Expand Down Expand Up @@ -115,15 +116,15 @@ interface ChangeStreamRecordBuilder<ChangeStreamRecordT> {
void startUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
long commitTimestamp,
Instant commitTimestamp,
int tieBreaker);

/**
* Called to start a new Garbage Collection ChangeStreamMutation. This will be called at most
* once. If called, the current change stream record must not include any close stream message
* or heartbeat.
*/
void startGcMutation(@Nonnull ByteString rowKey, long commitTimestamp, int tieBreaker);
void startGcMutation(@Nonnull ByteString rowKey, Instant commitTimestamp, int tieBreaker);

/** Called to add a DeleteFamily mod. */
void deleteFamily(@Nonnull String familyName);
Expand Down Expand Up @@ -164,7 +165,7 @@ void deleteCells(

/** Called once per stream record to signal that all mods have been processed (unless reset). */
ChangeStreamRecordT finishChangeStreamMutation(
@Nonnull String token, long estimatedLowWatermark);
@Nonnull String token, Instant estimatedLowWatermark);

/** Called when the current in progress change stream record should be dropped */
void reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import com.google.api.core.InternalApi;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.cloud.bigtable.common.Status;
import com.google.common.collect.ImmutableList;
import com.google.rpc.Status;
import java.io.Serializable;
import java.util.List;
import javax.annotation.Nonnull;
Expand All @@ -34,8 +34,9 @@ public abstract class CloseStream implements ChangeStreamRecord, Serializable {
private static final long serialVersionUID = 7316215828353608505L;

private static CloseStream create(
Status status, List<ChangeStreamContinuationToken> changeStreamContinuationTokens) {
return new AutoValue_CloseStream(status, changeStreamContinuationTokens);
com.google.rpc.Status status,
List<ChangeStreamContinuationToken> changeStreamContinuationTokens) {
return new AutoValue_CloseStream(Status.fromProto(status), changeStreamContinuationTokens);
}

/** Wraps the protobuf {@link ReadChangeStreamResponse.CloseStream}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.protobuf.ByteString;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Instant;

/**
* Default implementation of a {@link ChangeStreamRecordAdapter} that uses {@link
Expand Down Expand Up @@ -102,7 +103,7 @@ public ChangeStreamRecord onCloseStream(ReadChangeStreamResponse.CloseStream clo
public void startUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
long commitTimestamp,
Instant commitTimestamp,
int tieBreaker) {
this.changeStreamMutationBuilder =
ChangeStreamMutation.createUserMutation(
Expand All @@ -111,7 +112,8 @@ public void startUserMutation(

/** {@inheritDoc} */
@Override
public void startGcMutation(@Nonnull ByteString rowKey, long commitTimestamp, int tieBreaker) {
public void startGcMutation(
@Nonnull ByteString rowKey, Instant commitTimestamp, int tieBreaker) {
this.changeStreamMutationBuilder =
ChangeStreamMutation.createGcMutation(rowKey, commitTimestamp, tieBreaker);
}
Expand Down Expand Up @@ -156,7 +158,7 @@ public void finishCell() {
/** {@inheritDoc} */
@Override
public ChangeStreamRecord finishChangeStreamMutation(
@Nonnull String token, long estimatedLowWatermark) {
@Nonnull String token, Instant estimatedLowWatermark) {
this.changeStreamMutationBuilder.setToken(token);
this.changeStreamMutationBuilder.setEstimatedLowWatermark(estimatedLowWatermark);
return this.changeStreamMutationBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import com.google.api.core.InternalApi;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.protobuf.util.Timestamps;
import java.io.Serializable;
import javax.annotation.Nonnull;
import org.threeten.bp.Instant;

/** A simple wrapper for {@link ReadChangeStreamResponse.Heartbeat}. */
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
Expand All @@ -29,20 +29,22 @@ public abstract class Heartbeat implements ChangeStreamRecord, Serializable {
private static final long serialVersionUID = 7316215828353608504L;

private static Heartbeat create(
ChangeStreamContinuationToken changeStreamContinuationToken, long estimatedLowWatermark) {
ChangeStreamContinuationToken changeStreamContinuationToken, Instant estimatedLowWatermark) {
return new AutoValue_Heartbeat(changeStreamContinuationToken, estimatedLowWatermark);
}

/** Wraps the protobuf {@link ReadChangeStreamResponse.Heartbeat}. */
static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) {
return create(
ChangeStreamContinuationToken.fromProto(heartbeat.getContinuationToken()),
Timestamps.toNanos(heartbeat.getEstimatedLowWatermark()));
Instant.ofEpochSecond(
heartbeat.getEstimatedLowWatermark().getSeconds(),
heartbeat.getEstimatedLowWatermark().getNanos()));
}

@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
public abstract ChangeStreamContinuationToken getChangeStreamContinuationToken();

@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
public abstract long getEstimatedLowWatermark();
public abstract Instant getEstimatedLowWatermark();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Timestamps;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Instant;

/** A simple wrapper to construct a query for the ReadChangeStream RPC. */
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
Expand Down Expand Up @@ -142,18 +143,26 @@ public ReadChangeStreamQuery streamPartition(ByteStringRange range) {
return streamPartition(rangeBuilder.build());
}

/** Sets the startTime(Nanosecond) to read the change stream. */
public ReadChangeStreamQuery startTime(long value) {
/** Sets the startTime to read the change stream. */
public ReadChangeStreamQuery startTime(Instant value) {
Preconditions.checkState(
!builder.hasContinuationTokens(),
"startTime and continuationTokens can't be specified together");
builder.setStartTime(Timestamps.fromNanos(value));
builder.setStartTime(
Timestamp.newBuilder()
.setSeconds(value.getEpochSecond())
.setNanos(value.getNano())
.build());
return this;
}

/** Sets the endTime(Nanosecond) to read the change stream. */
public ReadChangeStreamQuery endTime(long value) {
builder.setEndTime(Timestamps.fromNanos(value));
/** Sets the endTime to read the change stream. */
public ReadChangeStreamQuery endTime(Instant value) {
builder.setEndTime(
Timestamp.newBuilder()
.setSeconds(value.getEpochSecond())
.setNanos(value.getNano())
.build());
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecordAdapter.ChangeStreamRecordBuilder;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.common.base.Preconditions;
import com.google.protobuf.util.Timestamps;
import org.threeten.bp.Instant;

/**
* A state machine to produce change stream records from a stream of {@link
Expand Down Expand Up @@ -338,7 +338,9 @@ State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) {
"AWAITING_NEW_STREAM_RECORD: GC mutation shouldn't have source cluster id.");
builder.startGcMutation(
dataChange.getRowKey(),
Timestamps.toNanos(dataChange.getCommitTimestamp()),
Instant.ofEpochSecond(
dataChange.getCommitTimestamp().getSeconds(),
dataChange.getCommitTimestamp().getNanos()),
dataChange.getTiebreaker());
} else if (dataChange.getType() == Type.USER) {
validate(
Expand All @@ -347,7 +349,9 @@ State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) {
builder.startUserMutation(
dataChange.getRowKey(),
dataChange.getSourceClusterId(),
Timestamps.toNanos(dataChange.getCommitTimestamp()),
Instant.ofEpochSecond(
dataChange.getCommitTimestamp().getSeconds(),
dataChange.getCommitTimestamp().getNanos()),
dataChange.getTiebreaker());
} else {
validate(false, "AWAITING_NEW_STREAM_RECORD: Unexpected type: " + dataChange.getType());
Expand Down Expand Up @@ -591,7 +595,10 @@ private State checkAndFinishMutationIfNeeded(
validate(dataChange.hasEstimatedLowWatermark(), "Last data change missing lowWatermark");
completeChangeStreamRecord =
builder.finishChangeStreamMutation(
dataChange.getToken(), Timestamps.toNanos(dataChange.getEstimatedLowWatermark()));
dataChange.getToken(),
Instant.ofEpochSecond(
dataChange.getEstimatedLowWatermark().getSeconds(),
dataChange.getEstimatedLowWatermark().getNanos()));
return AWAITING_STREAM_RECORD_CONSUME;
}
// Case 2_2): The current DataChange itself is chunked, so wait for the next
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
import static com.google.common.truth.Truth.assertWithMessage;

import com.google.rpc.Code;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -89,4 +94,23 @@ public void testToProto() {

assertThat(model.toString()).isEqualTo(proto.toString());
}

@Test
public void testSerialization() throws IOException, ClassNotFoundException {
com.google.rpc.Status proto =
com.google.rpc.Status.newBuilder()
.setCode(Code.UNAVAILABLE.getNumber())
.setMessage("some message")
.build();

Status model = Status.fromProto(proto);

ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(model);
oos.close();
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
Status actual = (Status) ois.readObject();
assertThat(actual).isEqualTo(model);
}
}

0 comments on commit 908d70f

Please sign in to comment.