-
Notifications
You must be signed in to change notification settings - Fork 59
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
feat: Add the MetricsCollector for client side metrics #1566
Changes from all commits
abb292b
051b488
1c49f86
7a5be3b
5390411
a7f2fd4
9e3e5f5
9f93172
b32aea0
750c1e7
b885126
5fb300b
feb36e7
8465b3a
7a97aab
51d3dd3
ee8c272
b7413e8
7c8877b
503a2a9
938cb2c
854e1d1
db7d1b1
ee67037
ea2fbe2
c22eb5b
a658a39
960b402
23a7c14
0ac6d15
bad23b2
49bd7ca
129e8fd
052c7bb
ac27a95
18c942e
7a3aabc
5906c29
be731af
c26640f
3011e50
945f237
b04c3c4
2417e80
df59d88
8ad51f5
6868f5a
7cc36a2
62a4b8b
7c4f414
83f53ae
610eec0
a2b5951
5f67cad
8f20c78
9b1ba9d
88e96c3
ed39628
76b1249
8d60cb1
19fef92
1ecfb1c
d8a3960
1d6b645
acb1d3a
c30b057
9ef079b
d5a0368
ae532d8
b2bced9
e1dd61c
cd97f35
9ec98df
2457dbe
5a1a3aa
1bd2d2b
c2be338
cd0d774
e7caf36
7fd86d2
db05ff3
9cc4b15
0142329
15d6e4a
865529e
28fbfd8
5995789
a62b124
bfc5883
0996d3c
ef8e3fe
c68a76f
47a24b1
b2600f2
d4d3f6c
b8dff1c
d50384f
b5fc1f2
1e5dc82
c2ffbc6
1200b3c
9320149
4023361
ef91733
52b570c
6a6774f
10b6d30
33c17c6
fbf2314
39fe861
8f13100
48e0e95
66c4ab1
e7c5b5f
98be351
efdfcea
4a6a476
edfcf8a
bc4998f
7e6374d
10b72ec
a72b51a
47fd9d0
2a42162
9073f07
32d3983
9716c4a
d2b93ee
99f9577
c82e72d
759e829
76b6f5a
1e840a4
7bf62e9
fca55b7
51afdce
57b1dc1
0f850b7
6c1e01b
2910408
2781561
0b4d93e
1b6681b
b6f1302
1ae82ff
3ee5604
124ed30
ef36a6f
6829224
dd603f1
b493c0d
2f19f31
dfe7d57
2d1684f
38b6f84
96d0da5
0b28837
67bec83
451c39f
1a12e82
064712f
197ea59
ffbb1d0
b4db210
21992eb
598a57d
ee27619
fa0ff6f
6d744b8
707a9e9
0cbdbc1
4ba6ba0
59f0358
d15aee6
5da73e1
1aa6294
4af9d78
d352b39
038cf5c
f2f9ecf
e9c7564
8c0406a
801d935
6565298
8a62bd0
dbb74f1
6ea3b0c
1824f87
6add7fd
cab137c
3146854
667a705
c2d27e6
12ffbe0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
// Copyright 2025 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// The backend is expecting true/false and will fail if other values are provided. | ||
// export in open telemetry is expecting string value attributes so we don't use boolean | ||
// true/false. | ||
export enum StreamingState { | ||
STREAMING = 'true', | ||
UNARY = 'false', | ||
} | ||
|
||
/** | ||
* Represents the names of Bigtable methods. These are used as attributes for | ||
* metrics, allowing for differentiation of performance by method. | ||
*/ | ||
export enum MethodName { | ||
READ_ROWS = 'Bigtable.ReadRows', | ||
MUTATE_ROW = 'Bigtable.MutateRow', | ||
CHECK_AND_MUTATE_ROW = 'Bigtable.CheckAndMutateRow', | ||
READ_MODIFY_WRITE_ROW = 'Bigtable.ReadModifyWriteRow', | ||
SAMPLE_ROW_KEYS = 'Bigtable.SampleRowKeys', | ||
MUTATE_ROWS = 'Bigtable.MutateRows', | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
// Copyright 2025 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
import {MethodName, StreamingState} from './client-side-metrics-attributes'; | ||
import {grpc} from 'google-gax'; | ||
|
||
/** | ||
* The interfaces below use undefined instead of null to indicate a metric is | ||
* not available yet. The benefit of this is that new metrics can be added | ||
* without requiring users to change the methods in their metrics handler. | ||
*/ | ||
|
||
type IMetricsCollectorData = { | ||
instanceId: string; | ||
table: string; | ||
cluster?: string; | ||
zone?: string; | ||
appProfileId?: string; | ||
methodName: MethodName; | ||
clientUid: string; | ||
}; | ||
|
||
interface StandardData { | ||
projectId: string; | ||
metricsCollectorData: IMetricsCollectorData; | ||
clientName: string; | ||
streamingOperation: StreamingState; | ||
} | ||
|
||
export interface OnOperationCompleteData extends StandardData { | ||
firstResponseLatency?: number; | ||
operationLatency: number; | ||
retryCount?: number; | ||
finalOperationStatus: grpc.status; | ||
} | ||
|
||
export interface OnAttemptCompleteData extends StandardData { | ||
attemptLatency: number; | ||
serverLatency?: number; | ||
connectivityErrorCount: number; | ||
attemptStatus: grpc.status; | ||
} | ||
|
||
/** | ||
* An interface for handling client-side metrics related to Bigtable operations. | ||
* Implementations of this interface can define how metrics are recorded and processed. | ||
*/ | ||
export interface IMetricsHandler { | ||
daniel-sanche marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/** | ||
* Called when an operation completes (successfully or unsuccessfully). | ||
* @param {OnOperationCompleteData} data Metrics and attributes related to the completed operation. | ||
*/ | ||
onOperationComplete?(data: OnOperationCompleteData): void; | ||
|
||
/** | ||
* Called when an attempt (e.g., an RPC attempt) completes. | ||
* @param {OnAttemptCompleteData} data Metrics and attributes related to the completed attempt. | ||
*/ | ||
onAttemptComplete?(data: OnAttemptCompleteData): void; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,300 @@ | ||
// Copyright 2025 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
import * as fs from 'fs'; | ||
import {IMetricsHandler} from './metrics-handler'; | ||
import {MethodName, StreamingState} from './client-side-metrics-attributes'; | ||
import {grpc} from 'google-gax'; | ||
import * as gax from 'google-gax'; | ||
const root = gax.protobuf.loadSync( | ||
'./protos/google/bigtable/v2/response_params.proto' | ||
); | ||
const ResponseParams = root.lookupType('ResponseParams'); | ||
|
||
/** | ||
* An interface representing a tabular API surface, such as a Bigtable table. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why does this exist here? Isn't there already a class for the TabularAPISurface? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An |
||
*/ | ||
export interface ITabularApiSurface { | ||
instance: { | ||
id: string; | ||
}; | ||
id: string; | ||
bigtable: { | ||
appProfileId?: string; | ||
clientUid: string; | ||
}; | ||
} | ||
|
||
const packageJSON = fs.readFileSync('package.json'); | ||
const version = JSON.parse(packageJSON.toString()).version; | ||
|
||
// MetricsCollectorState is a list of states that the metrics collector can be in. | ||
// Tracking the OperationMetricsCollector state is done so that the | ||
// OperationMetricsCollector methods are not called in the wrong order. If the | ||
// methods are called in the wrong order they will not execute and they will | ||
// throw warnings. | ||
// | ||
// The following state transitions are allowed: | ||
// OPERATION_NOT_STARTED -> OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS | ||
// OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS -> OPERATION_STARTED_ATTEMPT_IN_PROGRESS | ||
// OPERATION_STARTED_ATTEMPT_IN_PROGRESS -> OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS | ||
// OPERATION_STARTED_ATTEMPT_IN_PROGRESS -> OPERATION_COMPLETE | ||
enum MetricsCollectorState { | ||
OPERATION_NOT_STARTED, | ||
OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS, | ||
OPERATION_STARTED_ATTEMPT_IN_PROGRESS_NO_ROWS_YET, | ||
OPERATION_STARTED_ATTEMPT_IN_PROGRESS_SOME_ROWS_RECEIVED, | ||
OPERATION_COMPLETE, | ||
} | ||
|
||
/** | ||
* A class for tracing and recording client-side metrics related to Bigtable operations. | ||
*/ | ||
export class OperationMetricsCollector { | ||
private state: MetricsCollectorState; | ||
private operationStartTime: Date | null; | ||
private attemptStartTime: Date | null; | ||
private zone: string | undefined; | ||
private cluster: string | undefined; | ||
private tabularApiSurface: ITabularApiSurface; | ||
private methodName: MethodName; | ||
private attemptCount = 0; | ||
private metricsHandlers: IMetricsHandler[]; | ||
private firstResponseLatency: number | null; | ||
private serverTimeRead: boolean; | ||
private serverTime: number | null; | ||
private connectivityErrorCount: number; | ||
private streamingOperation: StreamingState; | ||
|
||
/** | ||
* @param {ITabularApiSurface} tabularApiSurface Information about the Bigtable table being accessed. | ||
* @param {IMetricsHandler[]} metricsHandlers The metrics handlers used for recording metrics. | ||
* @param {MethodName} methodName The name of the method being traced. | ||
* @param {StreamingState} streamingOperation Whether or not the call is a streaming operation. | ||
*/ | ||
constructor( | ||
tabularApiSurface: ITabularApiSurface, | ||
metricsHandlers: IMetricsHandler[], | ||
methodName: MethodName, | ||
streamingOperation: StreamingState | ||
) { | ||
this.state = MetricsCollectorState.OPERATION_NOT_STARTED; | ||
this.zone = undefined; | ||
this.cluster = undefined; | ||
this.tabularApiSurface = tabularApiSurface; | ||
this.methodName = methodName; | ||
this.operationStartTime = null; | ||
this.attemptStartTime = null; | ||
this.metricsHandlers = metricsHandlers; | ||
this.firstResponseLatency = null; | ||
this.serverTimeRead = false; | ||
this.serverTime = null; | ||
this.connectivityErrorCount = 0; | ||
this.streamingOperation = streamingOperation; | ||
} | ||
|
||
private getMetricsCollectorData() { | ||
return { | ||
instanceId: this.tabularApiSurface.instance.id, | ||
table: this.tabularApiSurface.id, | ||
cluster: this.cluster, | ||
zone: this.zone, | ||
appProfileId: this.tabularApiSurface.bigtable.appProfileId, | ||
methodName: this.methodName, | ||
clientUid: this.tabularApiSurface.bigtable.clientUid, | ||
}; | ||
} | ||
|
||
/** | ||
* Called when the operation starts. Records the start time. | ||
*/ | ||
onOperationStart() { | ||
if (this.state === MetricsCollectorState.OPERATION_NOT_STARTED) { | ||
this.operationStartTime = new Date(); | ||
this.firstResponseLatency = null; | ||
this.state = | ||
MetricsCollectorState.OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS; | ||
} else { | ||
console.warn('Invalid state transition'); | ||
} | ||
} | ||
|
||
/** | ||
* Called when an attempt (e.g., an RPC attempt) completes. Records attempt latencies. | ||
* @param {string} projectId The id of the project. | ||
* @param {grpc.status} attemptStatus The grpc status for the attempt. | ||
*/ | ||
onAttemptComplete(projectId: string, attemptStatus: grpc.status) { | ||
if ( | ||
this.state === | ||
MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_NO_ROWS_YET || | ||
this.state === | ||
MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_SOME_ROWS_RECEIVED | ||
) { | ||
this.state = | ||
MetricsCollectorState.OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS; | ||
this.attemptCount++; | ||
const endTime = new Date(); | ||
if (projectId && this.attemptStartTime) { | ||
const totalTime = endTime.getTime() - this.attemptStartTime.getTime(); | ||
this.metricsHandlers.forEach(metricsHandler => { | ||
if (metricsHandler.onAttemptComplete) { | ||
metricsHandler.onAttemptComplete({ | ||
attemptLatency: totalTime, | ||
serverLatency: this.serverTime ?? undefined, | ||
connectivityErrorCount: this.connectivityErrorCount, | ||
streamingOperation: this.streamingOperation, | ||
attemptStatus, | ||
clientName: `nodejs-bigtable/${version}`, | ||
metricsCollectorData: this.getMetricsCollectorData(), | ||
projectId, | ||
}); | ||
} | ||
}); | ||
} | ||
} else { | ||
console.warn('Invalid state transition attempted'); | ||
} | ||
} | ||
|
||
/** | ||
* Called when a new attempt starts. Records the start time of the attempt. | ||
*/ | ||
onAttemptStart() { | ||
if ( | ||
this.state === | ||
MetricsCollectorState.OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS | ||
) { | ||
this.state = | ||
MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_NO_ROWS_YET; | ||
this.attemptStartTime = new Date(); | ||
this.serverTime = null; | ||
this.serverTimeRead = false; | ||
this.connectivityErrorCount = 0; | ||
} else { | ||
console.warn('Invalid state transition attempted'); | ||
} | ||
} | ||
|
||
/** | ||
* Called when the first response is received. Records first response latencies. | ||
*/ | ||
onResponse(projectId: string) { | ||
daniel-sanche marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if ( | ||
this.state === | ||
MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_NO_ROWS_YET | ||
) { | ||
this.state = | ||
MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_SOME_ROWS_RECEIVED; | ||
const endTime = new Date(); | ||
if (projectId && this.operationStartTime) { | ||
this.firstResponseLatency = | ||
endTime.getTime() - this.operationStartTime.getTime(); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Called when an operation completes (successfully or unsuccessfully). | ||
* Records operation latencies, retry counts, and connectivity error counts. | ||
* @param {string} projectId The id of the project. | ||
* @param {grpc.status} finalOperationStatus Information about the completed operation. | ||
*/ | ||
onOperationComplete(projectId: string, finalOperationStatus: grpc.status) { | ||
if ( | ||
this.state === | ||
MetricsCollectorState.OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS | ||
) { | ||
this.state = MetricsCollectorState.OPERATION_COMPLETE; | ||
const endTime = new Date(); | ||
if (projectId && this.operationStartTime) { | ||
const totalTime = endTime.getTime() - this.operationStartTime.getTime(); | ||
{ | ||
this.metricsHandlers.forEach(metricsHandler => { | ||
if (metricsHandler.onOperationComplete) { | ||
metricsHandler.onOperationComplete({ | ||
finalOperationStatus: finalOperationStatus, | ||
streamingOperation: this.streamingOperation, | ||
metricsCollectorData: this.getMetricsCollectorData(), | ||
clientName: `nodejs-bigtable/${version}`, | ||
projectId, | ||
operationLatency: totalTime, | ||
retryCount: this.attemptCount - 1, | ||
firstResponseLatency: this.firstResponseLatency ?? undefined, | ||
}); | ||
} | ||
}); | ||
} | ||
} | ||
} else { | ||
console.warn('Invalid state transition attempted'); | ||
} | ||
} | ||
|
||
/** | ||
* Called when metadata is received. Extracts server timing information if available. | ||
* @param {object} metadata The received metadata. | ||
*/ | ||
onMetadataReceived(metadata: { | ||
internalRepr: Map<string, string[]>; | ||
options: {}; | ||
}) { | ||
const mappedEntries = new Map( | ||
Array.from(metadata.internalRepr.entries(), ([key, value]) => [ | ||
key, | ||
value.toString(), | ||
]) | ||
); | ||
const SERVER_TIMING_REGEX = /.*gfet4t7;\s*dur=(\d+\.?\d*).*/; | ||
const SERVER_TIMING_KEY = 'server-timing'; | ||
const durationValues = mappedEntries.get(SERVER_TIMING_KEY); | ||
const matchedDuration = durationValues?.match(SERVER_TIMING_REGEX); | ||
if (matchedDuration && matchedDuration[1]) { | ||
if (!this.serverTimeRead) { | ||
this.serverTimeRead = true; | ||
this.serverTime = isNaN(parseInt(matchedDuration[1])) | ||
? null | ||
: parseInt(matchedDuration[1]); | ||
} | ||
} else { | ||
this.connectivityErrorCount++; | ||
} | ||
} | ||
|
||
/** | ||
* Called when status information is received. Extracts zone and cluster information. | ||
* @param {object} status The received status information. | ||
*/ | ||
onStatusMetadataReceived(status: { | ||
metadata: {internalRepr: Map<string, Uint8Array[]>; options: {}}; | ||
}) { | ||
const INSTANCE_INFORMATION_KEY = 'x-goog-ext-425905942-bin'; | ||
const mappedValue = status.metadata.internalRepr.get( | ||
INSTANCE_INFORMATION_KEY | ||
) as Buffer[]; | ||
const decodedValue = ResponseParams.decode( | ||
mappedValue[0], | ||
mappedValue[0].length | ||
); | ||
if (decodedValue && (decodedValue as unknown as {zoneId: string}).zoneId) { | ||
this.zone = (decodedValue as unknown as {zoneId: string}).zoneId; | ||
} | ||
if ( | ||
decodedValue && | ||
(decodedValue as unknown as {clusterId: string}).clusterId | ||
) { | ||
this.cluster = (decodedValue as unknown as {clusterId: string}).clusterId; | ||
} | ||
daniel-sanche marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
// Copyright 2025 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
import { | ||
IMetricsHandler, | ||
OnAttemptCompleteData, | ||
OnOperationCompleteData, | ||
} from '../src/client-side-metrics/metrics-handler'; | ||
|
||
/** | ||
* A test implementation of the IMetricsHandler interface. Used for testing purposes. | ||
* It logs the metrics and attributes received by the onOperationComplete and onAttemptComplete methods. | ||
*/ | ||
export class TestMetricsHandler implements IMetricsHandler { | ||
private messages: {value: string}; | ||
requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = []; | ||
|
||
constructor(messages: {value: string}) { | ||
this.messages = messages; | ||
} | ||
/** | ||
* Logs the metrics and attributes received for an operation completion. | ||
* @param {OnOperationCompleteData} data Metrics related to the completed operation. | ||
*/ | ||
onOperationComplete(data: OnOperationCompleteData) { | ||
this.requestsHandled.push(data); | ||
data.clientName = 'nodejs-bigtable'; | ||
this.messages.value += 'Recording parameters for onOperationComplete:\n'; | ||
this.messages.value += `${JSON.stringify(data)}\n`; | ||
} | ||
|
||
/** | ||
* Logs the metrics and attributes received for an attempt completion. | ||
* @param {OnOperationCompleteData} data Metrics related to the completed attempt. | ||
*/ | ||
onAttemptComplete(data: OnAttemptCompleteData) { | ||
this.requestsHandled.push(data); | ||
data.clientName = 'nodejs-bigtable'; | ||
this.messages.value += 'Recording parameters for onAttemptComplete:\n'; | ||
this.messages.value += `${JSON.stringify(data)}\n`; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
// Copyright 2025 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
import {describe} from 'mocha'; | ||
import * as assert from 'assert'; | ||
import * as fs from 'fs'; | ||
import {TestMetricsHandler} from '../../test-common/test-metrics-handler'; | ||
import {OperationMetricsCollector} from '../../src/client-side-metrics/operation-metrics-collector'; | ||
import { | ||
MethodName, | ||
StreamingState, | ||
} from '../../src/client-side-metrics/client-side-metrics-attributes'; | ||
import {grpc} from 'google-gax'; | ||
import {expectedRequestsHandled} from './metrics-handler-fixture'; | ||
import * as gax from 'google-gax'; | ||
const root = gax.protobuf.loadSync( | ||
'./protos/google/bigtable/v2/response_params.proto' | ||
); | ||
const ResponseParams = root.lookupType('ResponseParams'); | ||
|
||
/** | ||
* A fake implementation of the Bigtable client for testing purposes. Provides a | ||
* metricsTracerFactory and a stubbed projectId method. | ||
*/ | ||
class FakeBigtable { | ||
clientUid = 'fake-uuid'; | ||
appProfileId?: string; | ||
projectId = 'my-project'; | ||
} | ||
|
||
/** | ||
* A fake implementation of a Bigtable instance for testing purposes. Provides only an ID. | ||
*/ | ||
class FakeInstance { | ||
/** | ||
* The ID of the fake instance. | ||
*/ | ||
id = 'fakeInstanceId'; | ||
} | ||
|
||
describe('Bigtable/MetricsCollector', () => { | ||
const logger = {value: ''}; | ||
const originalDate = global.Date; | ||
|
||
before(() => { | ||
let mockTime = new Date('1970-01-01T00:00:01.000Z').getTime(); | ||
|
||
(global as any).Date = class extends originalDate { | ||
constructor(...args: any[]) { | ||
// Using a rest parameter | ||
if (args.length === 0) { | ||
super(mockTime); | ||
logger.value += `getDate call returns ${mockTime.toString()} ms\n`; | ||
mockTime += 1000; | ||
} | ||
} | ||
|
||
static now(): number { | ||
return mockTime; | ||
} | ||
|
||
static parse(dateString: string): number { | ||
return originalDate.parse(dateString); | ||
} | ||
|
||
static UTC( | ||
year: number, | ||
month: number, | ||
date?: number, | ||
hours?: number, | ||
minutes?: number, | ||
seconds?: number, | ||
ms?: number | ||
): number { | ||
return originalDate.UTC(year, month, date, hours, minutes, seconds, ms); | ||
} | ||
}; | ||
}); | ||
|
||
after(() => { | ||
(global as any).Date = originalDate; | ||
}); | ||
|
||
it('should record the right metrics with a typical method call', async () => { | ||
const testHandler = new TestMetricsHandler(logger); | ||
const metricsHandlers = [testHandler]; | ||
class FakeTable { | ||
id = 'fakeTableId'; | ||
instance = new FakeInstance(); | ||
bigtable = new FakeBigtable(); | ||
|
||
async fakeMethod(): Promise<void> { | ||
function createMetadata(duration: string) { | ||
return { | ||
internalRepr: new Map([ | ||
['server-timing', [`gfet4t7; dur=${duration}`]], | ||
]), | ||
options: {}, | ||
}; | ||
} | ||
if (this.bigtable.projectId) { | ||
const status = { | ||
metadata: { | ||
internalRepr: new Map([ | ||
[ | ||
'x-goog-ext-425905942-bin', | ||
[ | ||
ResponseParams.encode({ | ||
zoneId: 'us-west1-c', | ||
clusterId: 'fake-cluster3', | ||
}).finish(), | ||
], | ||
], | ||
]), | ||
options: {}, | ||
}, | ||
}; | ||
const metricsCollector = new OperationMetricsCollector( | ||
this, | ||
metricsHandlers, | ||
MethodName.READ_ROWS, | ||
StreamingState.STREAMING | ||
); | ||
// In this method we simulate a series of events that might happen | ||
// when a user calls one of the Table methods. | ||
// Here is an example of what might happen in a method call: | ||
logger.value += '1. The operation starts\n'; | ||
metricsCollector.onOperationStart(); | ||
logger.value += '2. The attempt starts.\n'; | ||
metricsCollector.onAttemptStart(); | ||
logger.value += '3. Client receives status information.\n'; | ||
metricsCollector.onStatusMetadataReceived(status); | ||
logger.value += '4. Client receives metadata.\n'; | ||
metricsCollector.onMetadataReceived(createMetadata('101')); | ||
logger.value += '5. Client receives first row.\n'; | ||
metricsCollector.onResponse(this.bigtable.projectId); | ||
logger.value += '6. Client receives metadata.\n'; | ||
metricsCollector.onMetadataReceived(createMetadata('102')); | ||
logger.value += '7. Client receives second row.\n'; | ||
metricsCollector.onResponse(this.bigtable.projectId); | ||
logger.value += '8. A transient error occurs.\n'; | ||
metricsCollector.onAttemptComplete( | ||
this.bigtable.projectId, | ||
grpc.status.DEADLINE_EXCEEDED | ||
); | ||
logger.value += '9. After a timeout, the second attempt is made.\n'; | ||
metricsCollector.onAttemptStart(); | ||
logger.value += '10. Client receives status information.\n'; | ||
metricsCollector.onStatusMetadataReceived(status); | ||
logger.value += '11. Client receives metadata.\n'; | ||
metricsCollector.onMetadataReceived(createMetadata('103')); | ||
logger.value += '12. Client receives third row.\n'; | ||
metricsCollector.onResponse(this.bigtable.projectId); | ||
logger.value += '13. Client receives metadata.\n'; | ||
metricsCollector.onMetadataReceived(createMetadata('104')); | ||
logger.value += '14. Client receives fourth row.\n'; | ||
metricsCollector.onResponse(this.bigtable.projectId); | ||
logger.value += '15. User reads row 1\n'; | ||
logger.value += '16. Stream ends, operation completes\n'; | ||
metricsCollector.onAttemptComplete( | ||
this.bigtable.projectId, | ||
grpc.status.OK | ||
); | ||
metricsCollector.onOperationComplete( | ||
this.bigtable.projectId, | ||
grpc.status.OK | ||
); | ||
} | ||
} | ||
} | ||
const table = new FakeTable(); | ||
await table.fakeMethod(); | ||
const expectedOutput = fs.readFileSync( | ||
'./test/metrics-collector/typical-method-call.txt', | ||
'utf8' | ||
); | ||
// Ensure events occurred in the right order here: | ||
assert.strictEqual(logger.value, expectedOutput.replace(/\r/g, '')); | ||
assert.deepStrictEqual( | ||
testHandler.requestsHandled, | ||
expectedRequestsHandled | ||
); | ||
}); | ||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
// Copyright 2025 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
export const expectedRequestsHandled = [ | ||
{ | ||
attemptLatency: 2000, | ||
serverLatency: 101, | ||
connectivityErrorCount: 0, | ||
streamingOperation: 'true', | ||
attemptStatus: 4, | ||
clientName: 'nodejs-bigtable', | ||
metricsCollectorData: { | ||
appProfileId: undefined, | ||
instanceId: 'fakeInstanceId', | ||
table: 'fakeTableId', | ||
cluster: 'fake-cluster3', | ||
zone: 'us-west1-c', | ||
methodName: 'Bigtable.ReadRows', | ||
clientUid: 'fake-uuid', | ||
}, | ||
projectId: 'my-project', | ||
}, | ||
{ | ||
attemptLatency: 2000, | ||
serverLatency: 103, | ||
connectivityErrorCount: 0, | ||
streamingOperation: 'true', | ||
attemptStatus: 0, | ||
clientName: 'nodejs-bigtable', | ||
metricsCollectorData: { | ||
appProfileId: undefined, | ||
instanceId: 'fakeInstanceId', | ||
table: 'fakeTableId', | ||
cluster: 'fake-cluster3', | ||
zone: 'us-west1-c', | ||
methodName: 'Bigtable.ReadRows', | ||
clientUid: 'fake-uuid', | ||
}, | ||
projectId: 'my-project', | ||
}, | ||
{ | ||
finalOperationStatus: 0, | ||
streamingOperation: 'true', | ||
metricsCollectorData: { | ||
appProfileId: undefined, | ||
instanceId: 'fakeInstanceId', | ||
table: 'fakeTableId', | ||
cluster: 'fake-cluster3', | ||
zone: 'us-west1-c', | ||
methodName: 'Bigtable.ReadRows', | ||
clientUid: 'fake-uuid', | ||
}, | ||
clientName: 'nodejs-bigtable', | ||
projectId: 'my-project', | ||
operationLatency: 7000, | ||
retryCount: 1, | ||
firstResponseLatency: 5000, | ||
}, | ||
]; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
1. The operation starts | ||
getDate call returns 1000 ms | ||
2. The attempt starts. | ||
getDate call returns 2000 ms | ||
3. Client receives status information. | ||
4. Client receives metadata. | ||
5. Client receives first row. | ||
getDate call returns 3000 ms | ||
6. Client receives metadata. | ||
7. Client receives second row. | ||
8. A transient error occurs. | ||
getDate call returns 4000 ms | ||
Recording parameters for onAttemptComplete: | ||
{"attemptLatency":2000,"serverLatency":101,"connectivityErrorCount":0,"streamingOperation":"true","attemptStatus":4,"clientName":"nodejs-bigtable","metricsCollectorData":{"instanceId":"fakeInstanceId","table":"fakeTableId","cluster":"fake-cluster3","zone":"us-west1-c","methodName":"Bigtable.ReadRows","clientUid":"fake-uuid"},"projectId":"my-project"} | ||
9. After a timeout, the second attempt is made. | ||
getDate call returns 5000 ms | ||
10. Client receives status information. | ||
11. Client receives metadata. | ||
12. Client receives third row. | ||
getDate call returns 6000 ms | ||
13. Client receives metadata. | ||
14. Client receives fourth row. | ||
15. User reads row 1 | ||
16. Stream ends, operation completes | ||
getDate call returns 7000 ms | ||
Recording parameters for onAttemptComplete: | ||
{"attemptLatency":2000,"serverLatency":103,"connectivityErrorCount":0,"streamingOperation":"true","attemptStatus":0,"clientName":"nodejs-bigtable","metricsCollectorData":{"instanceId":"fakeInstanceId","table":"fakeTableId","cluster":"fake-cluster3","zone":"us-west1-c","methodName":"Bigtable.ReadRows","clientUid":"fake-uuid"},"projectId":"my-project"} | ||
getDate call returns 8000 ms | ||
Recording parameters for onOperationComplete: | ||
{"finalOperationStatus":0,"streamingOperation":"true","metricsCollectorData":{"instanceId":"fakeInstanceId","table":"fakeTableId","cluster":"fake-cluster3","zone":"us-west1-c","methodName":"Bigtable.ReadRows","clientUid":"fake-uuid"},"clientName":"nodejs-bigtable","projectId":"my-project","operationLatency":7000,"retryCount":1,"firstResponseLatency":5000} |
Unchanged files with check annotations Beta
} | ||
} | ||
function isStringArray(array: any): array is string[] { | ||
return array.every((cluster: any) => { | ||
return typeof cluster === 'string'; | ||
}); | ||
} | ||
function isClusterArray(array: any): array is Cluster[] { | ||
return array.every((cluster: any) => { | ||
return isCluster(cluster); | ||
}); | ||
} | ||
function isCluster(cluster: any): cluster is Cluster { | ||
return ( | ||
(cluster as Cluster).bigtable !== undefined && | ||
(cluster as Cluster).instance !== undefined && |
import {Mutation, ConvertFromBytesUserOptions, Bytes, Data} from './mutation'; | ||
import {Bigtable} from '.'; | ||
import { | ||
Table, | ||
Entry, | ||
MutateCallback, | ||
MutateResponse, |
// Handling retries in this client. Specify the retry options to | ||
// make sure nothing is retried in retry-request. | ||
noResponseRetries: 0, | ||
shouldRetryFn: (_: any) => { | ||
Check warning on line 363 in src/tabular-api-surface.ts
|
||
return false; | ||
}, | ||
}; | ||
userStream.emit('error', error); | ||
} | ||
}) | ||
.on('data', _ => { | ||
// Reset error count after a successful read so the backoff | ||
// time won't keep increasing when as stream had multiple errors | ||
numConsecutiveErrors = 0; | ||
// Handling retries in this client. Specify the retry options to | ||
// make sure nothing is retried in retry-request. | ||
noResponseRetries: 0, | ||
shouldRetryFn: (_: any) => { | ||
return false; | ||
}, | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking a lot better!
One more comment though: there's a lot of duplicated fields between OnOperationCompleteData and OnAttemptCompleteData. You should consider deduplicating the data so that each field exists in just one place (some fields belong at the operation level, and some are related to the specific attempt, and it could make sense to keep them separate)
In Python, both the Attempt and its associated Operation are passed in for on_attempt_complete. And the Operation holds a list of associated attempts, which is used to calculate retry_count
But that may be less applicable here since you're building these objects on the fly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had something similar before with
StandardAttributes
when the metrics and attributes were separate so I decided to de-duplicate the data with a new StandardData interface and then extend it.