Skip to content

Commit

Permalink
fix: Modify CommitterImpl to exit early in case of permanent failure …
Browse files Browse the repository at this point in the history
…to avoid waiting on its own actions to complete. (#90)
  • Loading branch information
dpcollins-google committed Jun 2, 2020
1 parent bd67e98 commit 8858d58
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
Expand Up @@ -45,6 +45,9 @@ public class CommitterImpl extends ProxyService
@GuardedBy("monitor.monitor")
private boolean shutdown = false;

@GuardedBy("monitor.monitor")
private boolean hadPermanentError = false;

@GuardedBy("monitor.monitor")
private final CommitState state = new CommitState();

Expand Down Expand Up @@ -72,6 +75,7 @@ public CommitterImpl(CursorServiceStub stub, InitialCommitCursorRequest request)
@Override
protected void handlePermanentError(StatusException error) {
try (CloseableMonitor.Hold h = monitor.enter()) {
hadPermanentError = true;
shutdown = true;
state.abort(error);
}
Expand All @@ -90,9 +94,8 @@ protected void stop() {
new Guard(monitor.monitor) {
@Override
public boolean isSatisfied() {
// Wait until the state is empty. It will be made empty by a call to state.abort()
// if an error occurs.
return state.isEmpty();
// Wait until the state is empty or a permanent error occurred.
return state.isEmpty() || hadPermanentError;
}
})) {}
}
Expand Down
Expand Up @@ -36,6 +36,7 @@
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPaths;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.StatusExceptionMatcher;
import com.google.cloud.pubsublite.proto.CursorServiceGrpc;
import com.google.cloud.pubsublite.proto.CursorServiceGrpc.CursorServiceStub;
Expand All @@ -44,6 +45,7 @@
import com.google.cloud.pubsublite.proto.StreamingCommitCursorRequest;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusException;
import io.grpc.inprocess.InProcessChannelBuilder;
Expand Down Expand Up @@ -178,4 +180,19 @@ public void multipleSentCompletedInOrder() {

verify(permanentErrorHandler, times(0)).failed(any(), any());
}

@Test
public void stopInCommitCallback() throws Exception {
ApiFuture<Void> future = committer.commitOffset(Offset.of(10));
CountDownLatch latch = new CountDownLatch(1);
ExtractStatus.addFailureHandler(future, (error) -> {
committer.stopAsync();
latch.countDown();
});
leakedResponseObserver.onError(Status.FAILED_PRECONDITION.asException());
latch.await();
assertFutureThrowsCode(future, Code.FAILED_PRECONDITION);
verify(permanentErrorHandler)
.failed(any(), argThat(new StatusExceptionMatcher(Code.FAILED_PRECONDITION)));
}
}

0 comments on commit 8858d58

Please sign in to comment.