Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update retry policy to not retry streams that have not made progress receiving documents #1946

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1637,7 +1637,7 @@ export class Firestore implements firestore.Firestore {
// Marks the stream as initialized and resolves the pending promise with
// `resultStream`. Only the first call has any effect: once
// `streamInitialized` is set, later calls fall through without logging or
// resolving again.
// NOTE(review): the two consecutive `logger` calls below look like the
// removed/added pair of a diff hunk ('Releasing stream' -> 'Stream ready');
// confirm that only one of them is intended to remain.
function streamReady(): void {
if (!streamInitialized) {
streamInitialized = true;
logger('Firestore._initializeStream', requestTag, 'Releasing stream');
logger('Firestore._initializeStream', requestTag, 'Stream ready');
resolve(resultStream);
}
}
Expand Down
56 changes: 41 additions & 15 deletions dev/src/reference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2637,6 +2637,9 @@ export class Query<

let streamActive: Deferred<boolean>;
do {
// Set lastReceivedDocument to null before each retry attempt to ensure the retry makes progress
lastReceivedDocument = null;

streamActive = new Deferred<boolean>();
backendStream = await this._firestore.requestStream(
'runQuery',
Expand All @@ -2650,18 +2653,30 @@ export class Query<
// If a non-transactional query failed, attempt to restart.
// Transactional queries are retried via the transaction runner.
if (!transactionId && !this._isPermanentRpcError(err, 'runQuery')) {
logger(
'Query._stream',
tag,
'Query failed with retryable stream error:',
err
);
// Enqueue a "no-op" write into the stream and resume the query
// once it is processed. This allows any enqueued results to be
// consumed before resuming the query so that the query resumption
// can start at the correct document.
stream.write(NOOP_MESSAGE, () => {
if (lastReceivedDocument) {
// Only streams that have received documents should be
// restarted to avoid rerunning long-running or high-cost
// queries that are not progressing to completion.
if (lastReceivedDocument) {
logger(
'Query._stream',
tag,
'Query failed with retryable stream error:',
err
);
// Enqueue a "no-op" write into the stream and resume the query
// once it is processed. This allows any enqueued results to be
// consumed before resuming the query so that the query resumption
// can start at the correct document.
stream.write(NOOP_MESSAGE, () => {
// `backendStream` was unpiped at the beginning of the handler
// for `backendStream.on('error', ...)`, so we don't expect
// `lastReceivedDocument` to be updated during the noop write,
// but a check here is included for this unexpected state.
assert(
lastReceivedDocument !== null,
'lastReceivedDocument is unset'
);

// Restart the query but use the last document we received as
// the query cursor. Note that we do not use backoff here. The
// call to `requestStream()` will backoff should the restart
Expand All @@ -2673,9 +2688,20 @@ export class Query<
} else {
request = this.startAfter(lastReceivedDocument).toProto();
}
}
streamActive.resolve(/* active= */ true);
});

streamActive.resolve(/* active= */ true);
});
} else {
logger(
'Query._stream',
tag,
'Query failed with retryable stream error however no progress was made receiving ' +
'documents, so the stream is being closed. Stream error:',
err
);
stream.destroy(err);
streamActive.resolve(/* active= */ false);
}
} else {
logger(
'Query._stream',
Expand Down
174 changes: 173 additions & 1 deletion dev/test/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import {
writeResult,
} from './util/helpers';

import {GoogleError} from 'google-gax';
import {GoogleError, Status} from 'google-gax';
import api = google.firestore.v1;
import protobuf = google.protobuf;
import {Filter} from '../src/filter';
Expand Down Expand Up @@ -449,6 +449,14 @@ export function result(
}
}

/**
 * Builds a RunQuery response that carries no document, i.e. a heartbeat
 * message. The read time is fixed; only the skipped-result count varies.
 *
 * @param count Value reported in the response's `skippedResults` field.
 * @returns A response whose `document` is `null`.
 */
export function heartbeat(count: number): api.IRunQueryResponse {
  const response: api.IRunQueryResponse = {
    skippedResults: count,
    readTime: {seconds: 5, nanos: 6},
    document: null,
  };
  return response;
}

describe('query interface', () => {
let firestore: Firestore;

Expand Down Expand Up @@ -714,9 +722,11 @@ describe('query interface', () => {
});

it('handles stream exception at initialization', () => {
let attempts = 0;
const query = firestore.collection('collectionId');

query._stream = () => {
++attempts;
throw new Error('Expected error');
};

Expand All @@ -727,12 +737,16 @@ describe('query interface', () => {
})
.catch(err => {
expect(err.message).to.equal('Expected error');
expect(attempts).to.equal(1);
});
});

it('handles stream exception during initialization', () => {
let attempts = 0;

const overrides: ApiOverride = {
runQuery: () => {
++attempts;
return stream(new Error('Expected error'));
},
};
Expand All @@ -747,6 +761,7 @@ describe('query interface', () => {
})
.catch(err => {
expect(err.message).to.equal('Expected error');
expect(attempts).to.equal(5);
});
});
});
Expand All @@ -773,6 +788,163 @@ describe('query interface', () => {
});
});

it('handles stream exception after initialization and heartbeat', () => {
  // Number of times the fake backend was asked to run the query.
  let runQueryCalls = 0;

  const expectedError = new GoogleError();
  expectedError.message = 'DEADLINE_EXCEEDED error message';
  expectedError.code = Status.DEADLINE_EXCEEDED;

  const overrides: ApiOverride = {
    runQuery: () => {
      runQueryCalls += 1;

      // The heartbeat initializes the stream but carries no document, so it
      // does not count as progress on the stream.
      return stream(heartbeat(1000), expectedError);
    },
  };

  return createInstance(overrides).then(instance => {
    firestore = instance;
    const pendingSnapshot = firestore.collection('collectionId').get();

    return pendingSnapshot
      .then(() => {
        throw new Error('Unexpected success in Promise');
      })
      .catch(err => {
        expect(err.message).to.equal('DEADLINE_EXCEEDED error message');

        // The stream was already initialized by the heartbeat when the error
        // arrived, so exactly one streaming attempt is expected.
        expect(runQueryCalls).to.equal(1);
      });
  });
});

/**
 * Runs a query against a fake `runQuery` backend whose stream fails with a
 * retryable error on every attempt, and verifies that the query is restarted
 * only for as long as each attempt delivers a document before failing.
 *
 * The first `initializationsWithProgress` attempts deliver one document and
 * then an error; the following attempt delivers no document, so the query is
 * expected to fail with the stream error.
 *
 * @param withHeartbeat When true, every fake stream starts with a heartbeat
 * message. A heartbeat initializes the stream without counting as progress,
 * so the final document-less attempt is not retried. When false, the final
 * attempt fails before the stream is initialized and is therefore retried as
 * an initialization failure.
 * @returns A promise that settles once all expectations have been evaluated.
 * Callers must return or await it so the test runner observes failures.
 */
function handlesRetryableExceptionUntilProgressStops(
  withHeartbeat: boolean
): Promise<void> {
  let attempts = 0;

  // Count of stream initializations that are successful and make progress
  // (a document is received before the error).
  const initializationsWithProgress = 10;

  // Receiving these error codes on a stream after the stream has received
  // document data should result in the stream being retried.
  const retryableErrorCodes = [
    Status.DEADLINE_EXCEEDED,
    Status.UNAVAILABLE,
    Status.INTERNAL,
    Status.UNAVAILABLE,
  ];

  const overrides: ApiOverride = {
    runQuery: x => {
      ++attempts;

      // Validate that runQuery is called with a cursor pointing at the last
      // received document for all attempts except the first.
      if (attempts > 1) {
        const docPath =
          x?.structuredQuery?.startAt?.values?.[0].referenceValue || '';
        const docId = docPath.substring(docPath.lastIndexOf('/'));
        expect(docId).to.equal(
          `/id-${Math.min(initializationsWithProgress, attempts - 1)}`
        );
        expect(x?.structuredQuery?.orderBy?.length).to.equal(1);
        expect(x?.structuredQuery?.orderBy?.[0].field?.fieldPath).to.equal(
          '__name__'
        );
      }

      const streamElements = [];

      // Only emit the heartbeat when the test variant asks for it; otherwise
      // both variants would exercise the same stream.
      if (withHeartbeat) {
        streamElements.push(heartbeat(1000));
      }

      // For the first X initializations, the stream will see progress.
      // For the X+1 attempt, the stream will not see progress before
      // the error, so we expect the stream to close.
      if (attempts <= initializationsWithProgress) {
        streamElements.push(result(`id-${attempts}`));
      }

      // Create an error with one of the retryable error codes.
      const googleError = new GoogleError();
      googleError.code =
        retryableErrorCodes[attempts % retryableErrorCodes.length];
      googleError.message = 'test error message';
      streamElements.push(googleError);

      return stream(...streamElements);
    },
  };

  return createInstance(overrides).then(firestoreInstance => {
    firestore = firestoreInstance;
    return firestore
      .collection('collectionId')
      .get()
      .then(() => {
        throw new Error('Unexpected success in Promise');
      })
      .catch(err => {
        expect(err.message).to.equal('test error message');

        // `initializationsWithProgress` stream initializations receive data
        // that progresses the stream, each followed by an error. The final
        // X+1 attempt does not get data before the error. With a heartbeat
        // the stream is already initialized, so that single attempt ends the
        // query; without one the error occurs during stream initialization,
        // which is itself retried.
        const finalInitializationAttempts = withHeartbeat ? 1 : 5;
        expect(attempts).to.equal(
          initializationsWithProgress + finalInitializationAttempts
        );
      });
  });
}

// The helper's promise is returned so that the test runner waits for the
// assertions instead of finishing the test before they run.
it('handles retryable exception until progress stops with heartbeat', () => {
  return handlesRetryableExceptionUntilProgressStops(true);
});

it('handles retryable exception until progress stops without heartbeat', () => {
  return handlesRetryableExceptionUntilProgressStops(false);
});

it('handles non-retryable after recieving data (with get())', () => {
  // Number of times the fake backend was asked to run the query.
  let runQueryCalls = 0;

  const overrides: ApiOverride = {
    runQuery: () => {
      runQueryCalls += 1;

      // Fail the stream with UNKNOWN after one document has been delivered.
      // The query is expected not to be restarted for this code even though
      // the stream made progress.
      const streamError = new GoogleError();
      streamError.message = 'test error message';
      streamError.code = Status.UNKNOWN;

      return stream(result(`id-${runQueryCalls}`), streamError);
    },
  };

  return createInstance(overrides).then(instance => {
    firestore = instance;
    const pendingSnapshot = firestore.collection('collectionId').get();

    return pendingSnapshot
      .then(() => {
        throw new Error('Unexpected success in Promise');
      })
      .catch(err => {
        expect(err.message).to.equal('test error message');
        // A single attempt: the error must surface without a restart.
        expect(runQueryCalls).to.equal(1);
      });
  });
});

it('handles stream exception after initialization (with stream())', done => {
const responses = [
() => stream(result('first'), new Error('Expected error')),
Expand Down