Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

fix: disable retry-request, add exponential backoff in mutateRows and readRows #1060

Merged
merged 18 commits into from
Mar 31, 2022
25 changes: 19 additions & 6 deletions src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import {Duplex} from 'stream';
// See protos/google/rpc/code.proto
// (4=DEADLINE_EXCEEDED, 10=ABORTED, 14=UNAVAILABLE)
const RETRYABLE_STATUS_CODES = new Set([4, 10, 14]);
const IDEMPOTENT_RETRYABLE_STATUS_CODES = new Set([4, 14]);
// (1=CANCELLED)
const IGNORED_STATUS_CODES = new Set([1]);

Expand Down Expand Up @@ -1507,13 +1508,26 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
const onBatchResponse = (
err: ServiceError | PartialFailureError | null
) => {
// TODO: enable retries when the entire RPC fails
if (err) {
// The error happened before a request was even made, don't retry.
// Retry RPC level errors
if (!(err instanceof PartialFailureError)) {
const serviceError = err as ServiceError;
if (
numRequestsMade <= maxRetries &&
IDEMPOTENT_RETRYABLE_STATUS_CODES.has(serviceError.code)
) {
console.log('RETRYING ' + err.code);
makeNextBatchRequest();
return;
}
}
callback(err);
return;
}
if (pendingEntryIndices.size !== 0 && numRequestsMade <= maxRetries) {
} else if (
pendingEntryIndices.size !== 0 &&
numRequestsMade <= maxRetries
) {
console.log('RETRYING partial');
makeNextBatchRequest();
return;
}
Expand Down Expand Up @@ -1552,8 +1566,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
retryOpts,
})
.on('error', (err: ServiceError) => {
// TODO: this check doesn't actually do anything, onBatchResponse
// currently doesn't retry RPC errors, only entry failures
// The error happened before a request was even made, don't retry.
if (numRequestsMade === 0) {
callback(err); // Likely a "projectId not detected" error.
return;
Expand Down
82 changes: 82 additions & 0 deletions test/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2609,6 +2609,88 @@ describe('Bigtable/Table', () => {
});
});
});

describe('rpc level retries', () => {
let emitters: EventEmitter[] | null; // = [((stream: Writable) => { stream.push([{ key: 'a' }]);

// eslint-disable-next-line @typescript-eslint/no-explicit-any
let entryRequests: any;

beforeEach(() => {
emitters = null; // This needs to be assigned in each test case.

entryRequests = [];

sandbox.stub(ds, 'decorateStatus').returns({} as DecoratedStatus);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
table.bigtable.request = (config: any) => {
entryRequests.push(config.reqOpts.entries);
const stream = new PassThrough({
objectMode: true,
});

setImmediate(() => {
(emitters!.shift() as any)(stream);
});

return stream;
};
});

it('should not retry unretriable errors', done => {
const unretriableError = new Error('not retryable') as ServiceError;
unretriableError.code = 10; // Aborted
emitters = [
((stream: Writable) => {
stream.emit('error', unretriableError);
}) as {} as EventEmitter,
];
table.maxRetries = 1;
table.mutate(entries, () => {
assert.strictEqual(entryRequests.length, 1);
done();
});
});

it('should retry retryable errors', done => {
const error = new Error('retryable') as ServiceError;
error.code = 14; // Unavailable
emitters = [
((stream: Writable) => {
stream.emit('error', error);
}) as {} as EventEmitter,
((stream: Writable) => {
stream.end();
}) as {} as EventEmitter,
];
table.maxRetries = 1;
table.mutate(entries, () => {
assert.strictEqual(entryRequests.length, 2);
done();
});
});

it('should not retry more than maxRetries times', done => {
const error = new Error('retryable') as ServiceError;
error.code = 14; // Unavailable
emitters = [
((stream: Writable) => {
stream.emit('error', error);
}) as {} as EventEmitter,
((stream: Writable) => {
stream.emit('error', error);
}) as {} as EventEmitter,
((stream: Writable) => {
stream.end();
}) as {} as EventEmitter,
];
table.maxRetries = 1;
table.mutate(entries, () => {
assert.strictEqual(entryRequests.length, 2);
done();
});
});
});
});

describe('row', () => {
Expand Down