From 170d05b1fa6c720d1109506ed3d3feb525c16efe Mon Sep 17 00:00:00 2001 From: Mark Duckworth <1124037+MarkDuckworth@users.noreply.github.com> Date: Fri, 17 Nov 2023 14:06:14 -0700 Subject: [PATCH] fix: update retry policy to not retry streams that have not made progress (#1946) fix: update retry policy to not retry streams with retryable error that have not made progress receiving documents --- dev/src/index.ts | 2 +- dev/src/reference.ts | 122 +++++++++++-------- dev/src/util.ts | 15 +++ dev/test/aggregateQuery.ts | 41 ++++++- dev/test/query.ts | 239 ++++++++++++++++++++++++++++++++++++- 5 files changed, 367 insertions(+), 52 deletions(-) diff --git a/dev/src/index.ts b/dev/src/index.ts index e797669b3..47fe60eb0 100644 --- a/dev/src/index.ts +++ b/dev/src/index.ts @@ -1637,7 +1637,7 @@ export class Firestore implements firestore.Firestore { function streamReady(): void { if (!streamInitialized) { streamInitialized = true; - logger('Firestore._initializeStream', requestTag, 'Releasing stream'); + logger('Firestore._initializeStream', requestTag, 'Stream ready'); resolve(resultStream); } } diff --git a/dev/src/reference.ts b/dev/src/reference.ts index cad75511a..012634c8f 100644 --- a/dev/src/reference.ts +++ b/dev/src/reference.ts @@ -44,6 +44,7 @@ import {defaultConverter} from './types'; import { autoId, Deferred, + getTotalTimeout, isPermanentRpcError, mapToArray, requestTag, @@ -2569,6 +2570,15 @@ export class Query< return isPermanentRpcError(err, methodName); } + _hasRetryTimedOut(methodName: string, startTime: number): boolean { + const totalTimeout = getTotalTimeout(methodName); + if (totalTimeout === 0) { + return false; + } + + return Date.now() - startTime >= totalTimeout; + } + /** * Internal streaming method that accepts an optional transaction ID. * @@ -2579,6 +2589,7 @@ export class Query< */ _stream(transactionId?: Uint8Array): NodeJS.ReadableStream { const tag = requestTag(); + const startTime = Date.now(); let lastReceivedDocument: QueryDocumentSnapshot< AppModelType, @@ -2638,8 +2649,9 @@ export class Query< let streamActive: Deferred; do { streamActive = new Deferred(); + const methodName = 'runQuery'; backendStream = await this._firestore.requestStream( - 'runQuery', + methodName, /* bidirectional= */ false, request, tag @@ -2656,12 +2668,28 @@ export class Query< 'Query failed with retryable stream error:', err ); - // Enqueue a "no-op" write into the stream and resume the query - // once it is processed. This allows any enqueued results to be - // consumed before resuming the query so that the query resumption - // can start at the correct document. + + // Enqueue a "no-op" write into the stream and wait for it to be + // read by the downstream consumer. This ensures that all enqueued + // results in the stream are consumed, which will give us an accurate + // value for `lastReceivedDocument`. stream.write(NOOP_MESSAGE, () => { - if (lastReceivedDocument) { + if (this._hasRetryTimedOut(methodName, startTime)) { + logger( + 'Query._stream', + tag, + 'Query failed with retryable stream error but the total retry timeout has exceeded.' + ); + stream.destroy(err); + streamActive.resolve(/* active= */ false); + } else if (lastReceivedDocument) { + logger( + 'Query._stream', + tag, + 'Query failed with retryable stream error and progress was made receiving ' + + 'documents, so the stream is being retried.' + ); + // Restart the query but use the last document we received as // the query cursor. Note that we do not use backoff here. The // call to `requestStream()` will backoff should the restart @@ -2673,8 +2701,21 @@ export class Query< } else { request = this.startAfter(lastReceivedDocument).toProto(); } + + // Set lastReceivedDocument to null before each retry attempt to ensure the retry makes progress + lastReceivedDocument = null; + + streamActive.resolve(/* active= */ true); + } else { + logger( + 'Query._stream', + tag, + 'Query failed with retryable stream error however no progress was made receiving ' + + 'documents, so the stream is being closed.' + ); + stream.destroy(err); + streamActive.resolve(/* active= */ false); } - streamActive.resolve(/* active= */ true); }); } else { logger( @@ -3320,48 +3361,33 @@ export class AggregateQuery< // catch below. const request = this.toProto(transactionId); - let streamActive: Deferred; - do { - streamActive = new Deferred(); - const backendStream = await firestore.requestStream( - 'runAggregationQuery', - /* bidirectional= */ false, - request, - tag - ); - stream.on('close', () => { - backendStream.resume(); - backendStream.end(); - }); - backendStream.on('error', err => { - backendStream.unpipe(stream); - // If a non-transactional query failed, attempt to restart. - // Transactional queries are retried via the transaction runner. - if ( - !transactionId && - !isPermanentRpcError(err, 'runAggregationQuery') - ) { - logger( - 'AggregateQuery._stream', - tag, - 'AggregateQuery failed with retryable stream error:', - err - ); - streamActive.resolve(/* active= */ true); - } else { - logger( - 'AggregateQuery._stream', - tag, - 'AggregateQuery failed with stream error:', - err - ); - stream.destroy(err); - streamActive.resolve(/* active= */ false); - } - }); + const backendStream = await firestore.requestStream( + 'runAggregationQuery', + /* bidirectional= */ false, + request, + tag + ); + stream.on('close', () => { backendStream.resume(); - backendStream.pipe(stream); - } while (await streamActive.promise); + backendStream.end(); + }); + backendStream.on('error', err => { + // TODO(group-by) When group-by queries are supported for aggregates + // consider implementing retries if the stream is making progress + // receiving results for groups. See the use of lastReceivedDocument + // in the retry strategy for runQuery. + + backendStream.unpipe(stream); + logger( + 'AggregateQuery._stream', + tag, + 'AggregateQuery failed with stream error:', + err + ); + stream.destroy(err); + }); + backendStream.resume(); + backendStream.pipe(stream); }) .catch(e => stream.destroy(e)); diff --git a/dev/src/util.ts b/dev/src/util.ts index c68695f82..667959402 100644 --- a/dev/src/util.ts +++ b/dev/src/util.ts @@ -178,6 +178,21 @@ export function getRetryCodes(methodName: string): number[] { return getServiceConfig(methodName)?.retry?.retryCodes ?? []; } +/** + * Gets the total timeout in milliseconds from the retry settings in + * the service config for the given RPC. If the total timeout is not + * set, then `0` is returned. + * + * @private + * @internal + */ +export function getTotalTimeout(methodName: string): number { + return ( + getServiceConfig(methodName)?.retry?.backoffSettings?.totalTimeoutMillis ?? + 0 + ); +} + /** * Returns the backoff setting from the service configuration. * @private diff --git a/dev/test/aggregateQuery.ts b/dev/test/aggregateQuery.ts index ff81a254d..b3e9490fb 100644 --- a/dev/test/aggregateQuery.ts +++ b/dev/test/aggregateQuery.ts @@ -126,19 +126,31 @@ describe('aggregate query interface', () => { }); }); - it('handles stream exception at initialization', () => { + it('handles stream exception at initialization', async () => { + let attempts = 0; const query = firestore.collection('collectionId').count(); query._stream = () => { + ++attempts; throw new Error('Expected error'); }; - return expect(query.get()).to.eventually.rejectedWith('Expected error'); + await query + .get() + .then(() => { + throw new Error('Unexpected success in Promise'); + }) + .catch(err => { + expect(err.message).to.equal('Expected error'); + expect(attempts).to.equal(1); + }); }); it('handles stream exception during initialization', async () => { + let attempts = 0; const overrides: ApiOverride = { runAggregationQuery: () => { + ++attempts; return stream(new Error('Expected error')); }, }; @@ -152,6 +164,31 @@ describe('aggregate query interface', () => { }) .catch(err => { expect(err.message).to.equal('Expected error'); + expect(attempts).to.equal(5); + }); + }); + + it('handles message without result during initialization', async () => { + let attempts = 0; + const overrides: ApiOverride = { + runAggregationQuery: () => { + ++attempts; + return stream({readTime: {seconds: 5, nanos: 6}}); + }, + }; + firestore = await createInstance(overrides); + + const query = firestore.collection('collectionId').count(); + await query + .get() + .then(() => { + throw new Error('Unexpected success in Promise'); + }) + .catch(err => { + expect(err.message).to.equal( + 'RunAggregationQueryResponse is missing result' + ); + expect(attempts).to.equal(1); }); }); }); diff --git a/dev/test/query.ts b/dev/test/query.ts index da2c8c079..97dce3453 100644 --- a/dev/test/query.ts +++ b/dev/test/query.ts @@ -50,7 +50,7 @@ import { writeResult, } from './util/helpers'; -import {GoogleError} from 'google-gax'; +import {GoogleError, Status} from 'google-gax'; import api = google.firestore.v1; import protobuf = google.protobuf; import {Filter} from '../src/filter'; @@ -449,6 +449,14 @@ export function result( } } +export function heartbeat(count: number): api.IRunQueryResponse { + return { + document: null, + readTime: {seconds: 5, nanos: 6}, + skippedResults: count, + }; +} + describe('query interface', () => { let firestore: Firestore; @@ -714,9 +722,11 @@ describe('query interface', () => { }); it('handles stream exception at initialization', () => { + let attempts = 0; const query = firestore.collection('collectionId'); query._stream = () => { + ++attempts; throw new Error('Expected error'); }; @@ -727,12 +737,16 @@ describe('query interface', () => { }) .catch(err => { expect(err.message).to.equal('Expected error'); + expect(attempts).to.equal(1); }); }); it('handles stream exception during initialization', () => { + let attempts = 0; + const overrides: ApiOverride = { runQuery: () => { + ++attempts; return stream(new Error('Expected error')); }, }; @@ -747,6 +761,7 @@ describe('query interface', () => { }) .catch(err => { expect(err.message).to.equal('Expected error'); + expect(attempts).to.equal(5); }); }); }); @@ -773,6 +788,228 @@ describe('query interface', () => { }); }); + it('handles stream exception after initialization and heartbeat', () => { + const deadlineExceededErr = new GoogleError(); + deadlineExceededErr.code = Status.DEADLINE_EXCEEDED; + deadlineExceededErr.message = 'DEADLINE_EXCEEDED error message'; + + let attempts = 0; + + const overrides: ApiOverride = { + runQuery: () => { + ++attempts; + + // A heartbeat message will initialize the stream, but it is not + // a document, so it does not represent progress made on the stream. + return stream(heartbeat(1000), deadlineExceededErr); + }, + }; + + return createInstance(overrides).then(firestoreInstance => { + firestore = firestoreInstance; + return firestore + .collection('collectionId') + .get() + .then(() => { + throw new Error('Unexpected success in Promise'); + }) + .catch(err => { + expect(err.message).to.equal('DEADLINE_EXCEEDED error message'); + + // The heartbeat initialized the stream before there was a stream + // exception, so we only expect a single attempt at streaming. + expect(attempts).to.equal(1); + }); + }); + }); + + function handlesRetryableExceptionUntilProgressStops( + withHeartbeat: boolean + ): Promise { + let attempts = 0; + + // count of stream initializations that are successful + // and make progress (a document is received before error) + const initializationsWithProgress = 10; + + // Receiving these error codes on a stream after the stream has received document data + // should result in the stream retrying indefinitely. + const retryableErrorCodes = [ + Status.DEADLINE_EXCEEDED, + Status.UNAVAILABLE, + Status.INTERNAL, + Status.UNAVAILABLE, + ]; + + const overrides: ApiOverride = { + runQuery: x => { + ++attempts; + + // Validate that runQuery is called with cursor of the lastReceivedDocument + // for all attempts except but the first. + if (attempts > 1) { + const docPath = + x?.structuredQuery?.startAt?.values?.[0].referenceValue || ''; + const docId = docPath.substring(docPath.lastIndexOf('/')); + expect(docId).to.equal( + `/id-${Math.min(initializationsWithProgress, attempts - 1)}` + ); + expect(x?.structuredQuery?.orderBy?.length).to.equal(1); + expect(x?.structuredQuery?.orderBy?.[0].field?.fieldPath).to.equal( + '__name__' + ); + } + + const streamElements = []; + + // A heartbeat is a message that may be received on a stream while + // a query is running. If the test is configured `withHeartbeat = true` + // then the fake stream will include a heartbeat before the first + // document. This heartbeat message has the side effect of initializing + // the stream, but it does not represent progress of streaming the results. + // Testing with and without heartbeats allows us to evaluate different + // retry logic in the SDK. + if (withHeartbeat) { + streamElements.push(heartbeat(1000)); + } + + // For the first X initializations, the stream will make progress + // receiving documents in the result set. + // For the X+1 attempt, the stream will not make progress before + // the error. If a stream gets an error without progress, the + // retry policy will close the stream. + if (attempts <= initializationsWithProgress) { + streamElements.push(result(`id-${attempts}`)); + } + + // Create an error with one of the retryable error codes + const googleError = new GoogleError(); + googleError.code = + retryableErrorCodes[attempts % retryableErrorCodes.length]; + googleError.message = 'test error message'; + streamElements.push(googleError); + + return stream(...streamElements); + }, + }; + + return createInstance(overrides).then(firestoreInstance => { + firestore = firestoreInstance; + const query = firestore.collection('collectionId'); + query._hasRetryTimedOut = () => false; + return query + .get() + .then(() => { + throw new Error('Unexpected success in Promise'); + }) + .catch(err => { + expect(err.message).to.equal('test error message'); + + // Assert that runQuery was retried the expected number + // of times based on the test configuration. + // + // If the test is running with heartbeat messages, then + // the test will always validate retry logic for an + // initialized stream. + // + // If the test is running without heartbeat messages, + // then the test will validate retry logic for both + // initialized and uninitialized streams. Specifically, + // the last retry will fail with an uninitialized stream. + const initilizationRetries = withHeartbeat ? 1 : 5; + expect(attempts).to.equal( + initializationsWithProgress + initilizationRetries + ); + }); + }); + } + + it('handles retryable exception until progress stops with heartbeat', async () => { + await handlesRetryableExceptionUntilProgressStops(true); + }); + + it('handles retryable exception until progress stops without heartbeat', async () => { + await handlesRetryableExceptionUntilProgressStops(false); + }); + + it('handles retryable exception with progress until timeout', async () => { + let attempts = 0; + + const overrides: ApiOverride = { + runQuery: () => { + ++attempts; + + const streamElements = []; + streamElements.push(result(`id-${attempts}`)); + + // Create an error with a retryable error code + const googleError = new GoogleError(); + googleError.code = Status.DEADLINE_EXCEEDED; + googleError.message = 'test error message'; + streamElements.push(googleError); + + return stream(...streamElements); + }, + }; + + return createInstance(overrides).then(firestoreInstance => { + firestore = firestoreInstance; + const query = firestore.collection('collectionId'); + // Fake our timeout check to fail after 10 retry attempts + query._hasRetryTimedOut = (methodName, startTime) => { + expect(methodName).to.equal('runQuery'); + expect(startTime).to.be.lessThanOrEqual(Date.now()); + return attempts >= 10; + }; + + return query + .get() + .then(() => { + throw new Error('Unexpected success in Promise'); + }) + .catch(err => { + expect(err.message).to.equal('test error message'); + + expect(attempts).to.equal(10); + }); + }); + }); + + it('handles non-retryable after recieving data (with get())', () => { + let attempts = 0; + + const overrides: ApiOverride = { + runQuery: () => { + ++attempts; + + const streamElements = []; + streamElements.push(result(`id-${attempts}`)); + + // Create an error with one of the retryable error codes + const googleError = new GoogleError(); + googleError.code = Status.UNKNOWN; + googleError.message = 'test error message'; + streamElements.push(googleError); + + return stream(...streamElements); + }, + }; + + return createInstance(overrides).then(firestoreInstance => { + firestore = firestoreInstance; + return firestore + .collection('collectionId') + .get() + .then(() => { + throw new Error('Unexpected success in Promise'); + }) + .catch(err => { + expect(err.message).to.equal('test error message'); + expect(attempts).to.equal(1); + }); + }); + }); + it('handles stream exception after initialization (with stream())', done => { const responses = [ () => stream(result('first'), new Error('Expected error')),