Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add shutdown() and shutdownNow() #673

Merged
merged 12 commits into from Jun 29, 2021
Expand Up @@ -296,4 +296,13 @@ void getAll(
*/
@Override
void close() throws Exception;

/**
* Initiates an orderly shutdown in which previously submitted work is finished, but no new work
* will be accepted.
*/
void shutdown();

/** Attempts to stop all actively executing work and halts the processing of waiting work. */
void shutdownNow();
}
Expand Up @@ -451,6 +451,18 @@ public void close() throws Exception {
closed = true;
}

@Override
public void shutdown() {
firestoreClient.shutdown();
closed = true;
}

@Override
public void shutdownNow() {
firestoreClient.shutdownNow();
closed = true;
}

private static class TransactionAsyncAdapter<T> implements Transaction.AsyncFunction<T> {
private final Transaction.Function<T> syncFunction;

Expand Down
Expand Up @@ -337,15 +337,18 @@ private void closeStream(final Throwable throwable) {
new Runnable() {
@Override
public void run() {
listener.onEvent(
null,
throwable instanceof FirestoreException
? (FirestoreException) throwable
: FirestoreException.forApiException(
new ApiException(
throwable,
GrpcStatusCode.of(getStatus(throwable).getCode()),
false)));
if (throwable instanceof FirestoreException) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes are necessary becuse of grpc interrupting the channel while this could still be listenting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and the exceptions thrown then do no have status codes. This is all based on local debugging.

listener.onEvent(null, (FirestoreException) throwable);
} else {
Status status = getStatus(throwable);
FirestoreException firestoreException =
FirestoreException.forApiException(
new ApiException(
throwable,
GrpcStatusCode.of(status != null ? status.getCode() : Code.UNKNOWN),
false));
listener.onEvent(null, firestoreException);
}
}
});
}
Expand Down Expand Up @@ -383,31 +386,36 @@ private void initStream() {
new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
return;
}

synchronized (Watch.this) {
try {
if (!isActive.get()) {
return;
}

Preconditions.checkState(stream == null);
synchronized (Watch.this) {
if (!isActive.get()) {
return;
}

current = false;
nextAttempt = backoff.createNextAttempt(nextAttempt);
Preconditions.checkState(stream == null);

Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN);
stream = firestore.streamRequest(Watch.this, firestore.getClient().listenCallable());
current = false;
nextAttempt = backoff.createNextAttempt(nextAttempt);

ListenRequest.Builder request = ListenRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());
request.setAddTarget(target);
if (resumeToken != null) {
request.getAddTargetBuilder().setResumeToken(resumeToken);
}
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN);
stream =
firestore.streamRequest(Watch.this, firestore.getClient().listenCallable());

ListenRequest.Builder request = ListenRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());
request.setAddTarget(target);
if (resumeToken != null) {
request.getAddTargetBuilder().setResumeToken(resumeToken);
}

stream.onNext(request.build());
stream.onNext(request.build());
}
} catch (Throwable throwable) {
onError(throwable);
}
}
},
Expand Down Expand Up @@ -549,6 +557,10 @@ private List<DocumentChange> computeSnapshot(Timestamp readTime) {
private static boolean isPermanentError(Throwable throwable) {
Status status = getStatus(throwable);

if (status == null) {
return true;
}

switch (status.getCode()) {
case CANCELLED:
case UNKNOWN:
Expand All @@ -563,20 +575,20 @@ private static boolean isPermanentError(Throwable throwable) {
}
}

/** Extracts the GRPC status code if available. Returns UNKNOWN for non-GRPC exceptions. */
/** Extracts the GRPC status code if available. Returns `null` for non-GRPC exceptions. */
@Nullable
private static Status getStatus(Throwable throwable) {
Status status = Status.UNKNOWN;

if (throwable instanceof StatusRuntimeException) {
status = ((StatusRuntimeException) throwable).getStatus();
return ((StatusRuntimeException) throwable).getStatus();
} else if (throwable instanceof StatusException) {
status = ((StatusException) throwable).getStatus();
return ((StatusException) throwable).getStatus();
}
return status;
return null;
}

/** Determines whether we need to initiate a longer backoff due to system overload. */
private static boolean isResourceExhaustedError(Throwable throwable) {
return getStatus(throwable).getCode().equals(Code.RESOURCE_EXHAUSTED);
Status status = getStatus(throwable);
return status != null && status.getCode().equals(Code.RESOURCE_EXHAUSTED);
};
}
Expand Up @@ -78,4 +78,8 @@ public interface FirestoreRpc extends AutoCloseable, ServiceRpc {

/** Returns a bi-directional watch stream. */
BidiStreamingCallable<ListenRequest, ListenResponse> listenCallable();

void shutdownNow();

void shutdown();
}
Expand Up @@ -16,7 +16,6 @@

package com.google.cloud.firestore.spi.v1;

import com.google.api.core.ApiFunction;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.grpc.GrpcCallContext;
Expand All @@ -27,8 +26,6 @@
import com.google.api.gax.rpc.NoHeaderProvider;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.TransportChannel;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.UnaryCallSettings.Builder;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceOptions;
Expand All @@ -38,7 +35,6 @@
import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
import com.google.cloud.firestore.v1.FirestoreSettings;
import com.google.cloud.firestore.v1.stub.FirestoreStub;
import com.google.cloud.firestore.v1.stub.FirestoreStubSettings;
import com.google.cloud.firestore.v1.stub.GrpcFirestoreStub;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
Expand Down Expand Up @@ -126,18 +122,7 @@ public GrpcFirestoreRpc(final FirestoreOptions options) throws IOException {

clientContext = ClientContext.create(settingsBuilder.build());
}
ApiFunction<UnaryCallSettings.Builder<?, ?>, Void> retrySettingsSetter =
new ApiFunction<Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
builder.setRetrySettings(options.getRetrySettings());
return null;
}
};
FirestoreStubSettings.Builder firestoreBuilder =
FirestoreStubSettings.newBuilder(clientContext)
.applyToAllUnaryMethods(retrySettingsSetter);
firestoreStub = GrpcFirestoreStub.create(firestoreBuilder.build());
firestoreStub = GrpcFirestoreStub.create(clientContext);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BenWhitehead This might actually be enough to solve the underlying issue in close(), which means that we might not need to expose a separate shutdown API. With the original code, the newly created firestoreStub does not reference the existing background resources of the clientContext and thus the shutdown call does not actually do anything.

I also do not know what the consequence of not providing the retry settings builder is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was able to add this back by adding the existing logic from close that manually closed the background resources.

} catch (Exception e) {
throw new IOException(e);
}
Expand Down Expand Up @@ -219,6 +204,16 @@ public BidiStreamingCallable<ListenRequest, ListenResponse> listenCallable() {
return firestoreStub.listenCallable();
}

@Override
public void shutdown() {
firestoreStub.shutdown();
}

@Override
public void shutdownNow() {
firestoreStub.shutdownNow();
}

// This class is needed solely to get access to protected method setInternalHeaderProvider()
private static class FirestoreSettingsBuilder extends FirestoreSettings.Builder {
private FirestoreSettingsBuilder(FirestoreSettings settings) {
Expand Down
Expand Up @@ -384,6 +384,25 @@ public void limitToLast() throws Exception {
listenerAssertions.addedIdsIsAnyOf(emptyList(), asList("doc2", "doc3"));
}

@Test
public void shutdownNowPreventsListener() throws Exception {
BenWhitehead marked this conversation as resolved.
Show resolved Hide resolved
Query query = randomColl.whereEqualTo("foo", "bar");
QuerySnapshotEventListener listener =
QuerySnapshotEventListener.builder().setExpectError().build();

// While a better test would test a shutdown after the listener has been added, this behavior
// leads to flakes as the timing of both the Watch stream and the shutdown call influence
// whether an error is raised. In some circumstances, the stream drops without any
// notifications.
firestore.shutdownNow();
query.addSnapshotListener(listener);

listener.eventsCountDownLatch.awaitError();

ListenerAssertions listenerAssertions = listener.assertions();
listenerAssertions.hasError();
}

/**
* A tuple class used by {@code #queryWatch}. This class represents an event delivered to the
* registered query listener.
Expand All @@ -402,16 +421,19 @@ private static final class ListenerEvent {
private static final class EventsCountDownLatch {
private final CountDownLatch initialEventsCountDownLatch;
private final int initialEventCount;
private final CountDownLatch errorCountDownLatch;
private final EnumMap<DocumentChange.Type, Integer> eventsCounts;
private final EnumMap<DocumentChange.Type, CountDownLatch> eventsCountDownLatches;

EventsCountDownLatch(
int initialEventCount,
int addedInitialCount,
int modifiedInitialCount,
int removedInitialCount) {
int removedInitialCount,
int errorCount) {
initialEventsCountDownLatch = new CountDownLatch(initialEventCount);
this.initialEventCount = initialEventCount;
this.errorCountDownLatch = new CountDownLatch(errorCount);
eventsCounts = new EnumMap<>(DocumentChange.Type.class);
eventsCounts.put(DocumentChange.Type.ADDED, addedInitialCount);
eventsCounts.put(DocumentChange.Type.MODIFIED, modifiedInitialCount);
Expand All @@ -432,10 +454,18 @@ void countDown(DocumentChange.Type type) {
eventsCountDownLatches.get(type).countDown();
}

void countError() {
errorCountDownLatch.countDown();
}

void awaitInitialEvents() throws InterruptedException {
initialEventsCountDownLatch.await(5 * initialEventCount, TimeUnit.SECONDS);
}

void awaitError() throws InterruptedException {
errorCountDownLatch.await(5, TimeUnit.SECONDS);
}

void await(DocumentChange.Type type) throws InterruptedException {
int count = eventsCounts.get(type);
eventsCountDownLatches.get(type).await(5 * count, TimeUnit.SECONDS);
Expand All @@ -447,11 +477,15 @@ static class QuerySnapshotEventListener implements EventListener<QuerySnapshot>
final EventsCountDownLatch eventsCountDownLatch;

private QuerySnapshotEventListener(
int initialCount, int addedEventCount, int modifiedEventCount, int removedEventCount) {
int initialCount,
int addedEventCount,
int modifiedEventCount,
int removedEventCount,
int errorCount) {
this.receivedEvents = Collections.synchronizedList(new ArrayList<ListenerEvent>());
this.eventsCountDownLatch =
new EventsCountDownLatch(
initialCount, addedEventCount, modifiedEventCount, removedEventCount);
initialCount, addedEventCount, modifiedEventCount, removedEventCount, errorCount);
}

@Override
Expand All @@ -463,6 +497,9 @@ public void onEvent(@Nullable QuerySnapshot value, @Nullable FirestoreException
eventsCountDownLatch.countDown(docChange.getType());
}
}
if (error != null) {
eventsCountDownLatch.countError();
}
eventsCountDownLatch.countDown();
}

Expand All @@ -480,6 +517,7 @@ static final class Builder {
private int addedEventCount = 0;
private int modifiedEventCount = 0;
private int removedEventCount = 0;
private int errorCount = 0;

private Builder() {}

Expand All @@ -503,9 +541,14 @@ Builder setRemovedEventCount(int removedEventCount) {
return this;
}

Builder setExpectError() {
this.errorCount = 1;
return this;
}

public QuerySnapshotEventListener build() {
return new QuerySnapshotEventListener(
initialEventCount, addedEventCount, modifiedEventCount, removedEventCount);
initialEventCount, addedEventCount, modifiedEventCount, removedEventCount, errorCount);
}
}

Expand Down Expand Up @@ -536,6 +579,20 @@ public boolean apply(ListenerEvent input) {
assertWithMessage("snapshotListener received an error").that(anyError).isAbsent();
}

private void hasError() {
final Optional<ListenerEvent> anyError =
events.firstMatch(
new Predicate<ListenerEvent>() {
@Override
public boolean apply(ListenerEvent input) {
return input.error != null;
}
});
assertWithMessage("snapshotListener did not receive an expected error")
.that(anyError)
.isPresent();
}

private static List<QuerySnapshot> getQuerySnapshots(FluentIterable<ListenerEvent> events) {
return events
.filter(
Expand Down