Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add shutdown() and shutdownNow() #673

Merged
merged 12 commits into from Jun 29, 2021
21 changes: 21 additions & 0 deletions google-cloud-firestore/clirr-ignored-differences.xml
Expand Up @@ -17,6 +17,27 @@

<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- Shutdown/Shutdown Now -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/firestore/Firestore</className>
<method>void shutdown()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/firestore/Firestore</className>
<method>void shutdownNow()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/firestore/spi/v1/FirestoreRpc</className>
<method>void shutdown()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/firestore/spi/v1/FirestoreRpc</className>
<method>void shutdownNow()</method>
</difference>

<!-- v2.1.1 -->
<difference>
Expand Down
Expand Up @@ -296,4 +296,13 @@ void getAll(
*/
@Override
void close() throws Exception;

/**
* Initiates an orderly shutdown in which previously submitted work is finished, but no new work
* will be accepted.
*/
void shutdown();

/** Attempts to stop all actively executing work and halts the processing of waiting work. */
void shutdownNow();
}
Expand Up @@ -447,7 +447,18 @@ public FirestoreOptions getOptions() {

@Override
public void close() throws Exception {
firestoreClient.close();
shutdown();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised this doesn't still call firestoreClient.close(). It looks like in GrpcFirestoreRpc you factored things to keep the existing behavior for close().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have an opinion here - I do think it makes sense to keep the existing semantics, especially since we are adding a couple of new methods. If I call close() here the newly added shutdown tests time out though. Let me know if you want me to go back to calling close(), in which case I will remove the tests.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think keeping the existing semantic is probably beneficial.

For updating the test we may be able to do something like:

    ExecutorService testExecutorService = Executors.newSingleThreadExecutor();
    final Future<?> f = testExecutorService.submit(new Runnable() {
      @Override
      public void run() {
        firestore.close();
      }
    });
    
    try {
      f.get(5, TimeUnit.SECONDS);
      fail();
    } catch (TimeoutException e) {
      // expected
    } finally {
      testExecutorService.shutdown();
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. Thanks for the reviews.

}

@Override
public void shutdown() {
firestoreClient.shutdown();
closed = true;
}

@Override
public void shutdownNow() {
firestoreClient.shutdownNow();
closed = true;
}

Expand Down
Expand Up @@ -337,15 +337,18 @@ private void closeStream(final Throwable throwable) {
new Runnable() {
@Override
public void run() {
listener.onEvent(
null,
throwable instanceof FirestoreException
? (FirestoreException) throwable
: FirestoreException.forApiException(
new ApiException(
throwable,
GrpcStatusCode.of(getStatus(throwable).getCode()),
false)));
if (throwable instanceof FirestoreException) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes are necessary becuse of grpc interrupting the channel while this could still be listenting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and the exceptions thrown then do no have status codes. This is all based on local debugging.

listener.onEvent(null, (FirestoreException) throwable);
} else {
Status status = getStatus(throwable);
FirestoreException firestoreException =
FirestoreException.forApiException(
new ApiException(
throwable,
GrpcStatusCode.of(status != null ? status.getCode() : Code.UNKNOWN),
false));
listener.onEvent(null, firestoreException);
}
}
});
}
Expand Down Expand Up @@ -383,31 +386,36 @@ private void initStream() {
new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
return;
}

synchronized (Watch.this) {
try {
if (!isActive.get()) {
return;
}

Preconditions.checkState(stream == null);
synchronized (Watch.this) {
if (!isActive.get()) {
return;
}

current = false;
nextAttempt = backoff.createNextAttempt(nextAttempt);
Preconditions.checkState(stream == null);

Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN);
stream = firestore.streamRequest(Watch.this, firestore.getClient().listenCallable());
current = false;
nextAttempt = backoff.createNextAttempt(nextAttempt);

ListenRequest.Builder request = ListenRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());
request.setAddTarget(target);
if (resumeToken != null) {
request.getAddTargetBuilder().setResumeToken(resumeToken);
}
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN);
stream =
firestore.streamRequest(Watch.this, firestore.getClient().listenCallable());

ListenRequest.Builder request = ListenRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());
request.setAddTarget(target);
if (resumeToken != null) {
request.getAddTargetBuilder().setResumeToken(resumeToken);
}

stream.onNext(request.build());
stream.onNext(request.build());
}
} catch (Throwable throwable) {
onError(throwable);
}
}
},
Expand Down Expand Up @@ -549,6 +557,10 @@ private List<DocumentChange> computeSnapshot(Timestamp readTime) {
private static boolean isPermanentError(Throwable throwable) {
Status status = getStatus(throwable);

if (status == null) {
return true;
}

switch (status.getCode()) {
case CANCELLED:
case UNKNOWN:
Expand All @@ -563,20 +575,20 @@ private static boolean isPermanentError(Throwable throwable) {
}
}

/** Extracts the GRPC status code if available. Returns UNKNOWN for non-GRPC exceptions. */
/** Extracts the GRPC status code if available. Returns `null` for non-GRPC exceptions. */
@Nullable
private static Status getStatus(Throwable throwable) {
Status status = Status.UNKNOWN;

if (throwable instanceof StatusRuntimeException) {
status = ((StatusRuntimeException) throwable).getStatus();
return ((StatusRuntimeException) throwable).getStatus();
} else if (throwable instanceof StatusException) {
status = ((StatusException) throwable).getStatus();
return ((StatusException) throwable).getStatus();
}
return status;
return null;
}

/** Determines whether we need to initiate a longer backoff due to system overload. */
private static boolean isResourceExhaustedError(Throwable throwable) {
return getStatus(throwable).getCode().equals(Code.RESOURCE_EXHAUSTED);
Status status = getStatus(throwable);
return status != null && status.getCode().equals(Code.RESOURCE_EXHAUSTED);
};
}
Expand Up @@ -78,4 +78,8 @@ public interface FirestoreRpc extends AutoCloseable, ServiceRpc {

/** Returns a bi-directional watch stream. */
BidiStreamingCallable<ListenRequest, ListenResponse> listenCallable();

void shutdownNow();

void shutdown();
}
Expand Up @@ -28,7 +28,6 @@
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.TransportChannel;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.UnaryCallSettings.Builder;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceOptions;
Expand Down Expand Up @@ -127,7 +126,7 @@ public GrpcFirestoreRpc(final FirestoreOptions options) throws IOException {
clientContext = ClientContext.create(settingsBuilder.build());
}
ApiFunction<UnaryCallSettings.Builder<?, ?>, Void> retrySettingsSetter =
new ApiFunction<Builder<?, ?>, Void>() {
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
builder.setRetrySettings(options.getRetrySettings());
Expand All @@ -145,18 +144,43 @@ public Void apply(UnaryCallSettings.Builder<?, ?> builder) {

@Override
public void close() throws Exception {
if (!closed) {
firestoreStub.close();
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
resource.close();
}
executorFactory.release(executor);
closed = true;
}
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
resource.awaitTermination(1, TimeUnit.SECONDS);
}
}

@Override
public void shutdown() {
if (closed) {
return;
}
closed = true;
firestoreStub.close();
firestoreStub.shutdown();
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
resource.close();
resource.shutdown();
}
executorFactory.release(executor);
closed = true;
}

@Override
public void shutdownNow() {
if (closed) {
return;
}
firestoreStub.shutdownNow();
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
resource.awaitTermination(1, TimeUnit.SECONDS);
resource.shutdownNow();
}
executorFactory.release(executor);
closed = true;
}

@Override
Expand Down