Skip to content

Commit

Permalink
feat: retry BatchGetDocuments RPCs that fail with errors
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Jun 24, 2021
1 parent 2406f6a commit a04a119
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 148 deletions.
193 changes: 193 additions & 0 deletions dev/src/document-reader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*!
* Copyright 2021 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {DocumentSnapshot, DocumentSnapshotBuilder} from './document';
import {DocumentReference} from './reference';
import {FieldPath} from './path';
import {isPermanentRpcError} from './util';
import {google} from '../protos/firestore_v1_proto_api';
import {logger} from './logger';
import {Firestore} from './index';

import api = google.firestore.v1;

/**
* A wrapper around BatchGetDocumentsRequest that retries request upon stream
* failure and returns ordered results.
*
* @private
*/
export class DocumentReader<T> {
/** An optional field mask to apply to this read. */
fieldMask?: FieldPath[];
/** An optional transaction ID to use for this read. */
transactionId?: Uint8Array;

private remainingDocuments = new Set<string>();
private retrievedDocuments = new Map<string, DocumentSnapshot>();

/**
* Internal method to retrieve multiple documents from Firestore, optionally
* as part of a transaction.
*
* @param firestore The Firestore instance to use.
* @param allDocuments The documents to receive.
* @returns A Promise that contains an array with the resulting documents.
*/
constructor(
private firestore: Firestore,
private allDocuments: Array<DocumentReference<T>>
) {
for (const docRef of this.allDocuments) {
this.remainingDocuments.add(docRef.formattedName);
}
}

/**
* Invokes the BatchGetDocuments RPC and returns the results.
*
* @param requestTag A unique client-assigned identifier for this request.
*/
async get(requestTag: string): Promise<Array<DocumentSnapshot<T>>> {
await this.fetchAllDocuments(requestTag);

// BatchGetDocuments doesn't preserve document order. We use the request
// order to sort the resulting documents.
const orderedDocuments: Array<DocumentSnapshot<T>> = [];

for (const docRef of this.allDocuments) {
const document = this.retrievedDocuments.get(docRef.formattedName);
if (document !== undefined) {
// Recreate the DocumentSnapshot with the DocumentReference
// containing the original converter.
const finalDoc = new DocumentSnapshotBuilder(
docRef as DocumentReference<T>
);
finalDoc.fieldsProto = document._fieldsProto;
finalDoc.readTime = document.readTime;
finalDoc.createTime = document.createTime;
finalDoc.updateTime = document.updateTime;
orderedDocuments.push(finalDoc.build());
} else {
throw new Error(`Did not receive document for "${docRef.path}".`);
}
}

return orderedDocuments;
}

private async fetchAllDocuments(requestTag: string): Promise<void> {
while (this.remainingDocuments.size > 0) {
try {
return await this.fetchMoreDocuments(requestTag);
} catch (err) {
// If a non-transactional read failed, attempt to restart.
// Transactional reads are retried via the transaction runner.
if (
this.transactionId ||
isPermanentRpcError(err, 'batchGetDocuments')
) {
logger(
'DocumentReader.fetchAllDocuments',
requestTag,
'BatchGetDocuments failed with non-retryable stream error:',
err
);
throw err;
} else {
logger(
'DocumentReader.fetchAllDocuments',
requestTag,
'BatchGetDocuments failed with retryable stream error:',
err
);
}
}
}
}

private fetchMoreDocuments(requestTag: string): Promise<void> {
const request: api.IBatchGetDocumentsRequest = {
database: this.firestore.formattedName,
transaction: this.transactionId,
documents: Array.from(this.remainingDocuments),
};

if (this.fieldMask) {
const fieldPaths = this.fieldMask.map(
fieldPath => (fieldPath as FieldPath).formattedName
);
request.mask = {fieldPaths};
}

let resultCount = 0;

return this.firestore
.requestStream('batchGetDocuments', request, requestTag)
.then(stream => {
return new Promise<void>((resolve, reject) => {
stream
.on('error', err => reject(err))
.on('data', (response: api.IBatchGetDocumentsResponse) => {
try {
let document;

if (response.found) {
logger(
'DocumentReader.fetchMoreDocuments',
requestTag,
'Received document: %s',
response.found.name!
);
document = this.firestore.snapshot_(
response.found,
response.readTime!
);
} else {
logger(
'DocumentReader.fetchMoreDocuments',
requestTag,
'Document missing: %s',
response.missing!
);
document = this.firestore.snapshot_(
response.missing!,
response.readTime!
);
}

const path = document.ref.formattedName;
this.remainingDocuments.delete(path);
this.retrievedDocuments.set(path, document);
++resultCount;
} catch (err) {
reject(err);
}
})
.on('end', () => {
logger(
'DocumentReader.fetchMoreDocuments',
requestTag,
'Received %d results',
resultCount
);
resolve();
});
stream.resume();
});
});
}
}
137 changes: 7 additions & 130 deletions dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {ExponentialBackoff, ExponentialBackoffSetting} from './backoff';
import {BulkWriter} from './bulk-writer';
import {BundleBuilder} from './bundle';
import {fieldsFromJson, timestampFromJson} from './convert';
import {DocumentReader} from './document-reader';
import {
DocumentSnapshot,
DocumentSnapshotBuilder,
Expand All @@ -34,14 +35,12 @@ import {
import {logger, setLibVersion} from './logger';
import {
DEFAULT_DATABASE_ID,
FieldPath,
QualifiedResourcePath,
ResourcePath,
validateResourcePath,
} from './path';
import {ClientPool} from './pool';
import {CollectionReference} from './reference';
import {DocumentReference} from './reference';
import {CollectionReference, DocumentReference} from './reference';
import {Serializer} from './serializer';
import {Timestamp} from './timestamp';
import {parseGetAllArguments, Transaction} from './transaction';
Expand Down Expand Up @@ -1077,138 +1076,16 @@ export class Firestore implements firestore.Firestore {
const stack = Error().stack!;

return this.initializeIfNeeded(tag)
.then(() => this.getAll_(documents, fieldMask, tag))
.then(() => {
const reader = new DocumentReader(this, documents);
reader.fieldMask = fieldMask || undefined;
return reader.get(tag);
})
.catch(err => {
throw wrapError(err, stack);
});
}

/**
* Internal method to retrieve multiple documents from Firestore, optionally
* as part of a transaction.
*
* @private
* @param docRefs The documents to receive.
* @param fieldMask An optional field mask to apply to this read.
* @param requestTag A unique client-assigned identifier for this request.
* @param transactionId The transaction ID to use for this read.
* @returns A Promise that contains an array with the resulting documents.
*/
getAll_<T>(
docRefs: Array<firestore.DocumentReference<T>>,
fieldMask: firestore.FieldPath[] | null,
requestTag: string,
transactionId?: Uint8Array
): Promise<Array<DocumentSnapshot<T>>> {
const requestedDocuments = new Set<string>();
const retrievedDocuments = new Map<string, DocumentSnapshot>();

for (const docRef of docRefs) {
requestedDocuments.add((docRef as DocumentReference<T>).formattedName);
}

const request: api.IBatchGetDocumentsRequest = {
database: this.formattedName,
transaction: transactionId,
documents: Array.from(requestedDocuments),
};

if (fieldMask) {
const fieldPaths = fieldMask.map(
fieldPath => (fieldPath as FieldPath).formattedName
);
request.mask = {fieldPaths};
}

return this.requestStream('batchGetDocuments', request, requestTag).then(
stream => {
return new Promise<Array<DocumentSnapshot<T>>>((resolve, reject) => {
stream
.on('error', err => {
logger(
'Firestore.getAll_',
requestTag,
'GetAll failed with error:',
err
);
reject(err);
})
.on('data', (response: api.IBatchGetDocumentsResponse) => {
try {
let document;

if (response.found) {
logger(
'Firestore.getAll_',
requestTag,
'Received document: %s',
response.found.name!
);
document = this.snapshot_(response.found, response.readTime!);
} else {
logger(
'Firestore.getAll_',
requestTag,
'Document missing: %s',
response.missing!
);
document = this.snapshot_(
response.missing!,
response.readTime!
);
}

const path = document.ref.path;
retrievedDocuments.set(path, document);
} catch (err) {
logger(
'Firestore.getAll_',
requestTag,
'GetAll failed with exception:',
err
);
reject(err);
}
})
.on('end', () => {
logger(
'Firestore.getAll_',
requestTag,
'Received %d results',
retrievedDocuments.size
);

// BatchGetDocuments doesn't preserve document order. We use
// the request order to sort the resulting documents.
const orderedDocuments: Array<DocumentSnapshot<T>> = [];

for (const docRef of docRefs) {
const document = retrievedDocuments.get(docRef.path);
if (document !== undefined) {
// Recreate the DocumentSnapshot with the DocumentReference
// containing the original converter.
const finalDoc = new DocumentSnapshotBuilder(
docRef as DocumentReference<T>
);
finalDoc.fieldsProto = document._fieldsProto;
finalDoc.readTime = document.readTime;
finalDoc.createTime = document.createTime;
finalDoc.updateTime = document.updateTime;
orderedDocuments.push(finalDoc.build());
} else {
reject(
new Error(`Did not receive document for "${docRef.path}".`)
);
}
}
resolve(orderedDocuments);
});
stream.resume();
});
}
);
}

/**
* Registers a listener on this client, incrementing the listener count. This
* is used to verify that all listeners are unsubscribed when terminate() is
Expand Down
Loading

0 comments on commit a04a119

Please # to comment.