30
30
import com .google .api .gax .grpc .GrpcCallContext ;
31
31
import com .google .api .gax .grpc .GrpcTransportChannel ;
32
32
import com .google .api .gax .rpc .FixedTransportChannelProvider ;
33
+ import com .google .api .gax .rpc .InstantiatingWatchdogProvider ;
33
34
import com .google .api .gax .rpc .ServerStreamingCallable ;
35
+ import com .google .api .gax .rpc .WatchdogTimeoutException ;
34
36
import com .google .auth .oauth2 .ServiceAccountJwtAccessCredentials ;
35
37
import com .google .bigtable .v2 .BigtableGrpc ;
36
38
import com .google .bigtable .v2 .FeatureFlags ;
37
39
import com .google .bigtable .v2 .MutateRowsRequest ;
38
40
import com .google .bigtable .v2 .MutateRowsResponse ;
39
41
import com .google .bigtable .v2 .PingAndWarmRequest ;
40
42
import com .google .bigtable .v2 .PingAndWarmResponse ;
43
+ import com .google .bigtable .v2 .ReadChangeStreamRequest ;
44
+ import com .google .bigtable .v2 .ReadChangeStreamResponse ;
41
45
import com .google .bigtable .v2 .ReadRowsRequest ;
42
46
import com .google .bigtable .v2 .ReadRowsResponse ;
43
47
import com .google .bigtable .v2 .RowSet ;
46
50
import com .google .cloud .bigtable .data .v2 .BigtableDataSettings ;
47
51
import com .google .cloud .bigtable .data .v2 .FakeServiceBuilder ;
48
52
import com .google .cloud .bigtable .data .v2 .internal .RequestContext ;
49
- import com .google .cloud .bigtable .data .v2 .models .BulkMutation ;
50
- import com .google .cloud .bigtable .data .v2 .models .DefaultRowAdapter ;
51
- import com .google .cloud .bigtable .data .v2 .models .Query ;
53
+ import com .google .cloud .bigtable .data .v2 .models .*;
52
54
import com .google .cloud .bigtable .data .v2 .models .Row ;
53
- import com .google .cloud .bigtable .data .v2 .models .RowMutationEntry ;
54
55
import com .google .common .collect .ImmutableMap ;
55
56
import com .google .common .collect .Queues ;
56
57
import com .google .common .io .BaseEncoding ;
82
83
import java .security .NoSuchAlgorithmException ;
83
84
import java .util .Base64 ;
84
85
import java .util .Collection ;
86
+ import java .util .Iterator ;
85
87
import java .util .concurrent .ArrayBlockingQueue ;
86
88
import java .util .concurrent .BlockingQueue ;
87
89
import java .util .concurrent .ExecutionException ;
88
90
import java .util .concurrent .TimeUnit ;
89
91
import org .junit .After ;
92
+ import org .junit .Assert ;
90
93
import org .junit .Before ;
91
94
import org .junit .Test ;
92
95
import org .junit .runner .RunWith ;
@@ -101,6 +104,8 @@ public class EnhancedBigtableStubTest {
101
104
private static final String TABLE_NAME =
102
105
NameUtil .formatTableName (PROJECT_ID , INSTANCE_ID , "fake-table" );
103
106
private static final String APP_PROFILE_ID = "app-profile-id" ;
107
+ private static final String WAIT_TIME_TABLE_ID = "test-wait-timeout" ;
108
+ private static final Duration WATCHDOG_CHECK_DURATION = Duration .ofMillis (100 );
104
109
105
110
private Server server ;
106
111
private MetadataInterceptor metadataInterceptor ;
@@ -544,6 +549,46 @@ public void testBulkMutationFlowControlFeatureFlagIsNotSet() throws Exception {
544
549
assertThat (featureFlags .getMutateRowsRateLimit ()).isFalse ();
545
550
}
546
551
552
+ @ Test
553
+ public void testWaitTimeoutIsSet () throws Exception {
554
+ EnhancedBigtableStubSettings .Builder settings = defaultSettings .toBuilder ();
555
+ // Set a shorter wait timeout and make watchdog checks more frequently
556
+ settings .readRowsSettings ().setWaitTimeout (WATCHDOG_CHECK_DURATION .dividedBy (2 ));
557
+ settings .setStreamWatchdogProvider (
558
+ InstantiatingWatchdogProvider .create ().withCheckInterval (WATCHDOG_CHECK_DURATION ));
559
+
560
+ EnhancedBigtableStub stub = EnhancedBigtableStub .create (settings .build ());
561
+ Iterator <Row > iterator =
562
+ stub .readRowsCallable ().call (Query .create (WAIT_TIME_TABLE_ID )).iterator ();
563
+ try {
564
+ iterator .next ();
565
+ Assert .fail ("Should throw watchdog timeout exception" );
566
+ } catch (WatchdogTimeoutException e ) {
567
+ assertThat (e .getMessage ()).contains ("Canceled due to timeout waiting for next response" );
568
+ }
569
+ }
570
+
571
+ @ Test
572
+ public void testReadChangeStreamWaitTimeoutIsSet () throws Exception {
573
+ EnhancedBigtableStubSettings .Builder settings = defaultSettings .toBuilder ();
574
+ // Set a shorter wait timeout and make watchdog checks more frequently
575
+ settings .readChangeStreamSettings ().setWaitTimeout (WATCHDOG_CHECK_DURATION .dividedBy (2 ));
576
+ settings .setStreamWatchdogProvider (
577
+ InstantiatingWatchdogProvider .create ().withCheckInterval (WATCHDOG_CHECK_DURATION ));
578
+
579
+ EnhancedBigtableStub stub = EnhancedBigtableStub .create (settings .build ());
580
+ Iterator <ChangeStreamRecord > iterator =
581
+ stub .readChangeStreamCallable ()
582
+ .call (ReadChangeStreamQuery .create (WAIT_TIME_TABLE_ID ))
583
+ .iterator ();
584
+ try {
585
+ iterator .next ();
586
+ Assert .fail ("Should throw watchdog timeout exception" );
587
+ } catch (WatchdogTimeoutException e ) {
588
+ assertThat (e .getMessage ()).contains ("Canceled due to timeout waiting for next response" );
589
+ }
590
+ }
591
+
547
592
private static class MetadataInterceptor implements ServerInterceptor {
548
593
final BlockingQueue <Metadata > headers = Queues .newLinkedBlockingDeque ();
549
594
@@ -572,6 +617,8 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(
572
617
573
618
private static class FakeDataService extends BigtableGrpc .BigtableImplBase {
574
619
final BlockingQueue <ReadRowsRequest > requests = Queues .newLinkedBlockingDeque ();
620
+ final BlockingQueue <ReadChangeStreamRequest > readChangeReadStreamRequests =
621
+ Queues .newLinkedBlockingDeque ();
575
622
final BlockingQueue <PingAndWarmRequest > pingRequests = Queues .newLinkedBlockingDeque ();
576
623
577
624
@ SuppressWarnings ("unchecked" )
@@ -593,6 +640,13 @@ public void mutateRows(
593
640
@ Override
594
641
public void readRows (
595
642
ReadRowsRequest request , StreamObserver <ReadRowsResponse > responseObserver ) {
643
+ if (request .getTableName ().contains (WAIT_TIME_TABLE_ID )) {
644
+ try {
645
+ Thread .sleep (WATCHDOG_CHECK_DURATION .toMillis () * 2 );
646
+ } catch (Exception e ) {
647
+
648
+ }
649
+ }
596
650
requests .add (request );
597
651
// Dummy row for stream
598
652
responseObserver .onNext (
@@ -608,6 +662,23 @@ public void readRows(
608
662
responseObserver .onCompleted ();
609
663
}
610
664
665
+ @ Override
666
+ public void readChangeStream (
667
+ ReadChangeStreamRequest request ,
668
+ StreamObserver <ReadChangeStreamResponse > responseObserver ) {
669
+ if (request .getTableName ().contains (WAIT_TIME_TABLE_ID )) {
670
+ try {
671
+ Thread .sleep (WATCHDOG_CHECK_DURATION .toMillis () * 2 );
672
+ } catch (Exception e ) {
673
+
674
+ }
675
+ }
676
+ readChangeReadStreamRequests .add (request );
677
+ // Dummy row for stream
678
+ responseObserver .onNext (ReadChangeStreamResponse .getDefaultInstance ());
679
+ responseObserver .onCompleted ();
680
+ }
681
+
611
682
@ Override
612
683
public void pingAndWarm (
613
684
PingAndWarmRequest request , StreamObserver <PingAndWarmResponse > responseObserver ) {
0 commit comments