Skip to content

Commit

Permalink
fix: fix the connectivity error count caculation (#1401)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](
https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
mutianf committed Nov 7, 2022
1 parent 74779e3 commit 1f8cfd7
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import com.google.bigtable.v2.ResponseParams;
import com.google.cloud.bigtable.data.v2.stub.SafeResponseObserver;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Metadata;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

Expand Down Expand Up @@ -101,65 +98,13 @@ protected void onResponseImpl(ResponseT response) {

@Override
protected void onErrorImpl(Throwable t) {
// server-timing metric will be added through GrpcResponseMetadata#onHeaders(Metadata),
// so it's not checking trailing metadata here.
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, t);
try {
// Check both headers and trailers because in different environments the metadata
// could be returned in headers or trailers
if (metadata != null) {
byte[] trailers = metadata.get(Util.METADATA_KEY);
if (trailers == null) {
Metadata trailingMetadata = responseMetadata.getTrailingMetadata();
if (trailingMetadata != null) {
trailers = trailingMetadata.get(Util.METADATA_KEY);
}
}
// If the response is terminated abnormally and we didn't get location information in
// trailers or headers, skip setting the locations
if (trailers != null) {
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
}
}
} catch (InvalidProtocolBufferException e) {
t.addSuppressed(t);
}

Util.recordMetricsFromMetadata(responseMetadata, tracer, t);
outerObserver.onError(t);
}

@Override
protected void onCompleteImpl() {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, null);
try {
// Check both headers and trailers because in different environments the metadata
// could be returned in headers or trailers
if (metadata != null) {
byte[] trailers = metadata.get(Util.METADATA_KEY);
if (trailers == null) {
Metadata trailingMetadata = responseMetadata.getTrailingMetadata();
if (trailingMetadata != null) {
trailers = trailingMetadata.get(Util.METADATA_KEY);
}
}
// If the response is terminated abnormally and we didn't get location information in
// trailers or headers, skip setting the locations
if (trailers != null) {
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
}
}
} catch (InvalidProtocolBufferException e) {
// InvalidProtocolBufferException will only throw if something changed on
// the server side. Location info won't be populated as a result. Ignore
// this error and don't bubble it up to user.
}

Util.recordMetricsFromMetadata(responseMetadata, tracer, null);
outerObserver.onComplete();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@
import com.google.api.gax.grpc.GrpcResponseMetadata;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.ResponseParams;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Metadata;
import javax.annotation.Nonnull;

/**
Expand Down Expand Up @@ -79,56 +76,12 @@ class BigtableTracerUnaryCallback<ResponseT> implements ApiFutureCallback<Respon

@Override
public void onFailure(Throwable throwable) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, throwable);
try {
// Check both headers and trailers because in different environments the metadata
// could be returned in headers or trailers
if (metadata != null) {
byte[] trailers = metadata.get(Util.METADATA_KEY);
if (trailers == null) {
Metadata trailingMetadata = responseMetadata.getTrailingMetadata();
if (trailingMetadata != null) {
trailers = trailingMetadata.get(Util.METADATA_KEY);
}
}
// If the response is terminated abnormally and we didn't get location information in
// trailers or headers, skip setting the locations
if (trailers != null) {
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
}
}
} catch (InvalidProtocolBufferException e) {
}
Util.recordMetricsFromMetadata(responseMetadata, tracer, throwable);
}

@Override
public void onSuccess(ResponseT response) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, null);
try {
// Check both headers and trailers because in different environments the metadata
// could be returned in headers or trailers
if (metadata != null) {
byte[] trailers = metadata.get(Util.METADATA_KEY);
if (trailers == null) {
Metadata trailingMetadata = responseMetadata.getTrailingMetadata();
if (trailingMetadata != null) {
trailers = trailingMetadata.get(Util.METADATA_KEY);
}
}
// If the response is terminated abnormally and we didn't get location information in
// trailers or headers, skip setting the locations
if (trailers != null) {
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
}
}
} catch (InvalidProtocolBufferException e) {
}
Util.recordMetricsFromMetadata(responseMetadata, tracer, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcResponseMetadata;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
Expand All @@ -25,10 +26,12 @@
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.ReadModifyWriteRowRequest;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ResponseParams;
import com.google.bigtable.v2.SampleRowKeysRequest;
import com.google.bigtable.v2.TableName;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
Expand Down Expand Up @@ -57,7 +60,7 @@ public class Util {
private static final Metadata.Key<String> SERVER_TIMING_HEADER_KEY =
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER);
private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?<dur>\\d+)");
static final Metadata.Key<byte[]> METADATA_KEY =
static final Metadata.Key<byte[]> LOCATION_METADATA_KEY =
Metadata.Key.of("x-goog-ext-425905942-bin", Metadata.BINARY_BYTE_MARSHALLER);

/** Convert an exception into a value that can be used to create an OpenCensus tag value. */
Expand Down Expand Up @@ -136,16 +139,62 @@ static Map<String, List<String>> createStatsHeaders(ApiCallContext apiCallContex
return headers.build();
}

static Long getGfeLatency(Metadata metadata) {
if (metadata != null && metadata.get(SERVER_TIMING_HEADER_KEY) != null) {
String serverTiming = metadata.get(SERVER_TIMING_HEADER_KEY);
Matcher matcher = SERVER_TIMING_HEADER_PATTERN.matcher(serverTiming);
// this should always be true
if (matcher.find()) {
long latency = Long.valueOf(matcher.group("dur"));
return latency;
private static Long getGfeLatency(@Nullable Metadata metadata) {
if (metadata == null) {
return null;
}
String serverTiming = metadata.get(SERVER_TIMING_HEADER_KEY);
if (serverTiming == null) {
return null;
}
Matcher matcher = SERVER_TIMING_HEADER_PATTERN.matcher(serverTiming);
// this should always be true
if (matcher.find()) {
long latency = Long.valueOf(matcher.group("dur"));
return latency;
}
return null;
}

private static ResponseParams getResponseParams(@Nullable Metadata metadata) {
if (metadata == null) {
return null;
}
byte[] responseParams = metadata.get(Util.LOCATION_METADATA_KEY);
if (responseParams != null) {
try {
return ResponseParams.parseFrom(responseParams);
} catch (InvalidProtocolBufferException e) {
}
}
return null;
}

static void recordMetricsFromMetadata(
GrpcResponseMetadata responseMetadata, BigtableTracer tracer, Throwable throwable) {
Metadata metadata = responseMetadata.getMetadata();

// Get the response params from the metadata. Check both headers and trailers
// because in different environments the metadata could be returned in headers or trailers
@Nullable ResponseParams responseParams = getResponseParams(responseMetadata.getMetadata());
if (responseParams == null) {
responseParams = getResponseParams(responseMetadata.getTrailingMetadata());
}
// Set tracer locations if response params is not null
if (responseParams != null) {
tracer.setLocations(responseParams.getZoneId(), responseParams.getClusterId());
}

// server-timing metric will be added through GrpcResponseMetadata#onHeaders(Metadata),
// so it's not checking trailing metadata here.
@Nullable Long latency = getGfeLatency(metadata);
// For direct path, we won't see GFE server-timing header. However, if we received the
// location info, we know that there isn't a connectivity issue. Set the latency to
// 0 so gfe missing header won't get incremented.
if (responseParams != null && latency == null) {
latency = 0L;
}
// Record gfe metrics
tracer.recordGfeMetadata(latency, throwable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void sendHeaders(Metadata headers) {
ResponseParams params =
ResponseParams.newBuilder().setZoneId(ZONE).setClusterId(CLUSTER).build();
byte[] byteArray = params.toByteArray();
headers.put(Util.METADATA_KEY, byteArray);
headers.put(Util.LOCATION_METADATA_KEY, byteArray);

super.sendHeaders(headers);
}
Expand Down

0 comments on commit 1f8cfd7

Please sign in to comment.