Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add retry on ABORTED errors #286

Merged
merged 14 commits into from
Aug 7, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,37 @@
package com.google.cloud.firestore;

import com.google.cloud.Timestamp;
import io.grpc.Status;
import javax.annotation.Nullable;

/**
* A BatchWriteResult wraps the write time and status returned by Firestore when making
* BatchWriteRequests.
*/
public final class BatchWriteResult {
private final DocumentReference documentReference;
@Nullable private final Timestamp writeTime;
private final Status status;
private final String message;
@Nullable private final Exception exception;

BatchWriteResult(@Nullable Timestamp timestamp, Status status, String message) {
BatchWriteResult(
DocumentReference documentReference,
@Nullable Timestamp timestamp,
@Nullable Exception exception) {
this.documentReference = documentReference;
this.writeTime = timestamp;
this.status = status;
this.message = message;
this.exception = exception;
}

public DocumentReference getDocumentReference() {
return documentReference;
}

@Nullable
public Timestamp getWriteTime() {
return writeTime;
}

public Status getStatus() {
return status;
}

public String getMessage() {
return message;
@Nullable
public Exception getException() {
return exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

package com.google.cloud.firestore;

import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.CurrentMillisClock;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.cloud.firestore.UpdateBuilder.BatchState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand All @@ -28,6 +32,9 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -41,6 +48,29 @@ class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> {
super(firestore, maxBatchSize);
}

/**
 * Creates a batch for retrying failed writes: copies from {@code retryBatch} only the write
 * operations whose documents appear in {@code docsToRetry}, and inherits the original batch's
 * state and pending operations so the caller-visible futures are resolved by the retry.
 */
BulkCommitBatch(
    FirestoreImpl firestore,
    BulkCommitBatch retryBatch,
    final Set<DocumentReference> docsToRetry) {
  super(firestore);

  // Fail fast before copying any writes: only a batch that has already been sent may be
  // retried.
  Preconditions.checkState(
      retryBatch.state == BatchState.SENT,
      "Batch should be SENT when creating a new BulkCommitBatch for retry");

  // Carry over only the writes that still need to be retried.
  for (WriteOperation writeOperation : retryBatch.writes) {
    if (docsToRetry.contains(writeOperation.documentReference)) {
      this.writes.add(writeOperation);
    }
  }

  this.state = retryBatch.state;
  this.pendingOperations = retryBatch.pendingOperations;
}

/** Returns the given future unchanged: a bulk commit already produces a WriteResult future. */
ApiFuture<WriteResult> wrapResult(ApiFuture<WriteResult> result) {
return result;
}
Expand All @@ -55,6 +85,8 @@ public class BulkWriter {
/** The maximum number of writes that can be in a single batch. */
public static final int MAX_BATCH_SIZE = 500;

public static final int MAX_RETRY_ATTEMPTS = 10;

/**
* The starting maximum number of operations per second as allowed by the 500/50/5 rule.
*
Expand Down Expand Up @@ -85,8 +117,12 @@ public class BulkWriter {
/** The maximum number of writes that can be in a single batch. */
private int maxBatchSize = MAX_BATCH_SIZE;

/** A queue of batches to be written. */
private final List<BulkCommitBatch> batchQueue = new ArrayList<>();
/**
* A queue of batches to be written. Use a synchronized list to avoid multi-thread concurrent
* modification errors (as this list is modified from both the user thread and the network
* thread).
*/
private final List<BulkCommitBatch> batchQueue = new CopyOnWriteArrayList<>();

/** Whether this BulkWriter instance is closed. Once closed, it cannot be opened again. */
private boolean closed = false;
Expand All @@ -96,8 +132,19 @@ public class BulkWriter {

private final FirestoreImpl firestore;

private final ScheduledExecutorService firestoreExecutor;

private final ExponentialRetryAlgorithm backoff;
private TimedAttemptSettings nextAttempt;

BulkWriter(FirestoreImpl firestore, boolean enableThrottling) {
this.firestore = firestore;
this.backoff =
new ExponentialRetryAlgorithm(
firestore.getOptions().getRetrySettings(), CurrentMillisClock.getDefaultClock());
this.nextAttempt = backoff.createFirstAttempt();
this.firestoreExecutor = firestore.getClient().getExecutor();

if (enableThrottling) {
rateLimiter =
new RateLimiter(
Expand Down Expand Up @@ -444,12 +491,13 @@ public ApiFuture<WriteResult> update(
public ApiFuture<Void> flush() {
verifyNotClosed();
final SettableApiFuture<Void> flushComplete = SettableApiFuture.create();
List<ApiFuture<Void>> writeFutures = new ArrayList<>();
List<SettableApiFuture<WriteResult>> writeFutures = new ArrayList<>();
for (BulkCommitBatch batch : batchQueue) {
writeFutures.add(batch.awaitBulkCommit());
batch.markReadyToSend();
writeFutures.addAll(batch.getPendingFutures());
}
sendReadyBatches();
ApiFutures.allAsList(writeFutures)
ApiFutures.successfulAsList(writeFutures)
.addListener(
new Runnable() {
public void run() {
Expand Down Expand Up @@ -493,7 +541,7 @@ private BulkCommitBatch getEligibleBatch(DocumentReference documentReference) {
if (batchQueue.size() > 0) {
BulkCommitBatch lastBatch = batchQueue.get(batchQueue.size() - 1);
if (lastBatch.getState() == UpdateBuilder.BatchState.OPEN
&& !lastBatch.getDocuments().contains(documentReference)) {
&& !lastBatch.hasDocument(documentReference)) {
return lastBatch;
}
}
Expand Down Expand Up @@ -539,23 +587,20 @@ public boolean apply(BulkCommitBatch batch) {

// Send the batch if it is under the rate limit, or schedule another attempt after the
// appropriate timeout.
long delayMs = rateLimiter.getNextRequestDelayMs(batch.getOperationCount());
long delayMs = rateLimiter.getNextRequestDelayMs(batch.getPendingOperationCount());
Preconditions.checkState(delayMs != -1, "Batch size should be under capacity");
if (delayMs == 0) {
sendBatch(batch);
} else {
firestore
.getClient()
.getExecutor()
.schedule(
new Runnable() {
@Override
public void run() {
sendBatch(batch);
}
},
delayMs,
TimeUnit.MILLISECONDS);
firestoreExecutor.schedule(
new Runnable() {
@Override
public void run() {
sendBatch(batch);
}
},
delayMs,
TimeUnit.MILLISECONDS);
break;
}

Expand All @@ -568,27 +613,110 @@ public void run() {
* next group of ready batches.
*/
private void sendBatch(final BulkCommitBatch batch) {
boolean success = rateLimiter.tryMakeRequest(batch.getOperationCount());
Preconditions.checkState(
batch.state == BatchState.READY_TO_SEND,
"The batch should be marked as READY_TO_SEND before committing");
batch.state = BatchState.SENT;
boolean success = rateLimiter.tryMakeRequest(batch.getPendingOperationCount());
Preconditions.checkState(success, "Batch should be under rate limit to be sent.");
try {
final ApiFuture<List<BatchWriteResult>> commitFuture = batch.bulkCommit();
commitFuture.addListener(
new Runnable() {
public void run() {
try {
batch.processResults(commitFuture.get(), null);
} catch (Exception e) {
batch.processResults(new ArrayList<BatchWriteResult>(), e);
}
// Remove the batch from BatchQueue after it has been processed.
boolean removed = batchQueue.remove(batch);
Preconditions.checkState(removed, "The batch should be in the BatchQueue.");
sendReadyBatches();
}
},

ApiFuture<Void> commitFuture = bulkCommit(batch);
commitFuture.addListener(
new Runnable() {
public void run() {
boolean removed = batchQueue.remove(batch);
Preconditions.checkState(
removed, "The batch should be in the BatchQueue." + batchQueue.size());
sendReadyBatches();
}
},
MoreExecutors.directExecutor());
}

/** Commits the batch, delegating to the retrying overload starting at attempt 0. */
private ApiFuture<Void> bulkCommit(BulkCommitBatch batch) {
return bulkCommit(batch, 0);
}

/**
 * Commits the batch after waiting out the current backoff delay, then hands the outcome to
 * {@link BackoffCallback} which performs the RPC and triggers retries as needed.
 */
private ApiFuture<Void> bulkCommit(final BulkCommitBatch batch, final int attempt) {
  // Resolved once the backoff delay has elapsed; the commit itself runs afterwards.
  // (The comment in the retry settings implies the first delay is 0 — TODO confirm.)
  final SettableApiFuture<Void> delayElapsed = SettableApiFuture.create();

  long backoffDelayMs = nextAttempt.getRandomizedRetryDelay().toMillis();
  firestoreExecutor.schedule(
      new Runnable() {
        @Override
        public void run() {
          delayElapsed.set(null);
        }
      },
      backoffDelayMs,
      TimeUnit.MILLISECONDS);

  return ApiFutures.transformAsync(
      delayElapsed, new BackoffCallback(batch, attempt), firestoreExecutor);
}

private class BackoffCallback implements ApiAsyncFunction<Void, Void> {
final BulkCommitBatch batch;
final int attempt;

public BackoffCallback(BulkCommitBatch batch, int attempt) {
this.batch = batch;
this.attempt = attempt;
}

@Override
public ApiFuture<Void> apply(Void ignored) {

return ApiFutures.transformAsync(
ApiFutures.catchingAsync(
batch.bulkCommit(),
Exception.class,
new ApiAsyncFunction<Exception, List<BatchWriteResult>>() {
public ApiFuture<List<BatchWriteResult>> apply(Exception exception) {
List<BatchWriteResult> results = new ArrayList<>();
// If the BatchWrite RPC fails, map the exception to each individual result.
for (DocumentReference documentReference : batch.getPendingDocuments()) {
results.add(new BatchWriteResult(documentReference, null, exception));
}
return ApiFutures.immediateFuture(results);
}
},
MoreExecutors.directExecutor()),
new ProcessBulkCommitCallback(batch, attempt),
MoreExecutors.directExecutor());
} catch (Exception e) {
batch.processResults(new ArrayList<BatchWriteResult>(), e);
}
}

/**
 * Applies a bulk-commit response to the batch. Successful operations are settled by
 * {@code processResults}; any operations still pending are either retried in a fresh batch
 * (up to MAX_RETRY_ATTEMPTS) or failed with their last-seen results.
 */
private class ProcessBulkCommitCallback
    implements ApiAsyncFunction<List<BatchWriteResult>, Void> {
  final BulkCommitBatch batch;
  final int attempt;

  public ProcessBulkCommitCallback(BulkCommitBatch batch, int attempt) {
    this.batch = batch;
    this.attempt = attempt;
  }

  @Override
  public ApiFuture<Void> apply(List<BatchWriteResult> results) {
    batch.processResults(results);
    Set<DocumentReference> failedDocuments = batch.getPendingDocuments();

    // Everything in the batch succeeded; nothing left to do.
    if (failedDocuments.isEmpty()) {
      return ApiFutures.immediateFuture(null);
    }

    logger.log(
        Level.WARNING,
        String.format(
            "Current batch failed at retry #%d. Num failures: %d",
            attempt, failedDocuments.size()));

    if (attempt >= MAX_RETRY_ATTEMPTS) {
      // Out of retries: fail whatever is still pending with the last results received.
      batch.failRemainingOperations(results);
      return ApiFutures.immediateFuture(null);
    }

    // Retry only the writes that are still pending, with an increased backoff.
    nextAttempt = backoff.createNextAttempt(nextAttempt);
    BulkCommitBatch retryBatch = new BulkCommitBatch(firestore, batch, failedDocuments);
    return bulkCommit(retryBatch, attempt + 1);
  }
}

Expand All @@ -601,15 +729,15 @@ private boolean isBatchSendable(BulkCommitBatch batch) {
return false;
}

for (final DocumentReference document : batch.getDocuments()) {
for (final DocumentReference documentReference : batch.getPendingDocuments()) {
boolean isRefInFlight =
FluentIterable.from(batchQueue)
.anyMatch(
new Predicate<BulkCommitBatch>() {
@Override
public boolean apply(BulkCommitBatch batch) {
return batch.getState().equals(BatchState.SENT)
&& batch.getDocuments().contains(document);
&& batch.hasDocument(documentReference);
}
});

Expand All @@ -620,7 +748,7 @@ public boolean apply(BulkCommitBatch batch) {
"Duplicate write to document %s detected. Writing to the same document multiple"
+ " times will slow down BulkWriter. Write to unique documents in order to "
+ "maximize throughput.",
document));
documentReference.getPath()));
return false;
}
}
Expand Down
Loading