Skip to content

Commit b09a21c

Browse files
authored
fix: Retry "received rst stream" (#586)
* fix: Retry "received rst stream" * Check if exception is InternalException instead of using fromThrowable
1 parent 3d82093 commit b09a21c

File tree

3 files changed

+132
-1
lines changed

3 files changed

+132
-1
lines changed

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
7676
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
7777
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
78+
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsConvertExceptionCallable;
7879
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
7980
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable;
8081
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
@@ -345,8 +346,14 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
345346
.build(),
346347
readRowsSettings.getRetryableCodes());
347348

349+
// Sometimes ReadRows connections are disconnected via an RST frame. This error is transient and
350+
// should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
351+
// which by default is not retryable. Convert the exception so it can be retried in the client.
352+
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> convertException =
353+
new ReadRowsConvertExceptionCallable<>(base);
354+
348355
ServerStreamingCallable<ReadRowsRequest, RowT> merging =
349-
new RowMergingCallable<>(base, rowAdapter);
356+
new RowMergingCallable<>(convertException, rowAdapter);
350357

351358
// Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the inner
352359
// ReadRowsRequest -> ReadRowsResponse callable).
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.readrows;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.api.gax.rpc.ApiCallContext;
20+
import com.google.api.gax.rpc.ApiException;
21+
import com.google.api.gax.rpc.InternalException;
22+
import com.google.api.gax.rpc.ResponseObserver;
23+
import com.google.api.gax.rpc.ServerStreamingCallable;
24+
import com.google.api.gax.rpc.StreamController;
25+
26+
/**
27+
* This callable converts the "Received rst stream" exception into a retryable {@link ApiException}.
28+
*/
29+
@InternalApi
30+
public final class ReadRowsConvertExceptionCallable<ReadRowsRequest, RowT>
31+
extends ServerStreamingCallable<ReadRowsRequest, RowT> {
32+
33+
private final ServerStreamingCallable<ReadRowsRequest, RowT> innerCallable;
34+
35+
public ReadRowsConvertExceptionCallable(
36+
ServerStreamingCallable<ReadRowsRequest, RowT> innerCallable) {
37+
this.innerCallable = innerCallable;
38+
}
39+
40+
@Override
41+
public void call(
42+
ReadRowsRequest request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
43+
ReadRowsConvertExceptionResponseObserver<RowT> observer =
44+
new ReadRowsConvertExceptionResponseObserver<>(responseObserver);
45+
innerCallable.call(request, observer, context);
46+
}
47+
48+
private class ReadRowsConvertExceptionResponseObserver<RowT> implements ResponseObserver<RowT> {
49+
50+
private final ResponseObserver<RowT> outerObserver;
51+
52+
ReadRowsConvertExceptionResponseObserver(ResponseObserver<RowT> outerObserver) {
53+
this.outerObserver = outerObserver;
54+
}
55+
56+
@Override
57+
public void onStart(StreamController controller) {
58+
outerObserver.onStart(controller);
59+
}
60+
61+
@Override
62+
public void onResponse(RowT response) {
63+
outerObserver.onResponse(response);
64+
}
65+
66+
@Override
67+
public void onError(Throwable t) {
68+
outerObserver.onError(convertException(t));
69+
}
70+
71+
@Override
72+
public void onComplete() {
73+
outerObserver.onComplete();
74+
}
75+
}
76+
77+
private Throwable convertException(Throwable t) {
78+
// Long lived connections sometimes are disconnected via an RST frame. This error is
79+
// transient and should be retried.
80+
if (t instanceof InternalException) {
81+
if (t.getMessage() != null && t.getMessage().contains("Received Rst stream")) {
82+
return new InternalException(t, ((InternalException) t).getStatusCode(), true);
83+
}
84+
}
85+
return t;
86+
}
87+
}

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java

+37
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616
package com.google.cloud.bigtable.data.v2.stub.readrows;
1717

1818
import com.google.api.gax.core.NoCredentialsProvider;
19+
import com.google.api.gax.grpc.GrpcStatusCode;
1920
import com.google.api.gax.grpc.GrpcTransportChannel;
21+
import com.google.api.gax.rpc.ApiException;
2022
import com.google.api.gax.rpc.FixedTransportChannelProvider;
23+
import com.google.api.gax.rpc.InternalException;
2124
import com.google.api.gax.rpc.ServerStream;
2225
import com.google.bigtable.v2.BigtableGrpc;
2326
import com.google.bigtable.v2.ReadRowsRequest;
@@ -39,6 +42,7 @@
3942
import com.google.protobuf.StringValue;
4043
import io.grpc.Status;
4144
import io.grpc.Status.Code;
45+
import io.grpc.StatusRuntimeException;
4246
import io.grpc.stub.StreamObserver;
4347
import io.grpc.testing.GrpcServerRule;
4448
import java.io.IOException;
@@ -260,6 +264,30 @@ public void retryWithLastScannedKeyTest() {
260264
Truth.assertThat(actualResults).containsExactly("r7").inOrder();
261265
}
262266

267+
@Test
268+
public void retryRstStreamExceptionTest() {
269+
ApiException exception =
270+
new InternalException(
271+
new StatusRuntimeException(
272+
Status.INTERNAL.withDescription(
273+
"HTTP/2 error code: INTERNAL_ERROR\nReceived Rst stream")),
274+
GrpcStatusCode.of(Code.INTERNAL),
275+
false);
276+
service.expectations.add(
277+
RpcExpectation.create()
278+
.expectRequest("k1")
279+
.expectRequest(Range.closedOpen("r1", "r3"))
280+
.respondWithException(Code.INTERNAL, exception));
281+
service.expectations.add(
282+
RpcExpectation.create()
283+
.expectRequest("k1")
284+
.expectRequest(Range.closedOpen("r1", "r3"))
285+
.respondWith("k1", "r1", "r2"));
286+
287+
List<String> actualResults = getResults(Query.create(TABLE_ID).rowKey("k1").range("r1", "r3"));
288+
Truth.assertThat(actualResults).containsExactly("k1", "r1", "r2").inOrder();
289+
}
290+
263291
private List<String> getResults(Query query) {
264292
ServerStream<Row> actualRows = client.readRows(query);
265293
List<String> actualValues = Lists.newArrayList();
@@ -292,6 +320,8 @@ public void readRows(
292320
}
293321
if (expectedRpc.statusCode.toStatus().isOk()) {
294322
responseObserver.onCompleted();
323+
} else if (expectedRpc.exception != null) {
324+
responseObserver.onError(expectedRpc.exception);
295325
} else {
296326
responseObserver.onError(expectedRpc.statusCode.toStatus().asRuntimeException());
297327
}
@@ -301,6 +331,7 @@ public void readRows(
301331
private static class RpcExpectation {
302332
ReadRowsRequest.Builder requestBuilder;
303333
Status.Code statusCode;
334+
ApiException exception;
304335
List<ReadRowsResponse> responses;
305336

306337
private RpcExpectation() {
@@ -370,6 +401,12 @@ RpcExpectation respondWithStatus(Status.Code code) {
370401
return this;
371402
}
372403

404+
RpcExpectation respondWithException(Status.Code code, ApiException exception) {
405+
this.statusCode = code;
406+
this.exception = exception;
407+
return this;
408+
}
409+
373410
RpcExpectation respondWith(String... responses) {
374411
for (String response : responses) {
375412
this.responses.add(

0 commit comments

Comments
 (0)