From a16cb8864c0c8be26d34e71dbf261dbfc5e09bac Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Mon, 28 Oct 2019 15:38:12 -0400 Subject: [PATCH] fix: handle recovery failures during stream reframing failure (#46) * fix: handle recovery failures during stream reframing failure This was discovered while debugging another issue. While deflaking ReadRowRetryTest, this issue came up preventing me from seeing the underlying issue. ReframingResponseObserver#deliverUnsafe() should never fail. However if does, it will try to cancel the upstream stream and notify the downstream observer. However canceling the upstream can throw an exception and prevent the downstram observer from being notified of any error. This fix will catch cancellation errors and add them as suppressed exceptions to the original failure * add test * format --- .../reframing/ReframingResponseObserver.java | 9 ++- .../ReframingResponseObserverTest.java | 57 +++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver.java index 849c56b19..c46eb55ff 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver.java @@ -244,7 +244,14 @@ private void deliver() { // purposefully leaving the lock non-zero and notifying the outerResponseObserver of the // error. Care must be taken to avoid calling close twice in case the first invocation threw // an error. - innerController.cancel(); + try { + innerController.cancel(); + } catch (Throwable cancelError) { + t.addSuppressed( + new IllegalStateException( + "Failed to cancel upstream while recovering from an unexpected error", + cancelError)); + } if (!finished) { outerResponseObserver.onError(t); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserverTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserverTest.java index d6c1287d0..426c27f5a 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserverTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserverTest.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.gaxx.reframing; +import com.google.api.gax.rpc.StreamController; import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable; import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable.StreamControllerStash; import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockResponseObserver; @@ -41,6 +42,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mockito; @RunWith(JUnit4.class) public class ReframingResponseObserverTest { @@ -374,6 +376,61 @@ public String pop() { Truth.assertThat(lastCall.getNumDelivered()).isEqualTo(2); } + /** + * Test the scenario where the reframer throws an exception on incoming data and the upstream + * throws an exception during cleanup when cancel is called. + */ + @Test + public void testFailedRecoveryHandling() { + MockResponseObserver outerObserver = new MockResponseObserver<>(true); + final RuntimeException fakeReframerError = new RuntimeException("fake reframer error"); + + Reframer brokenReframer = + new Reframer() { + @Override + public void push(String ignored) { + throw fakeReframerError; + } + + @Override + public boolean hasFullFrame() { + return false; + } + + @Override + public boolean hasPartialFrame() { + return false; + } + + @Override + public String pop() { + throw new IllegalStateException("should not be called"); + } + }; + ReframingResponseObserver middleware = + new ReframingResponseObserver<>(outerObserver, brokenReframer); + + // Configure the mock inner controller to fail cancellation. + StreamController mockInnerController = Mockito.mock(StreamController.class); + RuntimeException fakeCancelError = new RuntimeException("fake cancel error"); + Mockito.doThrow(fakeCancelError).when(mockInnerController).cancel(); + + // Jumpstart a call & feed it data + middleware.onStartImpl(mockInnerController); + middleware.onResponseImpl("1"); + + // Make sure that the outer observer was notified with the reframer, which contains a suppressed + // cancellation error. + Throwable finalError = outerObserver.getFinalError(); + Truth.assertThat(finalError).isSameInstanceAs(fakeReframerError); + Truth.assertThat(ImmutableList.of(finalError.getSuppressed())).hasSize(1); + Truth.assertThat(finalError.getSuppressed()[0]).isInstanceOf(IllegalStateException.class); + Truth.assertThat(finalError.getSuppressed()[0]) + .hasMessageThat() + .isEqualTo("Failed to cancel upstream while recovering from an unexpected error"); + Truth.assertThat(finalError.getSuppressed()[0].getCause()).isSameInstanceAs(fakeCancelError); + } + /** * A simple implementation of a {@link Reframer}. The input string is split by dash, and the * output is concatenated by dashes. The test can verify M:N behavior by adjusting the