From 8b917eab223c7f71303ca92ce93722db8e73c2ed Mon Sep 17 00:00:00 2001 From: Mattie Fu <mattiefu@google.com> Date: Fri, 25 Mar 2022 18:10:47 -0400 Subject: [PATCH 01/18] fix: add rpc level retries for mutate --- src/table.ts | 25 ++++++++++++---- test/table.ts | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 6 deletions(-) diff --git a/src/table.ts b/src/table.ts index bcf7606c7..5da25c8ae 100644 --- a/src/table.ts +++ b/src/table.ts @@ -46,6 +46,7 @@ import {Duplex} from 'stream'; // See protos/google/rpc/code.proto // (4=DEADLINE_EXCEEDED, 10=ABORTED, 14=UNAVAILABLE) const RETRYABLE_STATUS_CODES = new Set([4, 10, 14]); +const IDEMPOTENT_RETRYABLE_STATUS_CODES = new Set([4, 14]); // (1=CANCELLED) const IGNORED_STATUS_CODES = new Set([1]); @@ -1507,13 +1508,26 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); const onBatchResponse = ( err: ServiceError | PartialFailureError | null ) => { - // TODO: enable retries when the entire RPC fails if (err) { - // The error happened before a request was even made, don't retry. 
+ // Retry RPC level errors + if (!(err instanceof PartialFailureError)) { + const serviceError = err as ServiceError; + if ( + numRequestsMade <= maxRetries && + IDEMPOTENT_RETRYABLE_STATUS_CODES.has(serviceError.code) + ) { + console.log('RETRYING ' + err.code); + makeNextBatchRequest(); + return; + } + } callback(err); return; - } - if (pendingEntryIndices.size !== 0 && numRequestsMade <= maxRetries) { + } else if ( + pendingEntryIndices.size !== 0 && + numRequestsMade <= maxRetries + ) { + console.log('RETRYING partial'); makeNextBatchRequest(); return; } @@ -1552,8 +1566,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); retryOpts, }) .on('error', (err: ServiceError) => { - // TODO: this check doesn't actually do anything, onBatchResponse - // currently doesn't retry RPC errors, only entry failures + // The error happened before a request was even made, don't retry. if (numRequestsMade === 0) { callback(err); // Likely a "projectId not detected" error. return; diff --git a/test/table.ts b/test/table.ts index 50b8064d4..525758e8a 100644 --- a/test/table.ts +++ b/test/table.ts @@ -2609,6 +2609,88 @@ describe('Bigtable/Table', () => { }); }); }); + + describe('rpc level retries', () => { + let emitters: EventEmitter[] | null; // = [((stream: Writable) => { stream.push([{ key: 'a' }]); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let entryRequests: any; + + beforeEach(() => { + emitters = null; // This needs to be assigned in each test case. 
+ + entryRequests = []; + + sandbox.stub(ds, 'decorateStatus').returns({} as DecoratedStatus); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + table.bigtable.request = (config: any) => { + entryRequests.push(config.reqOpts.entries); + const stream = new PassThrough({ + objectMode: true, + }); + + setImmediate(() => { + (emitters!.shift() as any)(stream); + }); + + return stream; + }; + }); + + it('should not retry unretriable errors', done => { + const unretriableError = new Error('not retryable') as ServiceError; + unretriableError.code = 10; // Aborted + emitters = [ + ((stream: Writable) => { + stream.emit('error', unretriableError); + }) as {} as EventEmitter, + ]; + table.maxRetries = 1; + table.mutate(entries, () => { + assert.strictEqual(entryRequests.length, 1); + done(); + }); + }); + + it('should retry retryable errors', done => { + const error = new Error('retryable') as ServiceError; + error.code = 14; // Unavailable + emitters = [ + ((stream: Writable) => { + stream.emit('error', error); + }) as {} as EventEmitter, + ((stream: Writable) => { + stream.end(); + }) as {} as EventEmitter, + ]; + table.maxRetries = 1; + table.mutate(entries, () => { + assert.strictEqual(entryRequests.length, 2); + done(); + }); + }); + + it('should not retry more than maxRetries times', done => { + const error = new Error('retryable') as ServiceError; + error.code = 14; // Unavailable + emitters = [ + ((stream: Writable) => { + stream.emit('error', error); + }) as {} as EventEmitter, + ((stream: Writable) => { + stream.emit('error', error); + }) as {} as EventEmitter, + ((stream: Writable) => { + stream.end(); + }) as {} as EventEmitter, + ]; + table.maxRetries = 1; + table.mutate(entries, () => { + assert.strictEqual(entryRequests.length, 2); + done(); + }); + }); + }); }); describe('row', () => { From 96d18848f53fbb6c1016ea5dde6d9d51c8bac30d Mon Sep 17 00:00:00 2001 From: Mattie Fu <mattiefu@google.com> Date: Mon, 28 Mar 2022 12:02:21 -0400 Subject: 
[PATCH 02/18] remove debugging logs --- src/table.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/table.ts b/src/table.ts index 5da25c8ae..e182865ad 100644 --- a/src/table.ts +++ b/src/table.ts @@ -1516,7 +1516,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); numRequestsMade <= maxRetries && IDEMPOTENT_RETRYABLE_STATUS_CODES.has(serviceError.code) ) { - console.log('RETRYING ' + err.code); makeNextBatchRequest(); return; } @@ -1527,7 +1526,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); pendingEntryIndices.size !== 0 && numRequestsMade <= maxRetries ) { - console.log('RETRYING partial'); makeNextBatchRequest(); return; } From 370b64ae07808396fa316b05d0bdb8bab443bb43 Mon Sep 17 00:00:00 2001 From: Mattie Fu <mattiefu@google.com> Date: Tue, 29 Mar 2022 16:43:27 -0400 Subject: [PATCH 03/18] add exponential backoff --- src/table.ts | 56 +++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 53 insertions(+), 3 deletions(-) diff --git a/src/table.ts b/src/table.ts index e182865ad..dce61c391 100644 --- a/src/table.ts +++ b/src/table.ts @@ -15,6 +15,7 @@ import {promisifyAll} from '@google-cloud/promisify'; import arrify = require('arrify'); import {ServiceError} from 'google-gax'; +import {BackoffSettings} from 'google-gax/build/src/gax'; import {decorateStatus} from './decorateStatus'; import {PassThrough, Transform} from 'stream'; @@ -50,6 +51,12 @@ const IDEMPOTENT_RETRYABLE_STATUS_CODES = new Set([4, 14]); // (1=CANCELLED) const IGNORED_STATUS_CODES = new Set([1]); +const DEFAULT_BACKOFF_SETTINGS: BackoffSettings = { + initialRetryDelayMillis: 10, + retryDelayMultiplier: 2, + maxRetryDelayMillis: 60000, +}; + /** * @typedef {object} Policy * @property {number} [version] Specifies the format of the policy. 
@@ -737,6 +744,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); const hasLimit = rowsLimit !== 0; let rowsRead = 0; let numRequestsMade = 0; + let retryTimer: NodeJS.Timeout | null; rowKeys = options.keys || []; @@ -789,6 +797,9 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); if (activeRequestStream) { activeRequestStream.abort(); } + if (retryTimer) { + clearTimeout(retryTimer); + } return end(); }; @@ -807,6 +818,12 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); const retryOpts = { currentRetryAttempt: numRequestsMade, + // Handling retries in this client. Specify the retry options to + // make sure nothing is retried in retry-request. + noResponseRetries: 0, + shouldRetryFn: function (response: any) { + return false; + }, }; if (lastRowKey) { @@ -941,13 +958,21 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); numRequestsMade <= maxRetries && RETRYABLE_STATUS_CODES.has(error.code) ) { - makeNewRequest(); + const backOffSettings = + options.gaxOptions?.retry?.backoffSettings || + DEFAULT_BACKOFF_SETTINGS; + const nextRetryDelay = getNextDelay( + numRequestsMade, + backOffSettings + ); + retryTimer = setTimeout(makeNewRequest, nextRetryDelay); } else { userStream.emit('error', error); } }) .on('end', () => { activeRequestStream = null; + retryTimer = null; }); rowStream.pipe(userStream); numRequestsMade++; @@ -1516,7 +1541,11 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); numRequestsMade <= maxRetries && IDEMPOTENT_RETRYABLE_STATUS_CODES.has(serviceError.code) ) { - makeNextBatchRequest(); + const backOffSettings = + options.gaxOptions?.retry?.backoffSettings || + DEFAULT_BACKOFF_SETTINGS; + const nextDelay = getNextDelay(numRequestsMade, backOffSettings); + setTimeout(makeNextBatchRequest, nextDelay); return; } } @@ -1526,7 +1555,11 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); pendingEntryIndices.size !== 
0 && numRequestsMade <= maxRetries ) { - makeNextBatchRequest(); + const backOffSettings = + options.gaxOptions?.retry?.backoffSettings || + DEFAULT_BACKOFF_SETTINGS; + const nextDelay = getNextDelay(numRequestsMade, backOffSettings); + setTimeout(makeNextBatchRequest, nextDelay); return; } @@ -1553,6 +1586,12 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); const retryOpts = { currentRetryAttempt: numRequestsMade, + // Handling retries in this client. Specify the retry options to + // make sure nothing is retried in retry-request. + noResponseRetries: 0, + shouldRetryFn: function (response: any) { + return false; + }, }; this.bigtable @@ -2008,6 +2047,17 @@ promisifyAll(Table, { exclude: ['family', 'row'], }); +function getNextDelay(requestCount: number, config: BackoffSettings) { + // 0 - 100 ms jitter + const jitter = Math.floor(Math.random() * 100); + const calculatedNextRetryDelay = + config.initialRetryDelayMillis * + Math.pow(config.retryDelayMultiplier, Math.max(0, requestCount - 1)) + + jitter; + + return Math.min(calculatedNextRetryDelay, config.maxRetryDelayMillis); +} + export interface GoogleInnerError { reason?: string; message?: string; From 03cde1658f0becfae52ce64e267dce26daa81b65 Mon Sep 17 00:00:00 2001 From: Mattie Fu <mattiefu@google.com> Date: Tue, 29 Mar 2022 18:02:57 -0400 Subject: [PATCH 04/18] simplify mutate row retry logic --- src/table.ts | 41 +++++++++++++++-------------------------- 1 file changed, 15 insertions(+), 26 deletions(-) diff --git a/src/table.ts b/src/table.ts index dce61c391..278fb5bac 100644 --- a/src/table.ts +++ b/src/table.ts @@ -807,6 +807,11 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); let rowStream: Duplex; const makeNewRequest = () => { + if (retryTimer) { + // Avoid cancelling an expired timer if the + // stream is cancelled in the middle of a retry + retryTimer = null; + } const lastRowKey = chunkTransformer ? 
chunkTransformer.lastRowKey : ''; // eslint-disable-next-line @typescript-eslint/no-explicit-any chunkTransformer = new ChunkTransformer({decode: options.decode} as any); @@ -1530,31 +1535,15 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); ); const mutationErrorsByEntryIndex = new Map(); - const onBatchResponse = ( - err: ServiceError | PartialFailureError | null - ) => { - if (err) { - // Retry RPC level errors - if (!(err instanceof PartialFailureError)) { - const serviceError = err as ServiceError; - if ( - numRequestsMade <= maxRetries && - IDEMPOTENT_RETRYABLE_STATUS_CODES.has(serviceError.code) - ) { - const backOffSettings = - options.gaxOptions?.retry?.backoffSettings || - DEFAULT_BACKOFF_SETTINGS; - const nextDelay = getNextDelay(numRequestsMade, backOffSettings); - setTimeout(makeNextBatchRequest, nextDelay); - return; - } - } - callback(err); - return; - } else if ( - pendingEntryIndices.size !== 0 && - numRequestsMade <= maxRetries - ) { + const isRetryable = (err: ServiceError | null) => { + if (pendingEntryIndices.size === 0 || numRequestsMade > maxRetries) { + return false; + } + return err === null || IDEMPOTENT_RETRYABLE_STATUS_CODES.has(err.code); + }; + + const onBatchResponse = (err: ServiceError | null) => { + if (isRetryable(err as ServiceError)) { const backOffSettings = options.gaxOptions?.retry?.backoffSettings || DEFAULT_BACKOFF_SETTINGS; @@ -1565,7 +1554,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); if (mutationErrorsByEntryIndex.size !== 0) { const mutationErrors = Array.from(mutationErrorsByEntryIndex.values()); - err = new PartialFailureError(mutationErrors); + callback(new PartialFailureError(mutationErrors)); } callback(err); From 8cad4c4ab8ec1c76abc2979405d9ceac69257978 Mon Sep 17 00:00:00 2001 From: Mattie Fu <mattiefu@google.com> Date: Tue, 29 Mar 2022 18:22:56 -0400 Subject: [PATCH 05/18] fix broken tests --- src/table.ts | 3 ++- 1 file changed, 2 insertions(+), 1 
deletion(-) diff --git a/src/table.ts b/src/table.ts index 278fb5bac..266d5de36 100644 --- a/src/table.ts +++ b/src/table.ts @@ -1539,7 +1539,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); if (pendingEntryIndices.size === 0 || numRequestsMade > maxRetries) { return false; } - return err === null || IDEMPOTENT_RETRYABLE_STATUS_CODES.has(err.code); + return !err || IDEMPOTENT_RETRYABLE_STATUS_CODES.has(err.code); }; const onBatchResponse = (err: ServiceError | null) => { @@ -1555,6 +1555,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); if (mutationErrorsByEntryIndex.size !== 0) { const mutationErrors = Array.from(mutationErrorsByEntryIndex.values()); callback(new PartialFailureError(mutationErrors)); + return; } callback(err); From da01aea08de23499ba43d811f61d31c2c639e198 Mon Sep 17 00:00:00 2001 From: Mattie Fu <mattiefu@google.com> Date: Tue, 29 Mar 2022 20:02:40 -0400 Subject: [PATCH 06/18] ignore checks for retry options --- system-test/mutate-rows.ts | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/system-test/mutate-rows.ts b/system-test/mutate-rows.ts index f7e15ec8e..98b3c1b2c 100644 --- a/system-test/mutate-rows.ts +++ b/system-test/mutate-rows.ts @@ -104,10 +104,16 @@ describe('Bigtable/Table', () => { // eslint-disable-next-line @typescript-eslint/no-explicit-any reqOpts!.entries!.map(entry => (entry.rowKey as any).asciiSlice()) ); - assert.deepStrictEqual( - options!.retryRequestOptions, - retryRequestOptions - ); + // TODO: Currently retry options for retry-request are ignored. + // Retry-request is not handling grpc errors correctly, so + // we are handling retries in table.ts and disabling retries in + // gax to avoid a request getting retried in multiple places. + // Re-enable this test after switching back to using the retry + // logic in gax. 
+ // assert.deepStrictEqual( + // options!.retryRequestOptions, + // retryRequestOptions + // ); mutationCallTimes.push(new Date().getTime()); const emitter = new PassThrough({objectMode: true}); dispatch(emitter, responses!.shift()); From 9f2e75a562657772a560687e8e0090a932c941ac Mon Sep 17 00:00:00 2001 From: Mattie Fu <mattiefu@google.com> Date: Wed, 30 Mar 2022 09:11:24 -0400 Subject: [PATCH 07/18] fix lint --- system-test/mutate-rows.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/system-test/mutate-rows.ts b/system-test/mutate-rows.ts index 98b3c1b2c..62936f0dc 100644 --- a/system-test/mutate-rows.ts +++ b/system-test/mutate-rows.ts @@ -108,8 +108,8 @@ describe('Bigtable/Table', () => { // Retry-request is not handling grpc errors correctly, so // we are handling retries in table.ts and disabling retries in // gax to avoid a request getting retried in multiple places. - // Re-enable this test after switching back to using the retry - // logic in gax. + // Re-enable this test after switching back to using the retry + // logic in gax // assert.deepStrictEqual( // options!.retryRequestOptions, // retryRequestOptions From b8dc1a7b11cd321e09cf65c0d18cfca6a9587489 Mon Sep 17 00:00:00 2001 From: Mattie Fu <mattiefu@google.com> Date: Wed, 30 Mar 2022 11:33:56 -0400 Subject: [PATCH 08/18] comments --- src/table.ts | 41 ++++++++++++++++++++++++++--------------- test/table.ts | 10 +++++----- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/src/table.ts b/src/table.ts index 266d5de36..149b63a26 100644 --- a/src/table.ts +++ b/src/table.ts @@ -826,7 +826,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); // Handling retries in this client. Specify the retry options to // make sure nothing is retried in retry-request. 
noResponseRetries: 0, - shouldRetryFn: function (response: any) { + shouldRetryFn: (_: any) => { return false; }, }; @@ -960,7 +960,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); return; } if ( - numRequestsMade <= maxRetries && + numRequestsMade < maxRetries && RETRYABLE_STATUS_CODES.has(error.code) ) { const backOffSettings = @@ -1536,14 +1536,20 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); const mutationErrorsByEntryIndex = new Map(); const isRetryable = (err: ServiceError | null) => { - if (pendingEntryIndices.size === 0 || numRequestsMade > maxRetries) { + // Don't retry if there are no more entries or retry attempts, + // or the error happened before a request was made. + if ( + pendingEntryIndices.size === 0 || + numRequestsMade >= maxRetries || + numRequestsMade === 0 + ) { return false; } return !err || IDEMPOTENT_RETRYABLE_STATUS_CODES.has(err.code); }; const onBatchResponse = (err: ServiceError | null) => { - if (isRetryable(err as ServiceError)) { + if (isRetryable(err)) { const backOffSettings = options.gaxOptions?.retry?.backoffSettings || DEFAULT_BACKOFF_SETTINGS; @@ -1552,10 +1558,21 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); return; } - if (mutationErrorsByEntryIndex.size !== 0) { - const mutationErrors = Array.from(mutationErrorsByEntryIndex.values()); - callback(new PartialFailureError(mutationErrors)); - return; + if (numRequestsMade !== 0) { + // If there's a race condition where all the mutations + // succeeded, but the server returned an error (like deadline + // exceeded), set error to null + if (pendingEntryIndices.size === 0 && err) { + err = null; + } + + if (mutationErrorsByEntryIndex.size !== 0) { + const mutationErrors = Array.from( + mutationErrorsByEntryIndex.values() + ); + callback(new PartialFailureError(mutationErrors)); + return; + } } callback(err); @@ -1579,7 +1596,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); 
// Handling retries in this client. Specify the retry options to // make sure nothing is retried in retry-request. noResponseRetries: 0, - shouldRetryFn: function (response: any) { + shouldRetryFn: (_: any) => { return false; }, }; @@ -1593,12 +1610,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); retryOpts, }) .on('error', (err: ServiceError) => { - // The error happened before a request was even made, don't retry. - if (numRequestsMade === 0) { - callback(err); // Likely a "projectId not detected" error. - return; - } - onBatchResponse(err); }) .on('data', (obj: google.bigtable.v2.IMutateRowsResponse) => { diff --git a/test/table.ts b/test/table.ts index 525758e8a..0ce1f3197 100644 --- a/test/table.ts +++ b/test/table.ts @@ -2595,12 +2595,12 @@ describe('Bigtable/Table', () => { }); it('should succeed after a retry', done => { - table.maxRetries = 1; + table.maxRetries = 2; table.mutate(entries, done); }); it('should retry the same failed entry', done => { - table.maxRetries = 1; + table.maxRetries = 2; table.mutate(entries, () => { assert.strictEqual(entryRequests[0].length, 2); assert.strictEqual(entryRequests[1].length, 1); @@ -2645,7 +2645,7 @@ describe('Bigtable/Table', () => { stream.emit('error', unretriableError); }) as {} as EventEmitter, ]; - table.maxRetries = 1; + table.maxRetries = 2; table.mutate(entries, () => { assert.strictEqual(entryRequests.length, 1); done(); @@ -2663,7 +2663,7 @@ describe('Bigtable/Table', () => { stream.end(); }) as {} as EventEmitter, ]; - table.maxRetries = 1; + table.maxRetries = 2; table.mutate(entries, () => { assert.strictEqual(entryRequests.length, 2); done(); @@ -2684,7 +2684,7 @@ describe('Bigtable/Table', () => { stream.end(); }) as {} as EventEmitter, ]; - table.maxRetries = 1; + table.maxRetries = 2; table.mutate(entries, () => { assert.strictEqual(entryRequests.length, 2); done(); From 9cee77dc1b49a0292a2063a0fc82ea28e0149458 Mon Sep 17 00:00:00 2001 From: Mattie Fu 
<mattiefu@google.com> Date: Wed, 30 Mar 2022 12:30:02 -0400 Subject: [PATCH 09/18] reset retry after a successful response --- src/table.ts | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/table.ts b/src/table.ts index 149b63a26..650359d44 100644 --- a/src/table.ts +++ b/src/table.ts @@ -743,7 +743,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); const rowsLimit = options.limit || 0; const hasLimit = rowsLimit !== 0; let rowsRead = 0; - let numRequestsMade = 0; + let numConsecutiveAttempt = 0; let retryTimer: NodeJS.Timeout | null; rowKeys = options.keys || []; @@ -822,7 +822,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); } as google.bigtable.v2.IReadRowsRequest; const retryOpts = { - currentRetryAttempt: numRequestsMade, + currentRetryAttempt: numConsecutiveAttempt, // Handling retries in this client. Specify the retry options to // make sure nothing is retried in retry-request. noResponseRetries: 0, @@ -938,7 +938,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); ) { return next(); } - numRequestsMade = 0; + numConsecutiveAttempt = 0; rowsRead++; const row = this.row(rowData.key); row.data = rowData.data; @@ -960,27 +960,28 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); return; } if ( - numRequestsMade < maxRetries && + numConsecutiveAttempt < maxRetries && RETRYABLE_STATUS_CODES.has(error.code) ) { const backOffSettings = options.gaxOptions?.retry?.backoffSettings || DEFAULT_BACKOFF_SETTINGS; const nextRetryDelay = getNextDelay( - numRequestsMade, + numConsecutiveAttempt, backOffSettings ); retryTimer = setTimeout(makeNewRequest, nextRetryDelay); + numConsecutiveAttempt++; } else { userStream.emit('error', error); } }) + .on('data', (_) => {numConsecutiveAttempt = 0}) .on('end', () => { activeRequestStream = null; retryTimer = null; }); rowStream.pipe(userStream); - numRequestsMade++; }; makeNewRequest(); @@ -2048,12
+2049,14 @@ promisifyAll(Table, { exclude: ['family', 'row'], }); -function getNextDelay(requestCount: number, config: BackoffSettings) { +function getNextDelay(numConsecutiveErrors: number, config: BackoffSettings) { // 0 - 100 ms jitter const jitter = Math.floor(Math.random() * 100); + console.log("RETRY DELAY: " + config.initialRetryDelayMillis * + Math.pow(config.retryDelayMultiplier, numConsecutiveErrors)); const calculatedNextRetryDelay = config.initialRetryDelayMillis * - Math.pow(config.retryDelayMultiplier, Math.max(0, requestCount - 1)) + + Math.pow(config.retryDelayMultiplier, numConsecutiveErrors) + jitter; return Math.min(calculatedNextRetryDelay, config.maxRetryDelayMillis); From 96b7764d343304dcad99610529dcd6f281472ba0 Mon Sep 17 00:00:00 2001 From: Mattie Fu <mattiefu@google.com> Date: Wed, 30 Mar 2022 12:36:50 -0400 Subject: [PATCH 10/18] fix lint --- src/table.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/table.ts b/src/table.ts index 650359d44..ef5553080 100644 --- a/src/table.ts +++ b/src/table.ts @@ -976,7 +976,9 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); userStream.emit('error', error); } }) - .on('data', (_) => {numConsecutiveAttempt = 0}) + .on('data', _ => { + numConsecutiveAttempt = 0; + }) .on('end', () => { activeRequestStream = null; retryTimer = null; @@ -2052,8 +2054,6 @@ promisifyAll(Table, { function getNextDelay(numConsecutiveErrors: number, config: BackoffSettings) { // 0 - 100 ms jitter const jitter = Math.floor(Math.random() * 100); - console.log("RETRY DELAY: " + config.initialRetryDelayMillis * - Math.pow(config.retryDelayMultiplier, numConsecutiveErrors)); const calculatedNextRetryDelay = config.initialRetryDelayMillis * Math.pow(config.retryDelayMultiplier, numConsecutiveErrors) + From 5a747a9f3e68e28da6faa64c950075c5f5102b07 Mon Sep 17 00:00:00 2001 From: Mattie Fu <mattiefu@google.com> Date: Wed, 30 Mar 2022 13:56:43 -0400 Subject: [PATCH 11/18] fix system 
test --- system-test/data/mutate-rows-retry-test.json | 8 ++++---- system-test/data/read-rows-retry-test.json | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/system-test/data/mutate-rows-retry-test.json b/system-test/data/mutate-rows-retry-test.json index 7ecc1b01d..7e062ed80 100644 --- a/system-test/data/mutate-rows-retry-test.json +++ b/system-test/data/mutate-rows-retry-test.json @@ -16,7 +16,7 @@ ] }, { "name": "retries the failed mutations", - "max_retries": 3, + "max_retries": 4, "mutations_request": [ { "method": "insert", "key": "foo", "data": {} }, { "method": "insert", "key": "bar", "data": {} }, @@ -36,7 +36,7 @@ ] }, { "name": "has a `PartialFailureError` error when an entry fails after the retries", - "max_retries": 3, + "max_retries": 4, "mutations_request": [ { "method": "insert", "key": "foo", "data": {} }, { "method": "insert", "key": "bar", "data": {} }, @@ -59,7 +59,7 @@ ] }, { "name": "does not retry unretryable mutations", - "max_retries": 5, + "max_retries": 6, "mutations_request": [ { "method": "insert", "key": "foo", "data": {} }, { "method": "insert", "key": "bar", "data": {} }, @@ -91,7 +91,7 @@ ] }, { "name": "considers network errors towards the retry count", - "max_retries": 3, + "max_retries": 4, "mutations_request": [ { "method": "insert", "key": "foo", "data": {} }, { "method": "insert", "key": "bar", "data": {} }, diff --git a/system-test/data/read-rows-retry-test.json b/system-test/data/read-rows-retry-test.json index 037c0e1f6..aad5178c6 100644 --- a/system-test/data/read-rows-retry-test.json +++ b/system-test/data/read-rows-retry-test.json @@ -76,7 +76,7 @@ { "name": "resets the retry counter after a successful read", - "max_retries": 3, + "max_retries": 4, "request_options": [ { "rowKeys": [], "rowRanges": [{}] @@ -211,7 +211,7 @@ { "name": "does the previous 5 things in one giant test case", - "max_retries": 3, + "max_retries": 4, "createReadStream_options": { "limit": 10, "ranges": [{ From 
2c849f1b8175574f21adca3edaf4d5be02c098c6 Mon Sep 17 00:00:00 2001 From: Mattie Fu <mattiefu@google.com> Date: Wed, 30 Mar 2022 16:01:54 -0400 Subject: [PATCH 12/18] clean up --- src/table.ts | 66 +++++++++++++++++++++++++++------------------------- 1 file changed, 34 insertions(+), 32 deletions(-) diff --git a/src/table.ts b/src/table.ts index ef5553080..4729c3f3b 100644 --- a/src/table.ts +++ b/src/table.ts @@ -16,7 +16,6 @@ import {promisifyAll} from '@google-cloud/promisify'; import arrify = require('arrify'); import {ServiceError} from 'google-gax'; import {BackoffSettings} from 'google-gax/build/src/gax'; -import {decorateStatus} from './decorateStatus'; import {PassThrough, Transform} from 'stream'; // eslint-disable-next-line @typescript-eslint/no-var-requires @@ -743,7 +742,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); const rowsLimit = options.limit || 0; const hasLimit = rowsLimit !== 0; let rowsRead = 0; - let numConsecutiveAttempt = 0; + let numConsecutiveErrors = 0; let retryTimer: NodeJS.Timeout | null; rowKeys = options.keys || []; @@ -822,7 +821,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); } as google.bigtable.v2.IReadRowsRequest; const retryOpts = { - currentRetryAttempt: numConsecutiveAttempt, + currentRetryAttempt: numConsecutiveErrors, // Handling retries in this client. Specify the retry options to // make sure nothing is retried in retry-request. 
noResponseRetries: 0, @@ -938,7 +937,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); ) { return next(); } - numConsecutiveAttempt = 0; rowsRead++; const row = this.row(rowData.key); row.data = rowData.data; @@ -960,24 +958,24 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); return; } if ( - numConsecutiveAttempt < maxRetries && + numConsecutiveErrors < maxRetries && RETRYABLE_STATUS_CODES.has(error.code) ) { const backOffSettings = options.gaxOptions?.retry?.backoffSettings || DEFAULT_BACKOFF_SETTINGS; const nextRetryDelay = getNextDelay( - numConsecutiveAttempt, + numConsecutiveErrors, backOffSettings ); retryTimer = setTimeout(makeNewRequest, nextRetryDelay); - numConsecutiveAttempt++; + numConsecutiveErrors++; } else { userStream.emit('error', error); } }) .on('data', _ => { - numConsecutiveAttempt = 0; + numConsecutiveErrors = 0; }) .on('end', () => { activeRequestStream = null; @@ -1539,19 +1537,26 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); const mutationErrorsByEntryIndex = new Map(); const isRetryable = (err: ServiceError | null) => { - // Don't retry if there are no more entries or retry attempts, - // or the error happened before a request was made. 
+ // Don't retry if there are no more entries or retry attempts if ( pendingEntryIndices.size === 0 || - numRequestsMade >= maxRetries || - numRequestsMade === 0 + numRequestsMade > maxRetries ) { return false; } + // If the error is empty but there are still outstanding mutations, + // it means that there are retryable errors in the mutate response + // even when the RPC succeeded return !err || IDEMPOTENT_RETRYABLE_STATUS_CODES.has(err.code); }; const onBatchResponse = (err: ServiceError | null) => { + // Return if the error happened before a request was made + if (numRequestsMade === 0) { + callback(err); + return; + } + if (isRetryable(err)) { const backOffSettings = options.gaxOptions?.retry?.backoffSettings || @@ -1561,21 +1566,18 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); return; } - if (numRequestsMade !== 0) { - // If there's a race condition where all the mutations - // succeeded, but the server returned an error (like deadline - // exceeded), set error to null - if (pendingEntryIndices.size === 0 && err) { - err = null; - } + // If there's no more pending mutations, set the error + // to null + if (pendingEntryIndices.size === 0) { + err = null; + } - if (mutationErrorsByEntryIndex.size !== 0) { - const mutationErrors = Array.from( - mutationErrorsByEntryIndex.values() - ); - callback(new PartialFailureError(mutationErrors)); - return; - } + if (mutationErrorsByEntryIndex.size !== 0) { + const mutationErrors = Array.from( + mutationErrorsByEntryIndex.values() + ); + callback(new PartialFailureError(mutationErrors, err)); + return; } callback(err); @@ -1626,13 +1628,10 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); mutationErrorsByEntryIndex.delete(originalEntriesIndex); return; } - if (!RETRYABLE_STATUS_CODES.has(entry.status!.code!)) { + if (!IDEMPOTENT_RETRYABLE_STATUS_CODES.has(entry.status!.code!)) { pendingEntryIndices.delete(originalEntriesIndex); } - const status = 
decorateStatus(entry.status); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (status as any).entry = originalEntry; - mutationErrorsByEntryIndex.set(originalEntriesIndex, status); + mutationErrorsByEntryIndex.set(originalEntriesIndex, entry.status); }); }) .on('end', onBatchResponse); @@ -2069,7 +2068,7 @@ export interface GoogleInnerError { export class PartialFailureError extends Error { errors?: GoogleInnerError[]; - constructor(errors: GoogleInnerError[]) { + constructor(errors: GoogleInnerError[], rpcError?: ServiceError | null) { super(); this.errors = errors; this.name = 'PartialFailureError'; @@ -2082,5 +2081,8 @@ export class PartialFailureError extends Error { messages.push('\n'); } this.message = messages.join('\n'); + if (rpcError) { + this.message += "Request failed with: " + rpcError.message; + } } } From 9f8317ee909125ee7bf7cea0ba83cb2008c8219c Mon Sep 17 00:00:00 2001 From: Mattie Fu <mattiefu@google.com> Date: Wed, 30 Mar 2022 20:23:02 -0400 Subject: [PATCH 13/18] add rpc status in mutate rows, and remove http status --- src/decorateStatus.ts | 31 ++------------------ src/table.ts | 17 +++++------ system-test/data/mutate-rows-retry-test.json | 10 +++---- test/common.ts | 12 -------- test/table.ts | 10 +++---- 5 files changed, 21 insertions(+), 59 deletions(-) diff --git a/src/decorateStatus.ts b/src/decorateStatus.ts index 079778a31..77498cb26 100644 --- a/src/decorateStatus.ts +++ b/src/decorateStatus.ts @@ -15,30 +15,6 @@ import * as extend from 'extend'; import {google} from '../protos/protos'; -/** - * @const {object} - A map of protobuf codes to HTTP status codes. 
- * @private - */ -const GRPC_ERROR_CODE_TO_HTTP = [ - {code: 200, message: 'OK'}, - {code: 499, message: 'Client Closed Request'}, - {code: 500, message: 'Internal Server Error'}, - {code: 400, message: 'Bad Request'}, - {code: 504, message: 'Gateway Timeout'}, - {code: 404, message: 'Not Found'}, - {code: 409, message: 'Conflict'}, - {code: 403, message: 'Forbidden'}, - {code: 429, message: 'Too Many Requests'}, - {code: 412, message: 'Precondition Failed'}, - {code: 409, message: 'Conflict'}, - {code: 400, message: 'Bad Request'}, - {code: 501, message: 'Not Implemented'}, - {code: 500, message: 'Internal Server Error'}, - {code: 503, message: 'Service Unavailable'}, - {code: 500, message: 'Internal Server Error'}, - {code: 401, message: 'Unauthorized'}, -]; - export type DecoratedStatus = google.rpc.IStatus & { code: number; message: string; @@ -56,9 +32,8 @@ export function decorateStatus( response?: google.rpc.IStatus | null ): DecoratedStatus | null { const obj = {}; - if (response && GRPC_ERROR_CODE_TO_HTTP[response.code!]) { - const defaultResponseDetails = GRPC_ERROR_CODE_TO_HTTP[response.code!]; - let message = defaultResponseDetails.message; + if (response && response.code) { + let message = ''; if (response.message) { // gRPC error messages can be either stringified JSON or strings. 
try { @@ -68,7 +43,7 @@ export function decorateStatus( } } return extend(true, obj, response, { - code: defaultResponseDetails.code, + code: response.code, message, }); } diff --git a/src/table.ts b/src/table.ts index 4729c3f3b..2347627a3 100644 --- a/src/table.ts +++ b/src/table.ts @@ -16,6 +16,7 @@ import {promisifyAll} from '@google-cloud/promisify'; import arrify = require('arrify'); import {ServiceError} from 'google-gax'; import {BackoffSettings} from 'google-gax/build/src/gax'; +import {decorateStatus} from './decorateStatus'; import {PassThrough, Transform} from 'stream'; // eslint-disable-next-line @typescript-eslint/no-var-requires @@ -1538,10 +1539,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); const isRetryable = (err: ServiceError | null) => { // Don't retry if there are no more entries or retry attempts - if ( - pendingEntryIndices.size === 0 || - numRequestsMade > maxRetries - ) { + if (pendingEntryIndices.size === 0 || numRequestsMade > maxRetries) { return false; } // If the error is empty but there are still outstanding mutations, @@ -1573,9 +1571,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); } if (mutationErrorsByEntryIndex.size !== 0) { - const mutationErrors = Array.from( - mutationErrorsByEntryIndex.values() - ); + const mutationErrors = Array.from(mutationErrorsByEntryIndex.values()); callback(new PartialFailureError(mutationErrors, err)); return; } @@ -1631,7 +1627,10 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); if (!IDEMPOTENT_RETRYABLE_STATUS_CODES.has(entry.status!.code!)) { pendingEntryIndices.delete(originalEntriesIndex); } - mutationErrorsByEntryIndex.set(originalEntriesIndex, entry.status); + const status = decorateStatus(entry.status); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (status as any).entry = originalEntry; + mutationErrorsByEntryIndex.set(originalEntriesIndex, status); }); }) .on('end', onBatchResponse); @@ 
-2082,7 +2081,7 @@ export class PartialFailureError extends Error { } this.message = messages.join('\n'); if (rpcError) { - this.message += "Request failed with: " + rpcError.message; + this.message += 'Request failed with: ' + rpcError.message; } } } diff --git a/system-test/data/mutate-rows-retry-test.json b/system-test/data/mutate-rows-retry-test.json index 7e062ed80..e88fbc4bb 100644 --- a/system-test/data/mutate-rows-retry-test.json +++ b/system-test/data/mutate-rows-retry-test.json @@ -16,7 +16,7 @@ ] }, { "name": "retries the failed mutations", - "max_retries": 4, + "max_retries": 3, "mutations_request": [ { "method": "insert", "key": "foo", "data": {} }, { "method": "insert", "key": "bar", "data": {} }, @@ -36,7 +36,7 @@ ] }, { "name": "has a `PartialFailureError` error when an entry fails after the retries", - "max_retries": 4, + "max_retries": 3, "mutations_request": [ { "method": "insert", "key": "foo", "data": {} }, { "method": "insert", "key": "bar", "data": {} }, @@ -59,7 +59,7 @@ ] }, { "name": "does not retry unretryable mutations", - "max_retries": 6, + "max_retries": 5, "mutations_request": [ { "method": "insert", "key": "foo", "data": {} }, { "method": "insert", "key": "bar", "data": {} }, @@ -70,7 +70,7 @@ ], "responses": [ { "code": 200, "entry_codes": [ 4, 4, 4, 4, 4, 1 ] }, - { "code": 200, "entry_codes": [ 10, 14, 10, 14, 0 ] }, + { "code": 200, "entry_codes": [ 4, 14, 14, 14, 0 ] }, { "code": 200, "entry_codes": [ 1, 4, 4, 0 ] }, { "code": 200, "entry_codes": [ 0, 4 ] }, { "code": 200, "entry_codes": [ 4 ] }, @@ -91,7 +91,7 @@ ] }, { "name": "considers network errors towards the retry count", - "max_retries": 4, + "max_retries": 3, "mutations_request": [ { "method": "insert", "key": "foo", "data": {} }, { "method": "insert", "key": "bar", "data": {} }, diff --git a/test/common.ts b/test/common.ts index de7b50d97..b979e0225 100644 --- a/test/common.ts +++ b/test/common.ts @@ -17,18 +17,6 @@ import {describe, it} from 'mocha'; import 
{shouldRetryRequest, decorateStatus} from '../src/decorateStatus'; describe('decorateStatus', () => { - it('should attach the correct HTTP code', () => { - const grpcStatus = {code: 0}; - const status = decorateStatus(grpcStatus); - assert.strictEqual(status!.message, 'OK'); - }); - - it('should return null if the code doesnt match', () => { - const grpcStatus = {code: 999}; - const status = decorateStatus(grpcStatus); - assert.strictEqual(status, null); - }); - it('should accept a basic message', () => { const message = 'QUACK!'; const grpcStatus = {code: 1, message}; diff --git a/test/table.ts b/test/table.ts index 0ce1f3197..525758e8a 100644 --- a/test/table.ts +++ b/test/table.ts @@ -2595,12 +2595,12 @@ describe('Bigtable/Table', () => { }); it('should succeed after a retry', done => { - table.maxRetries = 2; + table.maxRetries = 1; table.mutate(entries, done); }); it('should retry the same failed entry', done => { - table.maxRetries = 2; + table.maxRetries = 1; table.mutate(entries, () => { assert.strictEqual(entryRequests[0].length, 2); assert.strictEqual(entryRequests[1].length, 1); @@ -2645,7 +2645,7 @@ describe('Bigtable/Table', () => { stream.emit('error', unretriableError); }) as {} as EventEmitter, ]; - table.maxRetries = 2; + table.maxRetries = 1; table.mutate(entries, () => { assert.strictEqual(entryRequests.length, 1); done(); @@ -2663,7 +2663,7 @@ describe('Bigtable/Table', () => { stream.end(); }) as {} as EventEmitter, ]; - table.maxRetries = 2; + table.maxRetries = 1; table.mutate(entries, () => { assert.strictEqual(entryRequests.length, 2); done(); @@ -2684,7 +2684,7 @@ describe('Bigtable/Table', () => { stream.end(); }) as {} as EventEmitter, ]; - table.maxRetries = 2; + table.maxRetries = 1; table.mutate(entries, () => { assert.strictEqual(entryRequests.length, 2); done(); From aff55ca1ae22b68b6ef80e8b24b00d98b78f6a1e Mon Sep 17 00:00:00 2001 From: Mattie Fu <mattiefu@google.com> Date: Thu, 31 Mar 2022 09:54:54 -0400 Subject: [PATCH 14/18] 
remove unnecessary check --- src/table.ts | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/table.ts b/src/table.ts index 2347627a3..b1fae514d 100644 --- a/src/table.ts +++ b/src/table.ts @@ -807,11 +807,10 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); let rowStream: Duplex; const makeNewRequest = () => { - if (retryTimer) { - // Avoid cancelling an expired timer if the - // stream is cancelled in the middle of a retry - retryTimer = null; - } + // Avoid cancelling an expired timer if user + // cancelled the stream in the middle of a retry + retryTimer = null; + const lastRowKey = chunkTransformer ? chunkTransformer.lastRowKey : ''; // eslint-disable-next-line @typescript-eslint/no-explicit-any chunkTransformer = new ChunkTransformer({decode: options.decode} as any); From 7d144b68dd1b19cf5ee55c99a046a778b2f2270b Mon Sep 17 00:00:00 2001 From: Mattie Fu <mattiefu@google.com> Date: Thu, 31 Mar 2022 10:57:15 -0400 Subject: [PATCH 15/18] remove decorate status --- src/decorateStatus.ts | 55 ---------------------- src/index.ts | 7 ++- src/table.ts | 10 ++-- system-test/mutate-rows.ts | 21 ++++----- test/common.ts | 46 ------------------ test/index.ts | 95 ++++++++++++++++++++------------------ test/table.ts | 49 +++++++++----------- 7 files changed, 90 insertions(+), 193 deletions(-) delete mode 100644 src/decorateStatus.ts delete mode 100644 test/common.ts diff --git a/src/decorateStatus.ts b/src/decorateStatus.ts deleted file mode 100644 index 77498cb26..000000000 --- a/src/decorateStatus.ts +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2019 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import * as extend from 'extend'; -import {google} from '../protos/protos'; - -export type DecoratedStatus = google.rpc.IStatus & { - code: number; - message: string; -}; - -/** - * Checks for a grpc status code and extends the supplied object with - * additional information. - * - * @param {object} obj - The object to be extended. - * @param {object} response - The grpc response. - * @return {object|null} - */ -export function decorateStatus( - response?: google.rpc.IStatus | null -): DecoratedStatus | null { - const obj = {}; - if (response && response.code) { - let message = ''; - if (response.message) { - // gRPC error messages can be either stringified JSON or strings. 
- try { - message = JSON.parse(response.message).description; - } catch (e) { - message = response.message; - } - } - return extend(true, obj, response, { - code: response.code, - message, - }); - } - return null; -} - -export function shouldRetryRequest(r: {code: number}) { - return [429, 500, 502, 503].includes(r.code); -} diff --git a/src/index.ts b/src/index.ts index 997824b82..a558b6fae 100644 --- a/src/index.ts +++ b/src/index.ts @@ -29,7 +29,6 @@ import { CreateInstanceResponse, IInstance, } from './instance'; -import {shouldRetryRequest} from './decorateStatus'; import {google} from '../protos/protos'; import {ServiceError} from 'google-gax'; import * as v2 from './v2'; @@ -842,7 +841,11 @@ export class Bigtable { currentRetryAttempt: 0, noResponseRetries: 0, objectMode: true, - shouldRetryFn: shouldRetryRequest, + // TODO: re-enable shouldRetryFn when the retry + // logic is fixed in gax / retry-request + shouldRetryFn: (_: any) => { + return false; + }, }, config.retryOpts ); diff --git a/src/table.ts b/src/table.ts index b1fae514d..16a6d79ff 100644 --- a/src/table.ts +++ b/src/table.ts @@ -16,7 +16,6 @@ import {promisifyAll} from '@google-cloud/promisify'; import arrify = require('arrify'); import {ServiceError} from 'google-gax'; import {BackoffSettings} from 'google-gax/build/src/gax'; -import {decorateStatus} from './decorateStatus'; import {PassThrough, Transform} from 'stream'; // eslint-disable-next-line @typescript-eslint/no-var-requires @@ -961,6 +960,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); numConsecutiveErrors < maxRetries && RETRYABLE_STATUS_CODES.has(error.code) ) { + numConsecutiveErrors++; const backOffSettings = options.gaxOptions?.retry?.backoffSettings || DEFAULT_BACKOFF_SETTINGS; @@ -969,7 +969,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); backOffSettings ); retryTimer = setTimeout(makeNewRequest, nextRetryDelay); - numConsecutiveErrors++; } else { userStream.emit('error', 
error); } @@ -979,7 +978,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); }) .on('end', () => { activeRequestStream = null; - retryTimer = null; }); rowStream.pipe(userStream); }; @@ -1626,10 +1624,10 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); if (!IDEMPOTENT_RETRYABLE_STATUS_CODES.has(entry.status!.code!)) { pendingEntryIndices.delete(originalEntriesIndex); } - const status = decorateStatus(entry.status); + const errorDetails = entry.status; // eslint-disable-next-line @typescript-eslint/no-explicit-any - (status as any).entry = originalEntry; - mutationErrorsByEntryIndex.set(originalEntriesIndex, status); + (errorDetails as any).entry = originalEntry; + mutationErrorsByEntryIndex.set(originalEntriesIndex, errorDetails); }); }) .on('end', onBatchResponse); diff --git a/system-test/mutate-rows.ts b/system-test/mutate-rows.ts index 62936f0dc..a5c9729b4 100644 --- a/system-test/mutate-rows.ts +++ b/system-test/mutate-rows.ts @@ -27,7 +27,6 @@ import {Entry, PartialFailureError} from '../src/table'; import {CancellableStream, GrpcClient, GoogleAuth} from 'google-gax'; import {BigtableClient} from '../src/v2'; import {PassThrough} from 'stream'; -import {shouldRetryRequest} from '../src/decorateStatus'; const {grpc} = new GrpcClient(); @@ -94,22 +93,22 @@ describe('Bigtable/Table', () => { responses = null; bigtable.api.BigtableClient = { mutateRows: (reqOpts, options) => { - const retryRequestOptions = { - noResponseRetries: 0, - objectMode: true, - shouldRetryFn: shouldRetryRequest, - currentRetryAttempt: currentRetryAttempt++, - }; - mutationBatchesInvoked.push( - // eslint-disable-next-line @typescript-eslint/no-explicit-any - reqOpts!.entries!.map(entry => (entry.rowKey as any).asciiSlice()) - ); // TODO: Currently retry options for retry-request are ignored. 
// Retry-request is not handling grpc errors correctly, so // we are handling retries in table.ts and disabling retries in // gax to avoid a request getting retried in multiple places. // Re-enable this test after switching back to using the retry // logic in gax + // const retryRequestOptions = { + // noResponseRetries: 0, + // objectMode: true, + // shouldRetryFn: shouldRetryRequest, + // currentRetryAttempt: currentRetryAttempt++, + // }; + mutationBatchesInvoked.push( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + reqOpts!.entries!.map(entry => (entry.rowKey as any).asciiSlice()) + ); // assert.deepStrictEqual( // options!.retryRequestOptions, // retryRequestOptions diff --git a/test/common.ts b/test/common.ts deleted file mode 100644 index b979e0225..000000000 --- a/test/common.ts +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2019 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -import * as assert from 'assert'; -import {describe, it} from 'mocha'; -import {shouldRetryRequest, decorateStatus} from '../src/decorateStatus'; - -describe('decorateStatus', () => { - it('should accept a basic message', () => { - const message = 'QUACK!'; - const grpcStatus = {code: 1, message}; - const status = decorateStatus(grpcStatus); - assert.strictEqual(status!.message, message); - }); - - it('should parse JSON from the response message', () => { - const message = { - description: { - rubber: '🦆', - }, - }; - const grpcStatus = {code: 1, message: JSON.stringify(message)}; - const status = decorateStatus(grpcStatus); - assert.deepStrictEqual(status!.message, message.description); - }); -}); - -describe('shouldRetryRequest_', () => { - it('should retry on 429, 500, 502, and 503', () => { - const s1 = shouldRetryRequest({code: 429}); - assert.strictEqual(s1, true); - const s2 = shouldRetryRequest({code: 444}); - assert.strictEqual(s2, false); - }); -}); diff --git a/test/index.ts b/test/index.ts index 5b7301347..147a76001 100644 --- a/test/index.ts +++ b/test/index.ts @@ -23,7 +23,6 @@ import * as sn from 'sinon'; import {Cluster} from '../src/cluster.js'; import {Instance} from '../src/instance.js'; import {PassThrough} from 'stream'; -import {shouldRetryRequest} from '../src/decorateStatus.js'; // eslint-disable-next-line @typescript-eslint/no-var-requires const v2 = require('../src/v2'); @@ -895,51 +894,55 @@ describe('Bigtable', () => { }; }); - it('should pass retryRequestOptions', done => { - const expectedRetryRequestOptions = { - currentRetryAttempt: 0, - noResponseRetries: 0, - objectMode: true, - shouldRetryFn: shouldRetryRequest, - }; - - bigtable.api[CONFIG.client] = { - [CONFIG.method]: (reqOpts: {}, options: gax.CallOptions) => { - assert.deepStrictEqual( - options.retryRequestOptions, - expectedRetryRequestOptions - ); - done(); - }, - }; - - const requestStream = bigtable.request(CONFIG); - requestStream.emit('reading'); - }); - - 
it('should set gaxOpts.retryRequestOptions when gaxOpts undefined', done => { - const expectedRetryRequestOptions = { - currentRetryAttempt: 0, - noResponseRetries: 0, - objectMode: true, - shouldRetryFn: shouldRetryRequest, - }; - - bigtable.api[CONFIG.client] = { - [CONFIG.method]: (reqOpts: {}, options: gax.CallOptions) => { - assert.deepStrictEqual( - options.retryRequestOptions, - expectedRetryRequestOptions - ); - done(); - }, - }; - - const config = Object.assign({}, CONFIG); - delete config.gaxOpts; - const requestStream = bigtable.request(config); - requestStream.emit('reading'); - }); + // TODO: retry request options are currently ignored + // Re-enable after retry logic is fixed in gax / retry-request + // it('should pass retryRequestOptions', done => { + // const expectedRetryRequestOptions = { + // currentRetryAttempt: 0, + // noResponseRetries: 0, + // objectMode: true, + // shouldRetryFn: shouldRetryRequest, + // }; + + // bigtable.api[CONFIG.client] = { + // [CONFIG.method]: (reqOpts: {}, options: gax.CallOptions) => { + // assert.deepStrictEqual( + // options.retryRequestOptions, + // expectedRetryRequestOptions + // ); + // done(); + // }, + // }; + + // const requestStream = bigtable.request(CONFIG); + // requestStream.emit('reading'); + // }); + + // TODO: retry request options are currently ignored + // Re-enable after retry logic is fixed in gax / retry-request + // it('should set gaxOpts.retryRequestOptions when gaxOpts undefined', done => { + // const expectedRetryRequestOptions = { + // currentRetryAttempt: 0, + // noResponseRetries: 0, + // objectMode: true, + // shouldRetryFn: shouldRetryRequest, + // }; + + // bigtable.api[CONFIG.client] = { + // [CONFIG.method]: (reqOpts: {}, options: gax.CallOptions) => { + // assert.deepStrictEqual( + // options.retryRequestOptions, + // expectedRetryRequestOptions + // ); + // done(); + // }, + // }; + + // const config = Object.assign({}, CONFIG); + // delete config.gaxOpts; + // const requestStream 
= bigtable.request(config); + // requestStream.emit('reading'); + // }); it('should expose an abort function', done => { GAX_STREAM.cancel = done; diff --git a/test/table.ts b/test/table.ts index 525758e8a..f25dac613 100644 --- a/test/table.ts +++ b/test/table.ts @@ -20,7 +20,6 @@ import * as pumpify from 'pumpify'; import * as sinon from 'sinon'; import {PassThrough, Writable, Duplex} from 'stream'; import {ServiceError} from 'google-gax'; -import {DecoratedStatus} from '../src/decorateStatus'; import * as inst from '../src/instance'; import {ChunkTransformer} from '../src/chunktransformer.js'; @@ -28,7 +27,6 @@ import {Family} from '../src/family.js'; import {Mutation} from '../src/mutation.js'; import {Row} from '../src/row.js'; import * as tblTypes from '../src/table'; -import * as ds from '../src/decorateStatus.js'; import {Bigtable} from '../src'; import {EventEmitter} from 'events'; @@ -2457,18 +2455,18 @@ describe('Bigtable/Table', () => { index: 0, status: { code: 1, + message: 'CANCELLED', }, }, { index: 1, status: { - code: 1, + code: 10, + message: 'ABORTED', }, }, ]; - const parsedStatuses = [{} as DecoratedStatus, {} as DecoratedStatus]; - beforeEach(() => { table.bigtable.request = () => { const stream = new PassThrough({objectMode: true}); @@ -2478,32 +2476,31 @@ describe('Bigtable/Table', () => { }); return stream; }; - - let statusCount = 0; - sandbox.stub(ds, 'decorateStatus').callsFake(status => { - assert.strictEqual(status, fakeStatuses[statusCount].status); - return parsedStatuses[statusCount++]; - }); }); it('should return a PartialFailureError', done => { - table.mutate(entries, (err: Error) => { + const newEntries = [ + { + key: 'a', + }, + { + key: 'b', + }, + ]; + table.mutate(newEntries, (err: Error) => { assert.strictEqual(err.name, 'PartialFailureError'); // eslint-disable-next-line @typescript-eslint/no-explicit-any assert.deepStrictEqual((err as any).errors, [ - Object.assign( - { - entry: entries[0], - }, - parsedStatuses[0] - ), - 
- Object.assign( - { - entry: entries[1], - }, - parsedStatuses[1] - ), + Object.assign({ + entry: newEntries[0], + code: fakeStatuses[0].status.code, + message: fakeStatuses[0].status.message, + }), + Object.assign({ + entry: newEntries[1], + code: fakeStatuses[1].status.code, + message: fakeStatuses[1].status.message, + }), ]); done(); @@ -2578,7 +2575,6 @@ describe('Bigtable/Table', () => { }, ], ]; - sandbox.stub(ds, 'decorateStatus').returns({} as DecoratedStatus); // eslint-disable-next-line @typescript-eslint/no-explicit-any table.bigtable.request = (config: any) => { entryRequests.push(config.reqOpts.entries); @@ -2621,7 +2617,6 @@ describe('Bigtable/Table', () => { entryRequests = []; - sandbox.stub(ds, 'decorateStatus').returns({} as DecoratedStatus); // eslint-disable-next-line @typescript-eslint/no-explicit-any table.bigtable.request = (config: any) => { entryRequests.push(config.reqOpts.entries); From 8b20f0f30599487fb9fe25c1e28e19ad59dd5470 Mon Sep 17 00:00:00 2001 From: Mattie Fu <mattiefu@google.com> Date: Thu, 31 Mar 2022 14:43:11 -0400 Subject: [PATCH 16/18] update --- src/index.ts | 5 ----- src/table.ts | 2 ++ 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/index.ts b/src/index.ts index a558b6fae..506c072ea 100644 --- a/src/index.ts +++ b/src/index.ts @@ -841,11 +841,6 @@ export class Bigtable { currentRetryAttempt: 0, noResponseRetries: 0, objectMode: true, - // TODO: re-enable shouldRetryFn when the retry - // logic is fixed in gax / retry-request - shouldRetryFn: (_: any) => { - return false; - }, }, config.retryOpts ); diff --git a/src/table.ts b/src/table.ts index 16a6d79ff..11ea974e8 100644 --- a/src/table.ts +++ b/src/table.ts @@ -974,6 +974,8 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); } }) .on('data', _ => { + // Reset error count after a successful read so the backoff + // time won't keep increasing when a stream has multiple errors numConsecutiveErrors = 0; }) .on('end', () => { From
fa200fdb2bb688033eab1d7cfc78f083ce0cf9c7 Mon Sep 17 00:00:00 2001 From: Mattie Fu <mattiefu@google.com> Date: Thu, 31 Mar 2022 14:48:23 -0400 Subject: [PATCH 17/18] fix --- src/table.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/table.ts b/src/table.ts index 11ea974e8..84db3999a 100644 --- a/src/table.ts +++ b/src/table.ts @@ -956,11 +956,11 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); userStream.end(); return; } + numConsecutiveErrors++; if ( - numConsecutiveErrors < maxRetries && + numConsecutiveErrors <= maxRetries && RETRYABLE_STATUS_CODES.has(error.code) ) { - numConsecutiveErrors++; const backOffSettings = options.gaxOptions?.retry?.backoffSettings || DEFAULT_BACKOFF_SETTINGS; From f1079aa89300a17ac5e2e78062fd1ae68bd46f2c Mon Sep 17 00:00:00 2001 From: Mattie Fu <mattiefu@google.com> Date: Thu, 31 Mar 2022 17:18:07 -0400 Subject: [PATCH 18/18] correct retry count --- src/table.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/table.ts b/src/table.ts index 84db3999a..2518ff06c 100644 --- a/src/table.ts +++ b/src/table.ts @@ -1538,7 +1538,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); const isRetryable = (err: ServiceError | null) => { // Don't retry if there are no more entries or retry attempts - if (pendingEntryIndices.size === 0 || numRequestsMade > maxRetries) { + if (pendingEntryIndices.size === 0 || numRequestsMade >= maxRetries + 1) { return false; } // If the error is empty but there are still outstanding mutations,