Skip to content

Commit bf3e7dd

Browse files
authoredApr 19, 2023
feat: track the latency a request is queued on the grpc channel (#1604)
For all operations, it tracks the latency of the request getting queued on a grpc channel. For batch operations, it's the aggregated value of grpc channel queued latency and batcher flow control latencies.
1 parent 9ca7b08 commit bf3e7dd

File tree

14 files changed

+287
-17
lines changed

14 files changed

+287
-17
lines changed
 

‎google-cloud-bigtable-stats/clirr-ignored-differences.xml

+6
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,10 @@
1313
<className>com/google/cloud/bigtable/stats/StatsRecorderWrapper</className>
1414
<method>void record(java.lang.String, java.lang.String, java.lang.String, java.lang.String)</method>
1515
</difference>
16+
<!-- Internal API is updated -->
17+
<difference>
18+
<differenceType>7002</differenceType>
19+
<className>com/google/cloud/bigtable/stats/StatsRecorderWrapper</className>
20+
<method>void putBatchRequestThrottled(long)</method>
21+
</difference>
1622
</differences>

‎google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ public void putGfeMissingHeaders(long connectivityErrors) {
115115
attemptMeasureMap.put(BuiltinMeasureConstants.CONNECTIVITY_ERROR_COUNT, connectivityErrors);
116116
}
117117

118-
public void putBatchRequestThrottled(long throttledTimeMs) {
119-
operationMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, throttledTimeMs);
118+
public void putClientBlockingLatencies(long clientBlockingLatency) {
119+
operationMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, clientBlockingLatency);
120120
}
121121

122122
private TagContextBuilder newTagContextBuilder(String tableId, String zone, String cluster) {

‎google-cloud-bigtable-stats/src/test/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public void testStreamingOperation() throws InterruptedException {
9191
recorderWrapper.putGfeLatencies(serverLatency);
9292
recorderWrapper.putGfeMissingHeaders(connectivityErrorCount);
9393
recorderWrapper.putFirstResponseLatencies(firstResponseLatency);
94-
recorderWrapper.putBatchRequestThrottled(throttlingLatency);
94+
recorderWrapper.putClientBlockingLatencies(throttlingLatency);
9595

9696
recorderWrapper.recordOperation("OK", TABLE_ID, ZONE, CLUSTER);
9797
recorderWrapper.recordAttempt("OK", TABLE_ID, ZONE, CLUSTER);
@@ -290,7 +290,7 @@ public void testUnaryOperations() throws InterruptedException {
290290
recorderWrapper.putGfeLatencies(serverLatency);
291291
recorderWrapper.putGfeMissingHeaders(connectivityErrorCount);
292292
recorderWrapper.putFirstResponseLatencies(firstResponseLatency);
293-
recorderWrapper.putBatchRequestThrottled(throttlingLatency);
293+
recorderWrapper.putClientBlockingLatencies(throttlingLatency);
294294

295295
recorderWrapper.recordOperation("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER);
296296
recorderWrapper.recordAttempt("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER);

‎google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import com.google.cloud.bigtable.data.v2.stub.changestream.GenerateInitialChangeStreamPartitionsUserCallable;
8888
import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamResumptionStrategy;
8989
import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamUserCallable;
90+
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerBatchedUnaryCallable;
9091
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerStreamingCallable;
9192
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable;
9293
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
@@ -509,7 +510,7 @@ private <RowT> UnaryCallable<Query, List<RowT>> createBulkReadRowsCallable(
509510
new TracedBatcherUnaryCallable<>(readRowsUserCallable.all());
510511

511512
UnaryCallable<Query, List<RowT>> withBigtableTracer =
512-
new BigtableTracerUnaryCallable<>(tracedBatcher);
513+
new BigtableTracerBatchedUnaryCallable<>(tracedBatcher);
513514

514515
UnaryCallable<Query, List<RowT>> traced =
515516
new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), span);
@@ -641,7 +642,7 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
641642
new TracedBatcherUnaryCallable<>(userFacing);
642643

643644
UnaryCallable<BulkMutation, Void> withBigtableTracer =
644-
new BigtableTracerUnaryCallable<>(tracedBatcherUnaryCallable);
645+
new BigtableTracerBatchedUnaryCallable<>(tracedBatcherUnaryCallable);
645646
UnaryCallable<BulkMutation, Void> traced =
646647
new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), spanName);
647648

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.metrics;
17+
18+
import com.google.common.base.Stopwatch;
19+
import io.grpc.Attributes;
20+
import io.grpc.ClientStreamTracer;
21+
import io.grpc.Metadata;
22+
import java.util.concurrent.TimeUnit;
23+
24+
/**
25+
* Records the time a request is enqueued in a grpc channel queue. This a bridge between gRPC stream
26+
* tracing and Bigtable tracing. Its primary purpose is to measure the transition time between
27+
* asking gRPC to start an RPC and gRPC actually serializing that RPC.
28+
*/
29+
class BigtableGrpcStreamTracer extends ClientStreamTracer {
30+
31+
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
32+
private final BigtableTracer tracer;
33+
34+
public BigtableGrpcStreamTracer(BigtableTracer tracer) {
35+
this.tracer = tracer;
36+
}
37+
38+
@Override
39+
public void streamCreated(Attributes transportAttrs, Metadata headers) {
40+
stopwatch.start();
41+
}
42+
43+
@Override
44+
public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
45+
tracer.grpcChannelQueuedLatencies(stopwatch.elapsed(TimeUnit.MILLISECONDS));
46+
}
47+
48+
static class Factory extends ClientStreamTracer.Factory {
49+
50+
private final BigtableTracer tracer;
51+
52+
Factory(BigtableTracer tracer) {
53+
this.tracer = tracer;
54+
}
55+
56+
@Override
57+
public ClientStreamTracer newClientStreamTracer(
58+
ClientStreamTracer.StreamInfo info, Metadata headers) {
59+
return new BigtableGrpcStreamTracer(tracer);
60+
}
61+
}
62+
}

‎google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java

+4
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,8 @@ public void batchRequestThrottled(long throttledTimeMs) {
8282
public void setLocations(String zone, String cluster) {
8383
// noop
8484
}
85+
86+
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
87+
// noop
88+
}
8589
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.metrics;
17+
18+
import com.google.api.core.ApiFuture;
19+
import com.google.api.core.ApiFutures;
20+
import com.google.api.core.InternalApi;
21+
import com.google.api.gax.grpc.GrpcResponseMetadata;
22+
import com.google.api.gax.rpc.ApiCallContext;
23+
import com.google.api.gax.rpc.UnaryCallable;
24+
import com.google.common.util.concurrent.MoreExecutors;
25+
import javax.annotation.Nonnull;
26+
27+
/**
28+
* This callable will do everything described in {@link BigtableTracerUnaryCallable} except that it
29+
* won't inject a {@link BigtableGrpcStreamTracer}. For batching calls, we only want to calculate
30+
* the total time client is blocked because of flow control.
31+
*/
32+
@InternalApi
33+
public class BigtableTracerBatchedUnaryCallable<RequestT, ResponseT>
34+
extends BigtableTracerUnaryCallable<RequestT, ResponseT> {
35+
36+
private UnaryCallable<RequestT, ResponseT> innerCallable;
37+
38+
public BigtableTracerBatchedUnaryCallable(
39+
@Nonnull UnaryCallable<RequestT, ResponseT> innerCallable) {
40+
super(innerCallable);
41+
this.innerCallable = innerCallable;
42+
}
43+
44+
@Override
45+
public ApiFuture futureCall(RequestT request, ApiCallContext context) {
46+
final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
47+
BigtableTracerUnaryCallback<ResponseT> callback =
48+
new BigtableTracerUnaryCallback<ResponseT>(
49+
(BigtableTracer) context.getTracer(), responseMetadata);
50+
ApiFuture<ResponseT> future =
51+
innerCallable.futureCall(request, responseMetadata.addHandlers(context));
52+
ApiFutures.addCallback(future, callback, MoreExecutors.directExecutor());
53+
return future;
54+
}
55+
}

‎google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
* <li>-This class will also access trailers from {@link GrpcResponseMetadata} to record zone and
3838
* cluster ids.
3939
* <li>-Call {@link BigtableTracer#onRequest(int)} to record the request events in a stream.
40+
* <li>-This class will also inject a {@link BigtableGrpcStreamTracer} that'll record the time an
41+
* RPC spent in a grpc channel queue.
4042
* <li>This class is considered an internal implementation detail and not meant to be used by
4143
* applications.
4244
*/
@@ -60,7 +62,11 @@ public void call(
6062
BigtableTracerResponseObserver<ResponseT> innerObserver =
6163
new BigtableTracerResponseObserver<>(
6264
responseObserver, (BigtableTracer) context.getTracer(), responseMetadata);
63-
innerCallable.call(request, innerObserver, responseMetadata.addHandlers(context));
65+
innerCallable.call(
66+
request,
67+
innerObserver,
68+
Util.injectBigtableStreamTracer(
69+
context, responseMetadata, (BigtableTracer) context.getTracer()));
6470
} else {
6571
innerCallable.call(request, responseObserver, context);
6672
}

‎google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
* the gfe_header_missing_counter in this case.
3636
* <li>-This class will also access trailers from {@link GrpcResponseMetadata} to record zone and
3737
* cluster ids.
38+
* <li>-This class will also inject a {@link BigtableGrpcStreamTracer} that'll record the time an
39+
* RPC spent in a grpc channel queue.
3840
* <li>This class is considered an internal implementation detail and not meant to be used by
3941
* applications.
4042
*/
@@ -49,14 +51,18 @@ public BigtableTracerUnaryCallable(@Nonnull UnaryCallable<RequestT, ResponseT> i
4951
}
5052

5153
@Override
52-
public ApiFuture futureCall(RequestT request, ApiCallContext context) {
54+
public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext context) {
5355
// tracer should always be an instance of BigtableTracer
5456
if (context.getTracer() instanceof BigtableTracer) {
5557
final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
56-
final ApiCallContext contextWithResponseMetadata = responseMetadata.addHandlers(context);
57-
BigtableTracerUnaryCallback callback =
58-
new BigtableTracerUnaryCallback((BigtableTracer) context.getTracer(), responseMetadata);
59-
ApiFuture<ResponseT> future = innerCallable.futureCall(request, contextWithResponseMetadata);
58+
BigtableTracerUnaryCallback<ResponseT> callback =
59+
new BigtableTracerUnaryCallback<ResponseT>(
60+
(BigtableTracer) context.getTracer(), responseMetadata);
61+
ApiFuture<ResponseT> future =
62+
innerCallable.futureCall(
63+
request,
64+
Util.injectBigtableStreamTracer(
65+
context, responseMetadata, (BigtableTracer) context.getTracer()));
6066
ApiFutures.addCallback(future, callback, MoreExecutors.directExecutor());
6167
return future;
6268
} else {

‎google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ class BuiltinMetricsTracer extends BigtableTracer {
7171
private String zone = "global";
7272
private String cluster = "unspecified";
7373

74+
private AtomicLong totalClientBlockingTime = new AtomicLong(0);
75+
7476
@VisibleForTesting
7577
BuiltinMetricsTracer(
7678
OperationType operationType, SpanName spanName, StatsRecorderWrapper recorder) {
@@ -219,7 +221,12 @@ public void setLocations(String zone, String cluster) {
219221

220222
@Override
221223
public void batchRequestThrottled(long throttledTimeMs) {
222-
recorder.putBatchRequestThrottled(throttledTimeMs);
224+
totalClientBlockingTime.addAndGet(throttledTimeMs);
225+
}
226+
227+
@Override
228+
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
229+
totalClientBlockingTime.addAndGet(queuedTimeMs);
223230
}
224231

225232
@Override
@@ -266,6 +273,8 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
266273
}
267274
}
268275

276+
recorder.putClientBlockingLatencies(totalClientBlockingTime.get());
277+
269278
// Patch the status until it's fixed in gax. When an attempt failed,
270279
// it'll throw a ServerStreamingAttemptException. Unwrap the exception
271280
// so it could get processed by extractStatus

‎google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java

+7
Original file line numberDiff line numberDiff line change
@@ -218,4 +218,11 @@ public void afterResponse(long applicationLatency) {
218218
tracer.afterResponse(applicationLatency);
219219
}
220220
}
221+
222+
@Override
223+
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
224+
for (BigtableTracer tracer : bigtableTracers) {
225+
tracer.grpcChannelQueuedLatencies(queuedTimeMs);
226+
}
227+
}
221228
}

‎google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java

+21
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.google.cloud.bigtable.data.v2.stub.metrics;
1717

1818
import com.google.api.core.InternalApi;
19+
import com.google.api.gax.grpc.GrpcCallContext;
1920
import com.google.api.gax.grpc.GrpcResponseMetadata;
2021
import com.google.api.gax.rpc.ApiCallContext;
2122
import com.google.api.gax.rpc.ApiException;
@@ -32,6 +33,7 @@
3233
import com.google.common.base.Strings;
3334
import com.google.common.collect.ImmutableMap;
3435
import com.google.protobuf.InvalidProtocolBufferException;
36+
import io.grpc.CallOptions;
3537
import io.grpc.Metadata;
3638
import io.grpc.Status;
3739
import io.grpc.StatusException;
@@ -197,4 +199,23 @@ static void recordMetricsFromMetadata(
197199
// Record gfe metrics
198200
tracer.recordGfeMetadata(latency, throwable);
199201
}
202+
203+
/**
204+
* This method bridges gRPC stream tracing to bigtable tracing by adding a {@link
205+
* io.grpc.ClientStreamTracer} to the callContext.
206+
*/
207+
static GrpcCallContext injectBigtableStreamTracer(
208+
ApiCallContext context, GrpcResponseMetadata responseMetadata, BigtableTracer tracer) {
209+
if (context instanceof GrpcCallContext) {
210+
GrpcCallContext callContext = (GrpcCallContext) context;
211+
CallOptions callOptions = callContext.getCallOptions();
212+
return responseMetadata.addHandlers(
213+
callContext.withCallOptions(
214+
callOptions.withStreamTracerFactory(new BigtableGrpcStreamTracer.Factory(tracer))));
215+
} else {
216+
// context should always be an instance of GrpcCallContext. If not throw an exception
217+
// so we can see what class context is.
218+
throw new RuntimeException("Unexpected context class: " + context.getClass().getName());
219+
}
220+
}
200221
}

0 commit comments

Comments
 (0)