diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver.java index 849c56b19..c46eb55ff 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver.java @@ -244,7 +244,14 @@ private void deliver() { // purposefully leaving the lock non-zero and notifying the outerResponseObserver of the // error. Care must be taken to avoid calling close twice in case the first invocation threw // an error. - innerController.cancel(); + try { + innerController.cancel(); + } catch (Throwable cancelError) { + t.addSuppressed( + new IllegalStateException( + "Failed to cancel upstream while recovering from an unexpected error", + cancelError)); + } if (!finished) { outerResponseObserver.onError(t); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserverTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserverTest.java index d6c1287d0..426c27f5a 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserverTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserverTest.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.gaxx.reframing; +import com.google.api.gax.rpc.StreamController; import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable; import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable.StreamControllerStash; import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockResponseObserver; @@ -41,6 +42,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mockito; @RunWith(JUnit4.class) public class ReframingResponseObserverTest { @@ -374,6 +376,61 @@ public String pop() { Truth.assertThat(lastCall.getNumDelivered()).isEqualTo(2); } + /** + * Test the scenario where the reframer throws an exception on incoming data and the upstream + * throws an exception during cleanup when cancel is called. + */ + @Test + public void testFailedRecoveryHandling() { + MockResponseObserver outerObserver = new MockResponseObserver<>(true); + final RuntimeException fakeReframerError = new RuntimeException("fake reframer error"); + + Reframer brokenReframer = + new Reframer() { + @Override + public void push(String ignored) { + throw fakeReframerError; + } + + @Override + public boolean hasFullFrame() { + return false; + } + + @Override + public boolean hasPartialFrame() { + return false; + } + + @Override + public String pop() { + throw new IllegalStateException("should not be called"); + } + }; + ReframingResponseObserver middleware = + new ReframingResponseObserver<>(outerObserver, brokenReframer); + + // Configure the mock inner controller to fail cancellation. + StreamController mockInnerController = Mockito.mock(StreamController.class); + RuntimeException fakeCancelError = new RuntimeException("fake cancel error"); + Mockito.doThrow(fakeCancelError).when(mockInnerController).cancel(); + + // Jumpstart a call & feed it data + middleware.onStartImpl(mockInnerController); + middleware.onResponseImpl("1"); + + // Make sure that the outer observer was notified with the reframer, which contains a suppressed + // cancellation error. + Throwable finalError = outerObserver.getFinalError(); + Truth.assertThat(finalError).isSameInstanceAs(fakeReframerError); + Truth.assertThat(ImmutableList.of(finalError.getSuppressed())).hasSize(1); + Truth.assertThat(finalError.getSuppressed()[0]).isInstanceOf(IllegalStateException.class); + Truth.assertThat(finalError.getSuppressed()[0]) + .hasMessageThat() + .isEqualTo("Failed to cancel upstream while recovering from an unexpected error"); + Truth.assertThat(finalError.getSuppressed()[0].getCause()).isSameInstanceAs(fakeCancelError); + } + /** * A simple implementation of a {@link Reframer}. The input string is split by dash, and the * output is concatenated by dashes. The test can verify M:N behavior by adjusting the