diff --git a/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java b/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java index a8cea4f6e..d40da8c18 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java +++ b/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java @@ -183,7 +183,7 @@ class WatchdogStream extends StateCheckingResponseObserver private boolean autoAutoFlowControl = true; private final ResponseObserver outerResponseObserver; - private StreamController innerController; + private volatile StreamController innerController; @GuardedBy("lock") private State state = State.IDLE; @@ -296,6 +296,12 @@ public void onCompleteImpl() { * @return True if the stream was canceled. */ boolean cancelIfStale() { + // If the stream hasn't started yet, innerController will be null. Skip the check this time + // and return false so the stream is still watched. + if (innerController == null) { + return false; + } + Throwable myError = null; synchronized (lock) { diff --git a/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java b/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java index c41abebdb..0b161018f 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java @@ -128,6 +128,31 @@ public void testIdleTimeout() throws InterruptedException { assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class); } + @Test + public void testTimedOutBeforeStart() throws InterruptedException { + MockServerStreamingCallable callable1 = new MockServerStreamingCallable<>(); + AccumulatingObserver downstreamObserver1 = new AccumulatingObserver<>(); + ResponseObserver observer = watchdog.watch(downstreamObserver1, waitTime, idleTime); + clock.incrementNanoTime(idleTime.toNanos() + 1); + // This should not remove callable1 from watched list + watchdog.run(); + assertThat(downstreamObserver1.done.isDone()).isFalse(); + + callable1.call("request", observer); + // This should cancel callable1 + watchdog.run(); + MockServerStreamingCall call1 = callable1.popLastCall(); + assertThat(call1.getController().isCancelled()).isTrue(); + call1.getController().getObserver().onError(new CancellationException("User cancelled")); + Throwable error = null; + try { + downstreamObserver1.done.get(); + } catch (ExecutionException t) { + error = t.getCause(); + } + assertThat(error).isInstanceOf(WatchdogTimeoutException.class); + } + @Test public void testMultiple() throws Exception { // Start stream1