From eff90819a7abee6fbd9efd050add9fe1bf8d9d02 Mon Sep 17 00:00:00 2001 From: George Gensure Date: Wed, 20 Nov 2019 19:33:09 -0500 Subject: [PATCH 1/3] Only request grpc write when not complete If a queryWriteStatus yields a committedSize which leaves no content remaining to be uploaded, immediately succeed a blob upload. This can easily occur if a competing blob write completes asynchronously between abnormal write termination and a query. --- .../devtools/build/lib/remote/ByteStreamUploader.java | 7 ++++++- .../java/com/google/devtools/build/lib/remote/Chunker.java | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index 91f9f509f600ed..90c9e45c2a36ff 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -353,7 +353,12 @@ ListenableFuture start() { AtomicLong committedOffset = new AtomicLong(0); return Futures.transformAsync( retrier.executeAsync( - () -> ctx.call(() -> callAndQueryOnFailure(committedOffset, progressiveBackoff)), + () -> { + if (committedOffset.get() < chunker.getSize()) { + return ctx.call(() -> callAndQueryOnFailure(committedOffset, progressiveBackoff)); + } + return Futures.immediateFuture(null); + }, progressiveBackoff), (result) -> { long committedSize = committedOffset.get(); diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index cda7a3aa7819e6..8904e59e20bd37 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -218,7 +218,7 @@ public Chunk next() throws IOException { return new Chunk(blob, offsetBefore); } - private long bytesLeft() { + public long bytesLeft() { return getSize() - getOffset(); } From 4f25c8361403a55545fded354226e52d6c467488 Mon Sep 17 00:00:00 2001 From: George Gensure Date: Thu, 21 Nov 2019 09:26:37 -0500 Subject: [PATCH 2/3] Regression test for async completion of write after deadline --- .../lib/remote/ByteStreamUploaderTest.java | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index cd28f561de083c..abd07b1ba228ac 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -333,6 +333,66 @@ public void queryWriteStatus( withEmptyMetadata.detach(prevContext); } + @Test + public void asyncCompletedUploadDoesNotReWrite() throws Exception { + Context prevContext = withEmptyMetadata.attach(); + RemoteRetrier retrier = + TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService); + ByteStreamUploader uploader = + new ByteStreamUploader( + INSTANCE_NAME, new ReferenceCountedChannel(channel), null, 1, retrier); + + byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; + new Random().nextBytes(blob); + + Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); + HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash()); + + AtomicInteger numWriteCalls = new AtomicInteger(0); + + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public StreamObserver write(StreamObserver streamObserver) { + numWriteCalls.getAndIncrement(); + streamObserver.onError(Status.DEADLINE_EXCEEDED.asException()); + return new StreamObserver() { + @Override + public void onNext(WriteRequest writeRequest) { + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + }; + } + + @Override + public void queryWriteStatus( + QueryWriteStatusRequest request, StreamObserver response) { + response.onNext( + QueryWriteStatusResponse.newBuilder() + .setCommittedSize(blob.length) + .setComplete(true) + .build()); + response.onCompleted(); + } + }); + + uploader.uploadBlob(hash, chunker, true); + + // This test should not have triggered any retries. + assertThat(numWriteCalls.get()).isEqualTo(1); + + blockUntilInternalStateConsistent(uploader); + + withEmptyMetadata.detach(prevContext); + } + @Test public void unimplementedQueryShouldRestartUpload() throws Exception { Context prevContext = withEmptyMetadata.attach(); From 6ca3278d41963a37bacb857f6fdb1fa75d1fbcda Mon Sep 17 00:00:00 2001 From: Jakob Buchgraber Date: Thu, 21 Nov 2019 15:35:40 +0100 Subject: [PATCH 3/3] add a comment that clarifies what the test does --- .../devtools/build/lib/remote/ByteStreamUploaderTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index abd07b1ba228ac..c26f4182b0be24 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -334,7 +334,9 @@ public void queryWriteStatus( } @Test - public void asyncCompletedUploadDoesNotReWrite() throws Exception { + public void concurrentlyCompletedUploadIsNotRetried() throws Exception { + // Test that after an upload has failed and the QueryWriteStatus call returns + // that the upload has completed that we'll not retry the upload. Context prevContext = withEmptyMetadata.attach(); RemoteRetrier retrier = TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);