Skip to content

Commit

Permalink
feat: add shutdown() and shutdownNow() (#673)
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Jun 29, 2021
1 parent 9915e40 commit 4f20858
Show file tree
Hide file tree
Showing 8 changed files with 320 additions and 53 deletions.
21 changes: 21 additions & 0 deletions google-cloud-firestore/clirr-ignored-differences.xml
Expand Up @@ -17,6 +17,27 @@

<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- Shutdown/Shutdown Now -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/firestore/Firestore</className>
<method>void shutdown()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/firestore/Firestore</className>
<method>void shutdownNow()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/firestore/spi/v1/FirestoreRpc</className>
<method>void shutdown()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/firestore/spi/v1/FirestoreRpc</className>
<method>void shutdownNow()</method>
</difference>

<!-- v2.1.1 -->
<difference>
Expand Down
Expand Up @@ -296,4 +296,13 @@ void getAll(
*/
@Override
void close() throws Exception;

/**
* Initiates an orderly shutdown in which previously submitted work is finished, but no new work
* will be accepted.
*/
void shutdown();

/** Attempts to stop all actively executing work and halts the processing of waiting work. */
void shutdownNow();
}
Expand Up @@ -451,6 +451,18 @@ public void close() throws Exception {
closed = true;
}

@Override
public void shutdown() {
firestoreClient.shutdown();
closed = true;
}

@Override
public void shutdownNow() {
firestoreClient.shutdownNow();
closed = true;
}

private static class TransactionAsyncAdapter<T> implements Transaction.AsyncFunction<T> {
private final Transaction.Function<T> syncFunction;

Expand Down
Expand Up @@ -337,15 +337,18 @@ private void closeStream(final Throwable throwable) {
new Runnable() {
@Override
public void run() {
listener.onEvent(
null,
throwable instanceof FirestoreException
? (FirestoreException) throwable
: FirestoreException.forApiException(
new ApiException(
throwable,
GrpcStatusCode.of(getStatus(throwable).getCode()),
false)));
if (throwable instanceof FirestoreException) {
listener.onEvent(null, (FirestoreException) throwable);
} else {
Status status = getStatus(throwable);
FirestoreException firestoreException =
FirestoreException.forApiException(
new ApiException(
throwable,
GrpcStatusCode.of(status != null ? status.getCode() : Code.UNKNOWN),
false));
listener.onEvent(null, firestoreException);
}
}
});
}
Expand Down Expand Up @@ -383,31 +386,36 @@ private void initStream() {
new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
return;
}

synchronized (Watch.this) {
try {
if (!isActive.get()) {
return;
}

Preconditions.checkState(stream == null);
synchronized (Watch.this) {
if (!isActive.get()) {
return;
}

current = false;
nextAttempt = backoff.createNextAttempt(nextAttempt);
Preconditions.checkState(stream == null);

Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN);
stream = firestore.streamRequest(Watch.this, firestore.getClient().listenCallable());
current = false;
nextAttempt = backoff.createNextAttempt(nextAttempt);

ListenRequest.Builder request = ListenRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());
request.setAddTarget(target);
if (resumeToken != null) {
request.getAddTargetBuilder().setResumeToken(resumeToken);
}
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN);
stream =
firestore.streamRequest(Watch.this, firestore.getClient().listenCallable());

ListenRequest.Builder request = ListenRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());
request.setAddTarget(target);
if (resumeToken != null) {
request.getAddTargetBuilder().setResumeToken(resumeToken);
}

stream.onNext(request.build());
stream.onNext(request.build());
}
} catch (Throwable throwable) {
onError(throwable);
}
}
},
Expand Down Expand Up @@ -549,6 +557,10 @@ private List<DocumentChange> computeSnapshot(Timestamp readTime) {
private static boolean isPermanentError(Throwable throwable) {
Status status = getStatus(throwable);

if (status == null) {
return true;
}

switch (status.getCode()) {
case CANCELLED:
case UNKNOWN:
Expand All @@ -563,20 +575,20 @@ private static boolean isPermanentError(Throwable throwable) {
}
}

/** Extracts the GRPC status code if available. Returns UNKNOWN for non-GRPC exceptions. */
/** Extracts the GRPC status code if available. Returns `null` for non-GRPC exceptions. */
@Nullable
private static Status getStatus(Throwable throwable) {
Status status = Status.UNKNOWN;

if (throwable instanceof StatusRuntimeException) {
status = ((StatusRuntimeException) throwable).getStatus();
return ((StatusRuntimeException) throwable).getStatus();
} else if (throwable instanceof StatusException) {
status = ((StatusException) throwable).getStatus();
return ((StatusException) throwable).getStatus();
}
return status;
return null;
}

/** Determines whether we need to initiate a longer backoff due to system overload. */
private static boolean isResourceExhaustedError(Throwable throwable) {
return getStatus(throwable).getCode().equals(Code.RESOURCE_EXHAUSTED);
Status status = getStatus(throwable);
return status != null && status.getCode().equals(Code.RESOURCE_EXHAUSTED);
};
}
Expand Up @@ -78,4 +78,8 @@ public interface FirestoreRpc extends AutoCloseable, ServiceRpc {

/** Returns a bi-directional watch stream. */
BidiStreamingCallable<ListenRequest, ListenResponse> listenCallable();

void shutdownNow();

void shutdown();
}
Expand Up @@ -28,7 +28,6 @@
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.TransportChannel;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.UnaryCallSettings.Builder;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceOptions;
Expand Down Expand Up @@ -127,7 +126,7 @@ public GrpcFirestoreRpc(final FirestoreOptions options) throws IOException {
clientContext = ClientContext.create(settingsBuilder.build());
}
ApiFunction<UnaryCallSettings.Builder<?, ?>, Void> retrySettingsSetter =
new ApiFunction<Builder<?, ?>, Void>() {
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
builder.setRetrySettings(options.getRetrySettings());
Expand All @@ -145,18 +144,43 @@ public Void apply(UnaryCallSettings.Builder<?, ?> builder) {

@Override
public void close() throws Exception {
if (!closed) {
firestoreStub.close();
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
resource.close();
}
executorFactory.release(executor);
closed = true;
}
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
resource.awaitTermination(1, TimeUnit.SECONDS);
}
}

@Override
public void shutdown() {
if (closed) {
return;
}
closed = true;
firestoreStub.close();
firestoreStub.shutdown();
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
resource.close();
resource.shutdown();
}
executorFactory.release(executor);
closed = true;
}

@Override
public void shutdownNow() {
if (closed) {
return;
}
firestoreStub.shutdownNow();
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
resource.awaitTermination(1, TimeUnit.SECONDS);
resource.shutdownNow();
}
executorFactory.release(executor);
closed = true;
}

@Override
Expand Down

0 comments on commit 4f20858

Please sign in to comment.