Skip to content

Commit

Permalink
fix: handle recovery failures during stream reframing failure (#46)
Browse files Browse the repository at this point in the history
* fix: handle recovery failures during stream reframing failure

This was discovered while debugging another issue. While deflaking
ReadRowRetryTest, this issue came up preventing me from seeing the
underlying issue.

ReframingResponseObserver#deliverUnsafe() should never fail. However if
does, it will try to cancel the upstream stream and notify the
downstream observer. However canceling the upstream can throw an
exception and prevent the downstram observer from being notified of any
error.

This fix will catch cancellation errors and add them as suppressed
exceptions to the original failure

* add test

* format
  • Loading branch information
igorbernstein2 committed Oct 28, 2019
1 parent 4ca7e2f commit a16cb88
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 1 deletion.
Expand Up @@ -244,7 +244,14 @@ private void deliver() {
// purposefully leaving the lock non-zero and notifying the outerResponseObserver of the
// error. Care must be taken to avoid calling close twice in case the first invocation threw
// an error.
innerController.cancel();
try {
innerController.cancel();
} catch (Throwable cancelError) {
t.addSuppressed(
new IllegalStateException(
"Failed to cancel upstream while recovering from an unexpected error",
cancelError));
}
if (!finished) {
outerResponseObserver.onError(t);
}
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.gaxx.reframing;

import com.google.api.gax.rpc.StreamController;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable.StreamControllerStash;
import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockResponseObserver;
Expand All @@ -41,6 +42,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
public class ReframingResponseObserverTest {
Expand Down Expand Up @@ -374,6 +376,61 @@ public String pop() {
Truth.assertThat(lastCall.getNumDelivered()).isEqualTo(2);
}

/**
* Test the scenario where the reframer throws an exception on incoming data and the upstream
* throws an exception during cleanup when cancel is called.
*/
@Test
public void testFailedRecoveryHandling() {
MockResponseObserver<String> outerObserver = new MockResponseObserver<>(true);
final RuntimeException fakeReframerError = new RuntimeException("fake reframer error");

Reframer<String, String> brokenReframer =
new Reframer<String, String>() {
@Override
public void push(String ignored) {
throw fakeReframerError;
}

@Override
public boolean hasFullFrame() {
return false;
}

@Override
public boolean hasPartialFrame() {
return false;
}

@Override
public String pop() {
throw new IllegalStateException("should not be called");
}
};
ReframingResponseObserver<String, String> middleware =
new ReframingResponseObserver<>(outerObserver, brokenReframer);

// Configure the mock inner controller to fail cancellation.
StreamController mockInnerController = Mockito.mock(StreamController.class);
RuntimeException fakeCancelError = new RuntimeException("fake cancel error");
Mockito.doThrow(fakeCancelError).when(mockInnerController).cancel();

// Jumpstart a call & feed it data
middleware.onStartImpl(mockInnerController);
middleware.onResponseImpl("1");

// Make sure that the outer observer was notified with the reframer, which contains a suppressed
// cancellation error.
Throwable finalError = outerObserver.getFinalError();
Truth.assertThat(finalError).isSameInstanceAs(fakeReframerError);
Truth.assertThat(ImmutableList.of(finalError.getSuppressed())).hasSize(1);
Truth.assertThat(finalError.getSuppressed()[0]).isInstanceOf(IllegalStateException.class);
Truth.assertThat(finalError.getSuppressed()[0])
.hasMessageThat()
.isEqualTo("Failed to cancel upstream while recovering from an unexpected error");
Truth.assertThat(finalError.getSuppressed()[0].getCause()).isSameInstanceAs(fakeCancelError);
}

/**
* A simple implementation of a {@link Reframer}. The input string is split by dash, and the
* output is concatenated by dashes. The test can verify M:N behavior by adjusting the
Expand Down

0 comments on commit a16cb88

Please sign in to comment.