Skip to content

Commit f6d0c66

Browse files
feat: call setting timeouts for batchers
This introduces 2 new variants of new*Batcher that accept a GrpcCallContext. This context will be used for batch RPCs generated by the batcher instance. Also fixes handlings of timeout overrides for bukmutations. If a user set a timeout, don't override it
1 parent 4614912 commit f6d0c66

File tree

8 files changed

+250
-25
lines changed

8 files changed

+250
-25
lines changed

Diff for: google-cloud-bigtable/clirr-ignored-differences.xml

+6
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,10 @@
2828
<differenceType>8001</differenceType>
2929
<className>com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallable</className>
3030
</difference>
31+
<difference>
32+
<!-- change method args is ok because EnhancedBigtableStub is InternalApi -->
33+
<differenceType>7004</differenceType>
34+
<className>com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub</className>
35+
<method>*</method>
36+
</difference>
3137
</differences>

Diff for: google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java

+87-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.api.core.BetaApi;
2424
import com.google.api.core.InternalApi;
2525
import com.google.api.gax.batching.Batcher;
26+
import com.google.api.gax.grpc.GrpcCallContext;
2627
import com.google.api.gax.rpc.ApiExceptions;
2728
import com.google.api.gax.rpc.ResponseObserver;
2829
import com.google.api.gax.rpc.ServerStream;
@@ -1073,7 +1074,40 @@ public void bulkMutateRows(BulkMutation mutation) {
10731074
*/
10741075
@BetaApi("This surface is likely to change as the batching surface evolves.")
10751076
public Batcher<RowMutationEntry, Void> newBulkMutationBatcher(@Nonnull String tableId) {
1076-
return stub.newMutateRowsBatcher(tableId);
1077+
return newBulkMutationBatcher(tableId, null);
1078+
}
1079+
1080+
/**
1081+
* Mutates multiple rows in a batch. Each individual row is mutated atomically as in MutateRow,
1082+
* but the entire batch is not executed atomically. The returned Batcher instance is not
1083+
* threadsafe, it can only be used from single thread. This method allows customization of the
1084+
* underlying RPCs by passing in a {@link com.google.api.gax.grpc.GrpcCallContext}. The same
1085+
* context will be reused for all batches. This can be used to customize things like per attempt
1086+
* timeouts.
1087+
*
1088+
* <p>Sample Code:
1089+
*
1090+
* <pre>{@code
1091+
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
1092+
* try (Batcher<RowMutationEntry, Void> batcher = bigtableDataClient.newBulkMutationBatcher("[TABLE]", GrpcCallContext.createDefault().withTimeout(Duration.ofSeconds(10)))) {
1093+
* for (String someValue : someCollection) {
1094+
* ApiFuture<Void> entryFuture =
1095+
* batcher.add(
1096+
* RowMutationEntry.create("[ROW KEY]")
1097+
* .setCell("[FAMILY NAME]", "[QUALIFIER]", "[VALUE]"));
1098+
* }
1099+
*
1100+
* // Blocks until mutations are applied on all submitted row entries.
1101+
* batcher.flush();
1102+
* }
1103+
* // Before `batcher` is closed, all remaining(If any) mutations are applied.
1104+
* }
1105+
* }</pre>
1106+
*/
1107+
@BetaApi("This surface is likely to change as the batching surface evolves.")
1108+
public Batcher<RowMutationEntry, Void> newBulkMutationBatcher(
1109+
@Nonnull String tableId, @Nullable GrpcCallContext ctx) {
1110+
return stub.newMutateRowsBatcher(tableId, ctx);
10771111
}
10781112

10791113
/**
@@ -1159,11 +1193,61 @@ public Batcher<ByteString, Row> newBulkReadRowsBatcher(String tableId) {
11591193
*/
11601194
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
11611195
String tableId, @Nullable Filters.Filter filter) {
1196+
return newBulkReadRowsBatcher(tableId, filter, null);
1197+
}
1198+
1199+
/**
1200+
* Reads rows for given tableId and filter criteria in a batch. If the row does not exist, the
1201+
* value will be null. The returned Batcher instance is not threadsafe, it can only be used from a
1202+
* single thread. This method allows customization of the underlying RPCs by passing in a {@link
1203+
* com.google.api.gax.grpc.GrpcCallContext}. The same context will be reused for all batches. This
1204+
* can be used to customize things like per attempt timeouts.
1205+
*
1206+
* <p>Performance notice: The ReadRows protocol requires that rows are sent in ascending key
1207+
* order, which means that the keys are processed sequentially on the server-side, so batching
1208+
* allows improving throughput but not latency. Lower latencies can be achieved by sending smaller
1209+
* requests concurrently.
1210+
*
1211+
* <p>Sample Code:
1212+
*
1213+
* <pre>{@code
1214+
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
1215+
*
1216+
* // Build the filter expression
1217+
* Filter filter = FILTERS.chain()
1218+
* .filter(FILTERS.key().regex("prefix.*"))
1219+
* .filter(FILTERS.limit().cellsPerRow(10));
1220+
*
1221+
* List<ApiFuture<Row>> rows = new ArrayList<>();
1222+
*
1223+
* try (Batcher<ByteString, Row> batcher = bigtableDataClient.newBulkReadRowsBatcher(
1224+
* "[TABLE]", filter, GrpcCallContext.createDefault().withTimeout(Duration.ofSeconds(10)))) {
1225+
* for (String someValue : someCollection) {
1226+
* ApiFuture<Row> rowFuture =
1227+
* batcher.add(ByteString.copyFromUtf8("[ROW KEY]"));
1228+
* rows.add(rowFuture);
1229+
* }
1230+
*
1231+
* // [Optional] Sends collected elements for batching asynchronously.
1232+
* batcher.sendOutstanding();
1233+
*
1234+
* // [Optional] Invokes sendOutstanding() and awaits until all pending entries are resolved.
1235+
* batcher.flush();
1236+
* }
1237+
* // batcher.close() invokes `flush()` which will in turn invoke `sendOutstanding()` with await for
1238+
* pending batches until its resolved.
1239+
*
1240+
* List<Row> actualRows = ApiFutures.allAsList(rows).get();
1241+
* }
1242+
* }</pre>
1243+
*/
1244+
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
1245+
String tableId, @Nullable Filters.Filter filter, @Nullable GrpcCallContext ctx) {
11621246
Query query = Query.create(tableId);
11631247
if (filter != null) {
1164-
query = query.filter(filter);
1248+
query.filter(filter);
11651249
}
1166-
return stub.newBulkReadRowsBatcher(query);
1250+
return stub.newBulkReadRowsBatcher(query, ctx);
11671251
}
11681252

11691253
/**

Diff for: google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.api.gax.core.BackgroundResource;
2424
import com.google.api.gax.core.FixedCredentialsProvider;
2525
import com.google.api.gax.grpc.GaxGrpcProperties;
26+
import com.google.api.gax.grpc.GrpcCallContext;
2627
import com.google.api.gax.grpc.GrpcCallSettings;
2728
import com.google.api.gax.grpc.GrpcRawCallableFactory;
2829
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
@@ -97,6 +98,7 @@
9798
import java.util.Map;
9899
import java.util.concurrent.TimeUnit;
99100
import javax.annotation.Nonnull;
101+
import javax.annotation.Nullable;
100102

101103
/**
102104
* The core client that converts method calls to RPCs.
@@ -531,10 +533,15 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
531533
* <li>Split the responses using {@link MutateRowsBatchingDescriptor}.
532534
* </ul>
533535
*/
534-
public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(@Nonnull String tableId) {
536+
public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(
537+
@Nonnull String tableId, @Nullable GrpcCallContext ctx) {
538+
UnaryCallable<BulkMutation, Void> callable = this.bulkMutateRowsCallable;
539+
if (ctx != null) {
540+
callable = callable.withDefaultCallContext(ctx);
541+
}
535542
return new BatcherImpl<>(
536543
settings.bulkMutateRowsSettings().getBatchingDescriptor(),
537-
bulkMutateRowsCallable,
544+
callable,
538545
BulkMutation.create(tableId),
539546
settings.bulkMutateRowsSettings().getBatchingSettings(),
540547
clientContext.getExecutor(),
@@ -556,11 +563,16 @@ public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(@Nonnull String tabl
556563
* <li>Split the responses using {@link ReadRowsBatchingDescriptor}.
557564
* </ul>
558565
*/
559-
public Batcher<ByteString, Row> newBulkReadRowsBatcher(@Nonnull Query query) {
566+
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
567+
@Nonnull Query query, @Nullable GrpcCallContext ctx) {
560568
Preconditions.checkNotNull(query, "query cannot be null");
569+
UnaryCallable<Query, List<Row>> callable = readRowsCallable().all();
570+
if (ctx != null) {
571+
callable = callable.withDefaultCallContext(ctx);
572+
}
561573
return new BatcherImpl<>(
562574
settings.bulkReadRowsSettings().getBatchingDescriptor(),
563-
readRowsCallable().all(),
575+
callable,
564576
query,
565577
settings.bulkReadRowsSettings().getBatchingSettings(),
566578
clientContext.getExecutor());

Diff for: google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ public Void call() {
176176

177177
// Configure the deadline
178178
ApiCallContext currentCallContext = callContext;
179-
if (!externalFuture.getAttemptSettings().getRpcTimeout().isZero()) {
179+
if (currentCallContext.getTimeout() == null
180+
&& !externalFuture.getAttemptSettings().getRpcTimeout().isZero()) {
180181
currentCallContext =
181182
currentCallContext.withTimeout(externalFuture.getAttemptSettings().getRpcTimeout());
182183
}

Diff for: google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,11 @@
3535
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
3636
import com.google.cloud.bigtable.data.v2.models.RowMutation;
3737
import com.google.common.base.Preconditions;
38+
import com.google.common.collect.ImmutableList;
3839
import com.google.protobuf.ByteString;
3940
import io.grpc.Attributes;
41+
import io.grpc.BindableService;
42+
import io.grpc.ServerInterceptor;
4043
import io.grpc.ServerTransportFilter;
4144
import io.grpc.stub.StreamObserver;
4245
import java.io.IOException;
@@ -95,7 +98,11 @@ public void transportTerminated(Attributes transportAttrs) {
9598
terminateAttributes.add(transportAttrs);
9699
}
97100
};
98-
serviceHelper = new FakeServiceHelper(null, transportFilter, service);
101+
serviceHelper =
102+
new FakeServiceHelper(
103+
ImmutableList.<ServerInterceptor>of(),
104+
transportFilter,
105+
ImmutableList.<BindableService>of(service));
99106
port = serviceHelper.getPort();
100107
serviceHelper.start();
101108

Diff for: google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.api.core.ApiFuture;
2323
import com.google.api.core.ApiFutures;
2424
import com.google.api.gax.batching.Batcher;
25+
import com.google.api.gax.grpc.GrpcCallContext;
2526
import com.google.api.gax.rpc.ResponseObserver;
2627
import com.google.api.gax.rpc.ServerStreamingCallable;
2728
import com.google.api.gax.rpc.UnaryCallable;
@@ -80,9 +81,13 @@ public void setUp() {
8081
Mockito.when(mockStub.bulkMutateRowsCallable()).thenReturn(mockBulkMutateRowsCallable);
8182
Mockito.when(mockStub.checkAndMutateRowCallable()).thenReturn(mockCheckAndMutateRowCallable);
8283
Mockito.when(mockStub.readModifyWriteRowCallable()).thenReturn(mockReadModifyWriteRowCallable);
83-
Mockito.when(mockStub.newMutateRowsBatcher(Mockito.any(String.class)))
84+
Mockito.when(
85+
mockStub.newMutateRowsBatcher(
86+
Mockito.any(String.class), Mockito.any(GrpcCallContext.class)))
8487
.thenReturn(mockBulkMutationBatcher);
85-
Mockito.when(mockStub.newBulkReadRowsBatcher(Mockito.any(Query.class)))
88+
Mockito.when(
89+
mockStub.newBulkReadRowsBatcher(
90+
Mockito.any(Query.class), Mockito.any(GrpcCallContext.class)))
8691
.thenReturn(mockBulkReadRowsBatcher);
8792
}
8893

@@ -374,7 +379,8 @@ public void proxyNewBulkMutationBatcherTest() {
374379
ApiFuture<Void> actualRes = batcher.add(request);
375380
assertThat(actualRes).isSameInstanceAs(expectedResponse);
376381

377-
Mockito.verify(mockStub).newMutateRowsBatcher(Mockito.any(String.class));
382+
Mockito.verify(mockStub)
383+
.newMutateRowsBatcher(Mockito.any(String.class), Mockito.any(GrpcCallContext.class));
378384
}
379385

380386
@Test
@@ -390,7 +396,8 @@ public void proxyNewBulkReadRowsTest() {
390396
ApiFuture<Row> actualResponse = batcher.add(request);
391397
assertThat(actualResponse).isSameInstanceAs(expectedResponse);
392398

393-
Mockito.verify(mockStub).newBulkReadRowsBatcher(Mockito.any(Query.class));
399+
Mockito.verify(mockStub)
400+
.newBulkReadRowsBatcher(Mockito.any(Query.class), Mockito.any(GrpcCallContext.class));
394401
}
395402

396403
@Test
@@ -407,7 +414,8 @@ public void proxyNewBulkReadRowsWithFilterTest() {
407414
ApiFuture<Row> actualResponse = batcher.add(request);
408415
assertThat(actualResponse).isSameInstanceAs(expectedResponse);
409416

410-
Mockito.verify(mockStub).newBulkReadRowsBatcher(Mockito.any(Query.class));
417+
Mockito.verify(mockStub)
418+
.newBulkReadRowsBatcher(Mockito.any(Query.class), Mockito.any(GrpcCallContext.class));
411419
}
412420

413421
@Test

Diff for: google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/FakeServiceHelper.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -15,40 +15,43 @@
1515
*/
1616
package com.google.cloud.bigtable.data.v2;
1717

18+
import com.google.common.collect.ImmutableList;
1819
import io.grpc.BindableService;
1920
import io.grpc.Server;
2021
import io.grpc.ServerBuilder;
2122
import io.grpc.ServerInterceptor;
2223
import io.grpc.ServerTransportFilter;
2324
import java.io.IOException;
2425
import java.net.ServerSocket;
26+
import java.util.List;
2527

2628
/** Utility class to setup a fake grpc server on a random port. */
2729
public class FakeServiceHelper {
2830
private final int port;
2931
private final Server server;
3032

3133
public FakeServiceHelper(BindableService... services) throws IOException {
32-
this(null, services);
34+
this(ImmutableList.<ServerInterceptor>of(), null, ImmutableList.copyOf(services));
3335
}
3436

3537
public FakeServiceHelper(ServerInterceptor interceptor, BindableService... services)
3638
throws IOException {
37-
this(interceptor, null, services);
39+
this(ImmutableList.of(interceptor), null, ImmutableList.copyOf(services));
3840
}
3941

4042
public FakeServiceHelper(
41-
ServerInterceptor interceptor,
43+
List<ServerInterceptor> interceptors,
4244
ServerTransportFilter transportFilter,
43-
BindableService... services)
45+
List<BindableService> services)
4446
throws IOException {
4547
try (ServerSocket ss = new ServerSocket(0)) {
4648
port = ss.getLocalPort();
4749
}
4850
ServerBuilder builder = ServerBuilder.forPort(port);
49-
if (interceptor != null) {
51+
for (ServerInterceptor interceptor : interceptors) {
5052
builder = builder.intercept(interceptor);
5153
}
54+
5255
if (transportFilter != null) {
5356
builder = builder.addTransportFilter(transportFilter);
5457
}

0 commit comments

Comments
 (0)