fix: lower batch size on BulkWriter retry #1549

Merged · 2 commits · Jun 30, 2021
39 changes: 35 additions & 4 deletions dev/src/bulk-writer.ts
@@ -56,6 +56,11 @@ import api = google.firestore.v1;
*/
const MAX_BATCH_SIZE = 20;

/*!
 * The maximum number of writes that can be in a single batch that is being retried.
*/
export const RETRY_MAX_BATCH_SIZE = 10;
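
As a rough sanity check (assuming Firestore's well-known ~1 MiB per-document cap, which this diff does not itself state), ten maximum-size writes sit right at the 10 MiB request limit:

```ts
// Back-of-the-envelope check, not library code: RETRY_MAX_BATCH_SIZE
// maximum-size documents fit within the ~10 MiB request limit
// (ignoring per-request encoding overhead).
const RETRY_MAX_BATCH_SIZE = 10; // as defined above
const MAX_DOC_SIZE_BYTES = 1 * 1024 * 1024; // ~1 MiB per-document limit
const REQUEST_LIMIT_BYTES = 10 * 1024 * 1024; // ~10 MiB per-request limit
console.log(
  RETRY_MAX_BATCH_SIZE * MAX_DOC_SIZE_BYTES <= REQUEST_LIMIT_BYTES
); // true
```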

/*!
* The starting maximum number of operations per second as allowed by the
* 500/50/5 rule.
@@ -213,6 +218,13 @@ class BulkCommitBatch extends WriteBatch {
// been resolved.
readonly pendingOps: Array<BulkWriterOperation> = [];

readonly maxBatchSize: number;

constructor(firestore: Firestore, maxBatchSize: number) {
super(firestore);
this.maxBatchSize = maxBatchSize;
}

has(documentRef: firestore.DocumentReference<unknown>): boolean {
return this.docPaths.has(documentRef.path);
}
@@ -333,14 +345,17 @@ export class BulkWriter {
* Visible for testing.
* @private
*/
_maxBatchSize = MAX_BATCH_SIZE;
private _maxBatchSize = MAX_BATCH_SIZE;

/**
* The batch that is currently used to schedule operations. Once this batch
* reaches maximum capacity, a new batch is created.
* @private
*/
private _bulkCommitBatch = new BulkCommitBatch(this.firestore);
private _bulkCommitBatch = new BulkCommitBatch(
this.firestore,
this._maxBatchSize
);

/**
* A pointer to the tail of all active BulkWriter operations. This pointer
@@ -384,6 +399,16 @@ export class BulkWriter {
return this._bufferedOperations.length;
}

// Visible for testing.
_setMaxBatchSize(size: number): void {
assert(
this._bulkCommitBatch.pendingOps.length === 0,
'BulkCommitBatch should be empty'
);
this._maxBatchSize = size;
this._bulkCommitBatch = new BulkCommitBatch(this.firestore, size);
}
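
Note the design choice here: each BulkCommitBatch snapshots its capacity at construction, so assigning _maxBatchSize alone would leave the already-active batch at its old limit. The helper therefore swaps in a fresh batch as well, and the assertion guards against discarding writes that are already enqueued.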

/**
* The maximum number of pending operations that can be enqueued onto this
BulkWriter instance. Once this number of writes has been enqueued,
Expand Down Expand Up @@ -840,7 +865,6 @@ export class BulkWriter {
if (this._bulkCommitBatch._opCount === 0) return;

const pendingBatch = this._bulkCommitBatch;
this._bulkCommitBatch = new BulkCommitBatch(this.firestore);

// Use the write with the longest backoff duration when determining backoff.
const highestBackoffDuration = pendingBatch.pendingOps.reduce((prev, cur) =>
@@ -849,6 +873,13 @@
const backoffMsWithJitter = BulkWriter._applyJitter(highestBackoffDuration);
const delayedExecution = new Deferred<void>();

// A backoff duration greater than 0 implies that this batch is a retry.
// Retried writes are sent with a batch size of 10 in order to guarantee
// that the batch is under the 10 MiB limit.
const maxBatchSize =
highestBackoffDuration > 0 ? RETRY_MAX_BATCH_SIZE : this._maxBatchSize;
this._bulkCommitBatch = new BulkCommitBatch(this.firestore, maxBatchSize);

if (backoffMsWithJitter > 0) {
delayExecution(() => delayedExecution.resolve(), backoffMsWithJitter);
} else {
@@ -988,7 +1019,7 @@ export class BulkWriter {
enqueueOnBatchCallback(this._bulkCommitBatch);
this._bulkCommitBatch.processLastOperation(op);

if (this._bulkCommitBatch._opCount === this._maxBatchSize) {
if (this._bulkCommitBatch._opCount === this._bulkCommitBatch.maxBatchSize) {
this._scheduleCurrentBatch();
} else if (op.flushed) {
// If flush() was called before this operation was enqueued into a batch,
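Taken together, these changes implement one pattern: a positive backoff marks the batch being scheduled as a retry, and retries get the smaller capacity. A minimal standalone sketch of that selection logic, using hypothetical names (PendingWrite, nextBatchCapacity) that are not part of the library:

```ts
// Sketch of the backoff-based size selection in _scheduleCurrentBatch above.
interface PendingWrite {
  backoffMs: number; // 0 on a first attempt, > 0 once the write has failed
}

const MAX_BATCH_SIZE = 20;
const RETRY_MAX_BATCH_SIZE = 10;

function nextBatchCapacity(pending: PendingWrite[]): number {
  // Use the longest backoff among the pending writes, as the diff does.
  const highestBackoffMs = pending.reduce(
    (max, op) => Math.max(max, op.backoffMs),
    0
  );
  // A backoff greater than 0 implies at least one write is being retried,
  // so the next batch is capped at the smaller, payload-safe size.
  return highestBackoffMs > 0 ? RETRY_MAX_BATCH_SIZE : MAX_BATCH_SIZE;
}

// Fresh writes fill batches of 20; failed writes are retried in batches of 10.
console.log(nextBatchCapacity([{backoffMs: 0}])); // 20
console.log(nextBatchCapacity([{backoffMs: 250}])); // 10
```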
47 changes: 44 additions & 3 deletions dev/test/bulk-writer.ts
@@ -38,6 +38,7 @@ import {
DEFAULT_INITIAL_OPS_PER_SECOND_LIMIT,
DEFAULT_JITTER_FACTOR,
DEFAULT_MAXIMUM_OPS_PER_SECOND_LIMIT,
RETRY_MAX_BATCH_SIZE,
} from '../src/bulk-writer';
import {
ApiOverride,
@@ -576,7 +577,7 @@ describe('BulkWriter', () => {
},
]);
bulkWriter._setMaxPendingOpCount(6);
bulkWriter._maxBatchSize = 3;
bulkWriter._setMaxBatchSize(3);
bulkWriter
.set(firestore.doc('collectionId/doc1'), {foo: 'bar'})
.then(incrementOpCount);
@@ -822,6 +823,46 @@ describe('BulkWriter', () => {
expect(timeoutHandlerCounter).to.equal(3);
});

it('retries with smaller batch size', async () => {
const nLengthArray = (n: number): number[] => Array.from(Array(n).keys());

const bulkWriter = await instantiateInstance([
{
request: createRequest(
nLengthArray(15).map((_, i) => setOp('doc' + i, 'bar'))
),
response: mergeResponses(
nLengthArray(15).map(() => failedResponse(Status.ABORTED))
),
},
{
request: createRequest(
nLengthArray(RETRY_MAX_BATCH_SIZE).map((_, i) =>
setOp('doc' + i, 'bar')
)
),
response: mergeResponses(
nLengthArray(RETRY_MAX_BATCH_SIZE).map(() => successResponse(1))
),
},
{
request: createRequest(
nLengthArray(15 - RETRY_MAX_BATCH_SIZE).map((_, i) =>
setOp('doc' + i + RETRY_MAX_BATCH_SIZE, 'bar')
)
),
response: mergeResponses(
nLengthArray(15 - RETRY_MAX_BATCH_SIZE).map(() => successResponse(1))
),
},
]);
for (let i = 0; i < 15; i++) {
bulkWriter.set(firestore.doc('collectionId/doc' + i), {foo: 'bar'});
}

await bulkWriter.close();
});
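
The mocked requests above encode the expected behavior: all 15 writes fail with ABORTED in a single batch, and the retry is re-batched as RETRY_MAX_BATCH_SIZE writes (doc0 through doc9) followed by the remaining five (doc10 through doc14), confirming that the reduced cap applies only to retried batches.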

it('retries maintain correct write resolution ordering', async () => {
const bulkWriter = await instantiateInstance([
{
@@ -910,7 +951,7 @@
},
]);

bulkWriter._maxBatchSize = 2;
bulkWriter._setMaxBatchSize(2);
for (let i = 0; i < 6; i++) {
bulkWriter
.set(firestore.doc('collectionId/doc' + i), {foo: 'bar'})
@@ -942,7 +983,7 @@
},
]);

bulkWriter._maxBatchSize = 3;
bulkWriter._setMaxBatchSize(3);
const promise1 = bulkWriter
.set(firestore.doc('collectionId/doc1'), {foo: 'bar'})
.then(incrementOpCount);
2 changes: 1 addition & 1 deletion dev/test/recursive-delete.ts
@@ -346,7 +346,7 @@ describe('recursiveDelete() method:', () => {
const firestore = await createInstance(overrides);

const bulkWriter = firestore.bulkWriter();
bulkWriter._maxBatchSize = maxBatchSize;
bulkWriter._setMaxBatchSize(maxBatchSize);
await firestore._recursiveDelete(
firestore.collection('root'),
maxPendingOps,