From 06dbf129ce63d28430e1022137679c9cfdf433ee Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 14 Apr 2021 11:56:16 -0400 Subject: [PATCH] fix: fix watchdog NPE red herring (#1344) * fix: fix watchdog NPE red herring * adding a test * checking a few more things in the test * make innercallable volatile --- .../java/com/google/api/gax/rpc/Watchdog.java | 8 +++++- .../com/google/api/gax/rpc/WatchdogTest.java | 25 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java b/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java index a8cea4f6e..d40da8c18 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java +++ b/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java @@ -183,7 +183,7 @@ class WatchdogStream extends StateCheckingResponseObserver private boolean autoAutoFlowControl = true; private final ResponseObserver outerResponseObserver; - private StreamController innerController; + private volatile StreamController innerController; @GuardedBy("lock") private State state = State.IDLE; @@ -296,6 +296,12 @@ public void onCompleteImpl() { * @return True if the stream was canceled. */ boolean cancelIfStale() { + // If the stream hasn't started yet, innerController will be null. Skip the check this time + // and return false so the stream is still watched. + if (innerController == null) { + return false; + } + Throwable myError = null; synchronized (lock) { diff --git a/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java b/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java index c41abebdb..0b161018f 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java @@ -128,6 +128,31 @@ public void testIdleTimeout() throws InterruptedException { assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class); } + @Test + public void testTimedOutBeforeStart() throws InterruptedException { + MockServerStreamingCallable callable1 = new MockServerStreamingCallable<>(); + AccumulatingObserver downstreamObserver1 = new AccumulatingObserver<>(); + ResponseObserver observer = watchdog.watch(downstreamObserver1, waitTime, idleTime); + clock.incrementNanoTime(idleTime.toNanos() + 1); + // This should not remove callable1 from watched list + watchdog.run(); + assertThat(downstreamObserver1.done.isDone()).isFalse(); + + callable1.call("request", observer); + // This should cancel callable1 + watchdog.run(); + MockServerStreamingCall call1 = callable1.popLastCall(); + assertThat(call1.getController().isCancelled()).isTrue(); + call1.getController().getObserver().onError(new CancellationException("User cancelled")); + Throwable error = null; + try { + downstreamObserver1.done.get(); + } catch (ExecutionException t) { + error = t.getCause(); + } + assertThat(error).isInstanceOf(WatchdogTimeoutException.class); + } + @Test public void testMultiple() throws Exception { // Start stream1