Skip to content

Commit df77560

Browse files
authoredJun 29, 2022
feat: update tracers to use built in metrics (#1244)
* feat: add built in metrics measure and views * remove status from application latency * Rename methods and add comments * update based on comments * feat: update tracers to use built in metrics * update on comments * make stopwatch thread safe * update comments * calculate application latency correctly * remove unused check * clean up tests * fix typo * update test * fix flaky test * fix retry count
1 parent a96d3e8 commit df77560

15 files changed

+877
-86
lines changed
 

‎google-cloud-bigtable/pom.xml

+4
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@
6161
<!-- NOTE: Dependencies are organized into two groups, production and test.
6262
Within a group, dependencies are sorted by (groupId, artifactId) -->
6363

64+
<dependency>
65+
<groupId>com.google.cloud</groupId>
66+
<artifactId>google-cloud-bigtable-stats</artifactId>
67+
</dependency>
6468
<!-- Production dependencies -->
6569
<dependency>
6670
<groupId>com.google.api</groupId>

‎google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

+12-4
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
7171
import com.google.cloud.bigtable.data.v2.models.RowMutation;
7272
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
73+
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
7374
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
7475
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable;
7576
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable;
@@ -194,6 +195,12 @@ public static EnhancedBigtableStubSettings finalizeSettings(
194195
RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
195196
TagValue.create(settings.getAppProfileId()))
196197
.build();
198+
ImmutableMap<String, String> builtinAttributes =
199+
ImmutableMap.<String, String>builder()
200+
.put("project_id", settings.getProjectId())
201+
.put("instance_id", settings.getInstanceId())
202+
.put("app_profile", settings.getAppProfileId())
203+
.build();
197204
// Inject Opencensus instrumentation
198205
builder.setTracerFactory(
199206
new CompositeTracerFactory(
@@ -218,6 +225,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(
218225
.build()),
219226
// Add OpenCensus Metrics
220227
MetricsTracerFactory.create(tagger, stats, attributes),
228+
BuiltinMetricsTracerFactory.create(builtinAttributes),
221229
// Add user configured tracer
222230
settings.getTracerFactory())));
223231
return builder.build();
@@ -466,7 +474,7 @@ private <RowT> UnaryCallable<Query, List<RowT>> createBulkReadRowsCallable(
466474
new TracedBatcherUnaryCallable<>(readRowsUserCallable.all());
467475

468476
UnaryCallable<Query, List<RowT>> withHeaderTracer =
469-
new HeaderTracerUnaryCallable(tracedBatcher);
477+
new HeaderTracerUnaryCallable<>(tracedBatcher);
470478

471479
UnaryCallable<Query, List<RowT>> traced =
472480
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), span);
@@ -594,11 +602,11 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
594602

595603
SpanName spanName = getSpanName("MutateRows");
596604

597-
UnaryCallable<BulkMutation, Void> tracedBatcher = new TracedBatcherUnaryCallable<>(userFacing);
605+
UnaryCallable<BulkMutation, Void> tracedBatcherUnaryCallable =
606+
new TracedBatcherUnaryCallable<>(userFacing);
598607

599608
UnaryCallable<BulkMutation, Void> withHeaderTracer =
600-
new HeaderTracerUnaryCallable<>(tracedBatcher);
601-
609+
new HeaderTracerUnaryCallable<>(tracedBatcherUnaryCallable);
602610
UnaryCallable<BulkMutation, Void> traced =
603611
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), spanName);
604612

‎google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java

+26-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
* A Bigtable specific {@link ApiTracer} that includes additional contexts. This class is a base
2626
* implementation that does nothing.
2727
*/
28-
@BetaApi("This surface is stable yet it might be removed in the future.")
28+
@BetaApi("This surface is not stable yet it might be removed in the future.")
2929
public class BigtableTracer extends BaseApiTracer {
3030

3131
private volatile int attempt = 0;
@@ -35,6 +35,23 @@ public void attemptStarted(int attemptNumber) {
3535
this.attempt = attemptNumber;
3636
}
3737

38+
/** annotate when onRequest is called. This will be called in BuiltinMetricsTracer. */
39+
public void onRequest(int requestCount) {
40+
// noop
41+
}
42+
43+
/**
44+
* annotate when automatic flow control is disabled. This will be called in BuiltinMetricsTracer.
45+
*/
46+
public void disableFlowControl() {
47+
// noop
48+
}
49+
50+
/** annotate after the callback from onResponse. This will be called in BuiltinMetricsTracer. */
51+
public void afterResponse(long applicationLatency) {
52+
// noop
53+
}
54+
3855
/**
3956
* Get the attempt number of the current call. Attempt number for the current call is passed in
4057
* and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from
@@ -57,4 +74,12 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
5774
public void batchRequestThrottled(long throttledTimeMs) {
5875
// noop
5976
}
77+
78+
/**
79+
* Set the Bigtable zone and cluster so metrics can be tagged with location information. This will
80+
* be called in BuiltinMetricsTracer.
81+
*/
82+
public void setLocations(String zone, String cluster) {
83+
// noop
84+
}
6085
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.metrics;
17+
18+
import static com.google.api.gax.tracing.ApiTracerFactory.OperationType;
19+
20+
import com.google.api.gax.tracing.SpanName;
21+
import com.google.cloud.bigtable.stats.StatsRecorderWrapper;
22+
import com.google.common.annotations.VisibleForTesting;
23+
import com.google.common.base.Stopwatch;
24+
import com.google.common.math.IntMath;
25+
import java.util.concurrent.CancellationException;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
import java.util.concurrent.atomic.AtomicLong;
30+
import javax.annotation.Nullable;
31+
import org.threeten.bp.Duration;
32+
33+
/**
34+
* A {@link BigtableTracer} that records built-in metrics and publishes them under the
35+
* bigtable.googleapis.com/client namespace
36+
*/
37+
class BuiltinMetricsTracer extends BigtableTracer {
38+
39+
private final StatsRecorderWrapper recorder;
40+
41+
private final OperationType operationType;
42+
private final SpanName spanName;
43+
44+
// Operation level metrics
45+
private final AtomicBoolean opFinished = new AtomicBoolean();
46+
private final Stopwatch operationTimer = Stopwatch.createStarted();
47+
private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted();
48+
49+
// Attempt level metrics
50+
private int attemptCount = 0;
51+
private Stopwatch attemptTimer;
52+
private volatile int attempt = 0;
53+
54+
// Total server latency needs to be atomic because it's accessed from different threads. E.g.
55+
// request() from user thread and attempt failed from grpc thread. We're only measuring the extra
56+
// time application spent blocking grpc buffer, which will be operationLatency - serverLatency.
57+
private final AtomicLong totalServerLatency = new AtomicLong(0);
58+
// Stopwatch is not thread safe so this is a workaround to check if the stopwatch changes is
59+
// flushed to memory.
60+
private final Stopwatch serverLatencyTimer = Stopwatch.createUnstarted();
61+
private final AtomicBoolean serverLatencyTimerIsRunning = new AtomicBoolean();
62+
63+
private boolean flowControlIsDisabled = false;
64+
65+
private AtomicInteger requestLeft = new AtomicInteger(0);
66+
67+
// Monitored resource labels
68+
private String tableId = "undefined";
69+
private String zone = "undefined";
70+
private String cluster = "undefined";
71+
72+
// gfe stats
73+
private AtomicLong gfeMissingHeaders = new AtomicLong(0);
74+
75+
@VisibleForTesting
76+
BuiltinMetricsTracer(
77+
OperationType operationType, SpanName spanName, StatsRecorderWrapper recorder) {
78+
this.operationType = operationType;
79+
this.spanName = spanName;
80+
this.recorder = recorder;
81+
}
82+
83+
@Override
84+
public Scope inScope() {
85+
return new Scope() {
86+
@Override
87+
public void close() {}
88+
};
89+
}
90+
91+
@Override
92+
public void operationSucceeded() {
93+
recordOperationCompletion(null);
94+
}
95+
96+
@Override
97+
public void operationCancelled() {
98+
recordOperationCompletion(new CancellationException());
99+
}
100+
101+
@Override
102+
public void operationFailed(Throwable error) {
103+
recordOperationCompletion(error);
104+
}
105+
106+
@Override
107+
public void attemptStarted(int attemptNumber) {
108+
attemptStarted(null, attemptNumber);
109+
}
110+
111+
@Override
112+
public void attemptStarted(Object request, int attemptNumber) {
113+
this.attempt = attemptNumber;
114+
attemptCount++;
115+
attemptTimer = Stopwatch.createStarted();
116+
if (request != null) {
117+
this.tableId = Util.extractTableId(request);
118+
}
119+
if (!flowControlIsDisabled) {
120+
if (serverLatencyTimerIsRunning.compareAndSet(false, true)) {
121+
serverLatencyTimer.start();
122+
}
123+
}
124+
}
125+
126+
@Override
127+
public void attemptSucceeded() {
128+
recordAttemptCompletion(null);
129+
}
130+
131+
@Override
132+
public void attemptCancelled() {
133+
recordAttemptCompletion(new CancellationException());
134+
}
135+
136+
@Override
137+
public void attemptFailed(Throwable error, Duration delay) {
138+
recordAttemptCompletion(error);
139+
}
140+
141+
@Override
142+
public void onRequest(int requestCount) {
143+
requestLeft.accumulateAndGet(requestCount, IntMath::saturatedAdd);
144+
if (flowControlIsDisabled) {
145+
// On request is only called when auto flow control is disabled. When auto flow control is
146+
// disabled, server latency is measured between onRequest and onResponse.
147+
if (serverLatencyTimerIsRunning.compareAndSet(false, true)) {
148+
serverLatencyTimer.start();
149+
}
150+
}
151+
}
152+
153+
@Override
154+
public void responseReceived() {
155+
// When auto flow control is enabled, server latency is measured between afterResponse and
156+
// responseReceived.
157+
// When auto flow control is disabled, server latency is measured between onRequest and
158+
// responseReceived.
159+
// When auto flow control is disabled and application requested multiple responses, server
160+
// latency is measured between afterResponse and responseReceived.
161+
// In all the cases, we want to stop the serverLatencyTimer here.
162+
if (serverLatencyTimerIsRunning.compareAndSet(true, false)) {
163+
totalServerLatency.addAndGet(serverLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
164+
serverLatencyTimer.reset();
165+
}
166+
}
167+
168+
@Override
169+
public void afterResponse(long applicationLatency) {
170+
if (!flowControlIsDisabled || requestLeft.decrementAndGet() > 0) {
171+
// When auto flow control is enabled, request will never be called, so server latency is
172+
// measured between after the last response is processed and before the next response is
173+
// received. If flow control is disabled but requestLeft is greater than 0,
174+
// also start the timer to count the time between afterResponse and responseReceived.
175+
if (serverLatencyTimerIsRunning.compareAndSet(false, true)) {
176+
serverLatencyTimer.start();
177+
}
178+
}
179+
}
180+
181+
@Override
182+
public int getAttempt() {
183+
return attempt;
184+
}
185+
186+
@Override
187+
public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) {
188+
// Record the metrics and put in the map after the attempt is done, so we can have cluster and
189+
// zone information
190+
if (latency != null) {
191+
recorder.putGfeLatencies(latency);
192+
} else {
193+
gfeMissingHeaders.incrementAndGet();
194+
}
195+
recorder.putGfeMissingHeaders(gfeMissingHeaders.get());
196+
}
197+
198+
@Override
199+
public void setLocations(String zone, String cluster) {
200+
this.zone = zone;
201+
this.cluster = cluster;
202+
}
203+
204+
@Override
205+
public void batchRequestThrottled(long throttledTimeMs) {
206+
recorder.putBatchRequestThrottled(throttledTimeMs);
207+
}
208+
209+
@Override
210+
public void disableFlowControl() {
211+
flowControlIsDisabled = true;
212+
}
213+
214+
private void recordOperationCompletion(@Nullable Throwable status) {
215+
if (!opFinished.compareAndSet(false, true)) {
216+
return;
217+
}
218+
operationTimer.stop();
219+
long operationLatency = operationTimer.elapsed(TimeUnit.MILLISECONDS);
220+
221+
recorder.putRetryCount(attemptCount - 1);
222+
223+
// serverLatencyTimer should already be stopped in recordAttemptCompletion
224+
recorder.putOperationLatencies(operationLatency);
225+
recorder.putApplicationLatencies(operationLatency - totalServerLatency.get());
226+
227+
if (operationType == OperationType.ServerStreaming
228+
&& spanName.getMethodName().equals("ReadRows")) {
229+
recorder.putFirstResponseLatencies(firstResponsePerOpTimer.elapsed(TimeUnit.MILLISECONDS));
230+
}
231+
232+
recorder.record(Util.extractStatus(status), tableId, zone, cluster);
233+
}
234+
235+
private void recordAttemptCompletion(@Nullable Throwable status) {
236+
// If the attempt failed, the time spent in retry should be counted in application latency.
237+
// Stop the stopwatch and decrement requestLeft.
238+
if (serverLatencyTimerIsRunning.compareAndSet(true, false)) {
239+
requestLeft.decrementAndGet();
240+
totalServerLatency.addAndGet(serverLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
241+
serverLatencyTimer.reset();
242+
}
243+
recorder.putAttemptLatencies(attemptTimer.elapsed(TimeUnit.MILLISECONDS));
244+
recorder.record(Util.extractStatus(status), tableId, zone, cluster);
245+
}
246+
}

0 commit comments

Comments
 (0)