3030import com .google .api .gax .grpc .GrpcCallContext ;
3131import com .google .api .gax .grpc .GrpcTransportChannel ;
3232import com .google .api .gax .rpc .FixedTransportChannelProvider ;
33+ import com .google .api .gax .rpc .InstantiatingWatchdogProvider ;
3334import com .google .api .gax .rpc .ServerStreamingCallable ;
35+ import com .google .api .gax .rpc .WatchdogTimeoutException ;
3436import com .google .auth .oauth2 .ServiceAccountJwtAccessCredentials ;
3537import com .google .bigtable .v2 .BigtableGrpc ;
3638import com .google .bigtable .v2 .FeatureFlags ;
3739import com .google .bigtable .v2 .MutateRowsRequest ;
3840import com .google .bigtable .v2 .MutateRowsResponse ;
3941import com .google .bigtable .v2 .PingAndWarmRequest ;
4042import com .google .bigtable .v2 .PingAndWarmResponse ;
43+ import com .google .bigtable .v2 .ReadChangeStreamRequest ;
44+ import com .google .bigtable .v2 .ReadChangeStreamResponse ;
4145import com .google .bigtable .v2 .ReadRowsRequest ;
4246import com .google .bigtable .v2 .ReadRowsResponse ;
4347import com .google .bigtable .v2 .RowSet ;
4650import com .google .cloud .bigtable .data .v2 .BigtableDataSettings ;
4751import com .google .cloud .bigtable .data .v2 .FakeServiceBuilder ;
4852import com .google .cloud .bigtable .data .v2 .internal .RequestContext ;
49- import com .google .cloud .bigtable .data .v2 .models .BulkMutation ;
50- import com .google .cloud .bigtable .data .v2 .models .DefaultRowAdapter ;
51- import com .google .cloud .bigtable .data .v2 .models .Query ;
53+ import com .google .cloud .bigtable .data .v2 .models .*;
5254import com .google .cloud .bigtable .data .v2 .models .Row ;
53- import com .google .cloud .bigtable .data .v2 .models .RowMutationEntry ;
5455import com .google .common .collect .ImmutableMap ;
5556import com .google .common .collect .Queues ;
5657import com .google .common .io .BaseEncoding ;
8283import java .security .NoSuchAlgorithmException ;
8384import java .util .Base64 ;
8485import java .util .Collection ;
86+ import java .util .Iterator ;
8587import java .util .concurrent .ArrayBlockingQueue ;
8688import java .util .concurrent .BlockingQueue ;
8789import java .util .concurrent .ExecutionException ;
8890import java .util .concurrent .TimeUnit ;
8991import org .junit .After ;
92+ import org .junit .Assert ;
9093import org .junit .Before ;
9194import org .junit .Test ;
9295import org .junit .runner .RunWith ;
@@ -101,6 +104,8 @@ public class EnhancedBigtableStubTest {
101104 private static final String TABLE_NAME =
102105 NameUtil .formatTableName (PROJECT_ID , INSTANCE_ID , "fake-table" );
103106 private static final String APP_PROFILE_ID = "app-profile-id" ;
107+ private static final String WAIT_TIME_TABLE_ID = "test-wait-timeout" ;
108+ private static final Duration WATCHDOG_CHECK_DURATION = Duration .ofMillis (100 );
104109
105110 private Server server ;
106111 private MetadataInterceptor metadataInterceptor ;
@@ -544,6 +549,46 @@ public void testBulkMutationFlowControlFeatureFlagIsNotSet() throws Exception {
544549 assertThat (featureFlags .getMutateRowsRateLimit ()).isFalse ();
545550 }
546551
552+ @ Test
553+ public void testWaitTimeoutIsSet () throws Exception {
554+ EnhancedBigtableStubSettings .Builder settings = defaultSettings .toBuilder ();
555+ // Set a shorter wait timeout and make watchdog checks more frequently
556+ settings .readRowsSettings ().setWaitTimeout (WATCHDOG_CHECK_DURATION .dividedBy (2 ));
557+ settings .setStreamWatchdogProvider (
558+ InstantiatingWatchdogProvider .create ().withCheckInterval (WATCHDOG_CHECK_DURATION ));
559+
560+ EnhancedBigtableStub stub = EnhancedBigtableStub .create (settings .build ());
561+ Iterator <Row > iterator =
562+ stub .readRowsCallable ().call (Query .create (WAIT_TIME_TABLE_ID )).iterator ();
563+ try {
564+ iterator .next ();
565+ Assert .fail ("Should throw watchdog timeout exception" );
566+ } catch (WatchdogTimeoutException e ) {
567+ assertThat (e .getMessage ()).contains ("Canceled due to timeout waiting for next response" );
568+ }
569+ }
570+
571+ @ Test
572+ public void testReadChangeStreamWaitTimeoutIsSet () throws Exception {
573+ EnhancedBigtableStubSettings .Builder settings = defaultSettings .toBuilder ();
574+ // Set a shorter wait timeout and make watchdog checks more frequently
575+ settings .readChangeStreamSettings ().setWaitTimeout (WATCHDOG_CHECK_DURATION .dividedBy (2 ));
576+ settings .setStreamWatchdogProvider (
577+ InstantiatingWatchdogProvider .create ().withCheckInterval (WATCHDOG_CHECK_DURATION ));
578+
579+ EnhancedBigtableStub stub = EnhancedBigtableStub .create (settings .build ());
580+ Iterator <ChangeStreamRecord > iterator =
581+ stub .readChangeStreamCallable ()
582+ .call (ReadChangeStreamQuery .create (WAIT_TIME_TABLE_ID ))
583+ .iterator ();
584+ try {
585+ iterator .next ();
586+ Assert .fail ("Should throw watchdog timeout exception" );
587+ } catch (WatchdogTimeoutException e ) {
588+ assertThat (e .getMessage ()).contains ("Canceled due to timeout waiting for next response" );
589+ }
590+ }
591+
547592 private static class MetadataInterceptor implements ServerInterceptor {
548593 final BlockingQueue <Metadata > headers = Queues .newLinkedBlockingDeque ();
549594
@@ -572,6 +617,8 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(
572617
573618 private static class FakeDataService extends BigtableGrpc .BigtableImplBase {
574619 final BlockingQueue <ReadRowsRequest > requests = Queues .newLinkedBlockingDeque ();
620+ final BlockingQueue <ReadChangeStreamRequest > readChangeReadStreamRequests =
621+ Queues .newLinkedBlockingDeque ();
575622 final BlockingQueue <PingAndWarmRequest > pingRequests = Queues .newLinkedBlockingDeque ();
576623
577624 @ SuppressWarnings ("unchecked" )
@@ -593,6 +640,13 @@ public void mutateRows(
593640 @ Override
594641 public void readRows (
595642 ReadRowsRequest request , StreamObserver <ReadRowsResponse > responseObserver ) {
643+ if (request .getTableName ().contains (WAIT_TIME_TABLE_ID )) {
644+ try {
645+ Thread .sleep (WATCHDOG_CHECK_DURATION .toMillis () * 2 );
646+ } catch (Exception e ) {
647+
648+ }
649+ }
596650 requests .add (request );
597651 // Dummy row for stream
598652 responseObserver .onNext (
@@ -608,6 +662,23 @@ public void readRows(
608662 responseObserver .onCompleted ();
609663 }
610664
665+ @ Override
666+ public void readChangeStream (
667+ ReadChangeStreamRequest request ,
668+ StreamObserver <ReadChangeStreamResponse > responseObserver ) {
669+ if (request .getTableName ().contains (WAIT_TIME_TABLE_ID )) {
670+ try {
671+ Thread .sleep (WATCHDOG_CHECK_DURATION .toMillis () * 2 );
672+ } catch (Exception e ) {
673+
674+ }
675+ }
676+ readChangeReadStreamRequests .add (request );
677+ // Dummy row for stream
678+ responseObserver .onNext (ReadChangeStreamResponse .getDefaultInstance ());
679+ responseObserver .onCompleted ();
680+ }
681+
611682 @ Override
612683 public void pingAndWarm (
613684 PingAndWarmRequest request , StreamObserver <PingAndWarmResponse > responseObserver ) {
0 commit comments