Skip to content

Commit

Permalink
fix: batch time series data when exporting client-side metric (#2222)
Browse files Browse the repository at this point in the history
* Batch time series data when exporting client-side metric to fix issue with too many distinct resources.

* Apply cleanups based on the comments.

* Revert export code changes to diagnose integration test failures.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
rkaregar and gcf-owl-bot[bot] committed May 8, 2024
1 parent 26d5437 commit 1f9f169
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 20 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.37.0')
implementation platform('com.google.cloud:libraries-bom:26.38.0')
implementation 'com.google.cloud:google-cloud-bigtable'
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.monitoring.v3.CreateTimeSeriesRequest;
import com.google.monitoring.v3.ProjectName;
Expand All @@ -53,6 +54,7 @@
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -85,6 +87,10 @@ public final class BigtableCloudMonitoringExporter implements MetricExporter {

private static final String APPLICATION_RESOURCE_PROJECT_ID = "project_id";

// This is the quota limit from Cloud Monitoring. More details in
// https://cloud.google.com/monitoring/quotas#custom_metrics_quotas.
private static final int EXPORT_BATCH_SIZE_LIMIT = 200;

private final MetricServiceClient client;

private final String bigtableProjectId;
Expand Down Expand Up @@ -216,19 +222,12 @@ private CompletableResultCode exportBigtableResourceMetrics(Collection<MetricDat
}

ProjectName projectName = ProjectName.of(bigtableProjectId);
CreateTimeSeriesRequest bigtableRequest =
CreateTimeSeriesRequest.newBuilder()
.setName(projectName.toString())
.addAllTimeSeries(bigtableTimeSeries)
.build();

ApiFuture<Empty> future =
this.client.createServiceTimeSeriesCallable().futureCall(bigtableRequest);
ApiFuture<List<Empty>> future = exportTimeSeries(projectName, bigtableTimeSeries);

CompletableResultCode bigtableExportCode = new CompletableResultCode();
ApiFutures.addCallback(
future,
new ApiFutureCallback<Empty>() {
new ApiFutureCallback<List<Empty>>() {
@Override
public void onFailure(Throwable throwable) {
if (bigtableExportFailureLogged.compareAndSet(false, true)) {
Expand All @@ -245,7 +244,7 @@ public void onFailure(Throwable throwable) {
}

@Override
public void onSuccess(Empty empty) {
public void onSuccess(List<Empty> emptyList) {
// When an export succeeded reset the export failure flag to false so if there's a
// transient failure it'll be logged.
bigtableExportFailureLogged.set(false);
Expand Down Expand Up @@ -290,22 +289,17 @@ private CompletableResultCode exportApplicationResourceMetrics(

// Construct the request. The project id will be the project id of the detected monitored
// resource.
ApiFuture<Empty> gceOrGkeFuture;
ApiFuture<List<Empty>> gceOrGkeFuture;
CompletableResultCode exportCode = new CompletableResultCode();
try {
ProjectName projectName =
ProjectName.of(applicationResource.getLabelsOrThrow(APPLICATION_RESOURCE_PROJECT_ID));
CreateTimeSeriesRequest request =
CreateTimeSeriesRequest.newBuilder()
.setName(projectName.toString())
.addAllTimeSeries(timeSeries)
.build();

gceOrGkeFuture = this.client.createServiceTimeSeriesCallable().futureCall(request);
gceOrGkeFuture = exportTimeSeries(projectName, timeSeries);

ApiFutures.addCallback(
gceOrGkeFuture,
new ApiFutureCallback<Empty>() {
new ApiFutureCallback<List<Empty>>() {
@Override
public void onFailure(Throwable throwable) {
if (applicationExportFailureLogged.compareAndSet(false, true)) {
Expand All @@ -322,7 +316,7 @@ public void onFailure(Throwable throwable) {
}

@Override
public void onSuccess(Empty empty) {
public void onSuccess(List<Empty> emptyList) {
// When an export succeeded reset the export failure flag to false so if there's a
// transient failure it'll be logged.
applicationExportFailureLogged.set(false);
Expand All @@ -341,6 +335,23 @@ public void onSuccess(Empty empty) {
return exportCode;
}

/**
 * Exports the given time series to Cloud Monitoring, splitting them into batches that respect
 * the per-request series limit ({@code EXPORT_BATCH_SIZE_LIMIT}) and issuing one
 * CreateTimeSeries RPC per batch.
 *
 * @param projectName the project the time series are written under
 * @param timeSeries the full collection of time series to export
 * @return a future that completes once every batched RPC has completed
 */
private ApiFuture<List<Empty>> exportTimeSeries(
    ProjectName projectName, List<TimeSeries> timeSeries) {
  String targetProject = projectName.toString();
  List<ApiFuture<Empty>> pendingBatches = new ArrayList<>();

  // Each RPC may carry at most EXPORT_BATCH_SIZE_LIMIT series; send the remainder in
  // follow-up requests rather than one oversized call.
  for (List<TimeSeries> chunk : Iterables.partition(timeSeries, EXPORT_BATCH_SIZE_LIMIT)) {
    CreateTimeSeriesRequest request =
        CreateTimeSeriesRequest.newBuilder()
            .setName(targetProject)
            .addAllTimeSeries(chunk)
            .build();
    pendingBatches.add(this.client.createServiceTimeSeriesCallable().futureCall(request));
  }

  return ApiFutures.allAsList(pendingBatches);
}

@Override
public CompletableResultCode flush() {
if (lastExportCode != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -220,6 +222,80 @@ public void testExportingHistogramData() {
assertThat(timeSeries.getPoints(0).getInterval().getEndTime().getNanos()).isEqualTo(endEpoch);
}

@Test
public void testExportingSumDataInBatches() {
  // Capture every CreateTimeSeries request so the batch split can be inspected afterwards.
  ArgumentCaptor<CreateTimeSeriesRequest> requestCaptor =
      ArgumentCaptor.forClass(CreateTimeSeriesRequest.class);

  UnaryCallable<CreateTimeSeriesRequest, Empty> stubCallable = mock(UnaryCallable.class);
  when(mockMetricServiceStub.createServiceTimeSeriesCallable()).thenReturn(stubCallable);
  ApiFuture<Empty> okFuture = ApiFutures.immediateFuture(Empty.getDefaultInstance());
  when(stubCallable.futureCall(requestCaptor.capture())).thenReturn(okFuture);

  long startEpoch = 10;
  long endEpoch = 15;

  // Build 250 metric points, each on a distinct table so every one is a distinct time series.
  Collection<MetricData> metricsToExport = new ArrayList<>();
  for (int i = 0; i < 250; i++) {
    Attributes pointAttributes =
        Attributes.builder()
            .put(BIGTABLE_PROJECT_ID_KEY, projectId)
            .put(INSTANCE_ID_KEY, instanceId)
            .put(TABLE_ID_KEY, tableId + i)
            .put(CLUSTER_ID_KEY, cluster)
            .put(ZONE_ID_KEY, zone)
            .put(APP_PROFILE_KEY, appProfileId)
            .build();
    LongPointData point = ImmutableLongPointData.create(startEpoch, endEpoch, pointAttributes, i);

    metricsToExport.add(
        ImmutableMetricData.createLongSum(
            resource,
            scope,
            "bigtable.googleapis.com/internal/client/retry_count",
            "description",
            "1",
            ImmutableSumData.create(
                true, AggregationTemporality.CUMULATIVE, ImmutableList.of(point))));
  }

  exporter.export(metricsToExport);

  // 250 series must be split at the 200-series limit: one full batch plus a 50-series remainder.
  assertThat(requestCaptor.getAllValues()).hasSize(2);
  CreateTimeSeriesRequest firstRequest = requestCaptor.getAllValues().get(0);
  CreateTimeSeriesRequest secondRequest = requestCaptor.getAllValues().get(1);

  assertThat(firstRequest.getTimeSeriesList()).hasSize(200);
  assertThat(secondRequest.getTimeSeriesList()).hasSize(50);

  // Walk the two batches in order; index i must line up with the point value and table suffix,
  // proving batching preserved ordering and dropped nothing.
  List<TimeSeries> exported = new ArrayList<>(firstRequest.getTimeSeriesList());
  exported.addAll(secondRequest.getTimeSeriesList());

  for (int i = 0; i < 250; i++) {
    TimeSeries timeSeries = exported.get(i);

    assertThat(timeSeries.getResource().getLabelsMap())
        .containsExactly(
            BIGTABLE_PROJECT_ID_KEY.getKey(), projectId,
            INSTANCE_ID_KEY.getKey(), instanceId,
            TABLE_ID_KEY.getKey(), tableId + i,
            CLUSTER_ID_KEY.getKey(), cluster,
            ZONE_ID_KEY.getKey(), zone);

    assertThat(timeSeries.getMetric().getLabelsMap()).hasSize(2);
    assertThat(timeSeries.getMetric().getLabelsMap())
        .containsAtLeast(APP_PROFILE_KEY.getKey(), appProfileId, CLIENT_UID_KEY.getKey(), taskId);
    assertThat(timeSeries.getPoints(0).getValue().getInt64Value()).isEqualTo(i);
    assertThat(timeSeries.getPoints(0).getInterval().getStartTime().getNanos())
        .isEqualTo(startEpoch);
    assertThat(timeSeries.getPoints(0).getInterval().getEndTime().getNanos()).isEqualTo(endEpoch);
  }
}

@Test
public void testTimeSeriesForMetricWithGceOrGkeResource() {
String gceProjectId = "fake-gce-project";
Expand Down

0 comments on commit 1f9f169

Please sign in to comment.