Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle recovery failures during stream reframing failure #46

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -244,7 +244,14 @@ private void deliver() {
// purposefully leaving the lock non-zero and notifying the outerResponseObserver of the
// error. Care must be taken to avoid calling close twice in case the first invocation threw
// an error.
innerController.cancel();
try {
innerController.cancel();
} catch (Throwable cancelError) {
t.addSuppressed(
new IllegalStateException(
"Failed to cancel upstream while recovering from an unexpected error",
cancelError));
}
if (!finished) {
outerResponseObserver.onError(t);
}
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.gaxx.reframing;

import com.google.api.gax.rpc.StreamController;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable.StreamControllerStash;
import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockResponseObserver;
Expand All @@ -41,6 +42,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
public class ReframingResponseObserverTest {
Expand Down Expand Up @@ -374,6 +376,61 @@ public String pop() {
Truth.assertThat(lastCall.getNumDelivered()).isEqualTo(2);
}

/**
* Test the scenario where the reframer throws an exception on incoming data and the upstream
* throws an exception during cleanup when cancel is called.
*/
@Test
public void testFailedRecoveryHandling() {
MockResponseObserver<String> outerObserver = new MockResponseObserver<>(true);
final RuntimeException fakeReframerError = new RuntimeException("fake reframer error");

Reframer<String, String> brokenReframer =
new Reframer<String, String>() {
@Override
public void push(String ignored) {
throw fakeReframerError;
}

@Override
public boolean hasFullFrame() {
return false;
}

@Override
public boolean hasPartialFrame() {
return false;
}

@Override
public String pop() {
throw new IllegalStateException("should not be called");
}
};
ReframingResponseObserver<String, String> middleware =
new ReframingResponseObserver<>(outerObserver, brokenReframer);

// Configure the mock inner controller to fail cancellation.
StreamController mockInnerController = Mockito.mock(StreamController.class);
RuntimeException fakeCancelError = new RuntimeException("fake cancel error");
Mockito.doThrow(fakeCancelError).when(mockInnerController).cancel();

// Jumpstart a call & feed it data
middleware.onStartImpl(mockInnerController);
middleware.onResponseImpl("1");

// Make sure that the outer observer was notified with the reframer, which contains a suppressed
// cancellation error.
Throwable finalError = outerObserver.getFinalError();
Truth.assertThat(finalError).isSameInstanceAs(fakeReframerError);
Truth.assertThat(ImmutableList.of(finalError.getSuppressed())).hasSize(1);
Truth.assertThat(finalError.getSuppressed()[0]).isInstanceOf(IllegalStateException.class);
Truth.assertThat(finalError.getSuppressed()[0])
.hasMessageThat()
.isEqualTo("Failed to cancel upstream while recovering from an unexpected error");
Truth.assertThat(finalError.getSuppressed()[0].getCause()).isSameInstanceAs(fakeCancelError);
}

/**
* A simple implementation of a {@link Reframer}. The input string is split by dash, and the
* output is concatenated by dashes. The test can verify M:N behavior by adjusting the
Expand Down