diff --git a/google-cloud-firestore/clirr-ignored-differences.xml b/google-cloud-firestore/clirr-ignored-differences.xml index d32596ef0..bf989e438 100644 --- a/google-cloud-firestore/clirr-ignored-differences.xml +++ b/google-cloud-firestore/clirr-ignored-differences.xml @@ -17,6 +17,27 @@ + + + 7012 + com/google/cloud/firestore/Firestore + void shutdown() + + + 7012 + com/google/cloud/firestore/Firestore + void shutdownNow() + + + 7012 + com/google/cloud/firestore/spi/v1/FirestoreRpc + void shutdown() + + + 7012 + com/google/cloud/firestore/spi/v1/FirestoreRpc + void shutdownNow() + diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java index 2cd897b49..5175fd1b4 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java @@ -296,4 +296,13 @@ void getAll( */ @Override void close() throws Exception; + + /** + * Initiates an orderly shutdown in which previously submitted work is finished, but no new work + * will be accepted. + */ + void shutdown(); + + /** Attempts to stop all actively executing work and halts the processing of waiting work. */ + void shutdownNow(); } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java index 7a93e711d..a4ea4f0f2 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java @@ -451,6 +451,18 @@ public void close() throws Exception { closed = true; } + @Override + public void shutdown() { + firestoreClient.shutdown(); + closed = true; + } + + @Override + public void shutdownNow() { + firestoreClient.shutdownNow(); + closed = true; + } + private static class TransactionAsyncAdapter implements Transaction.AsyncFunction { private final Transaction.Function syncFunction; diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java index f355836f0..7eda432cb 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java @@ -337,15 +337,18 @@ private void closeStream(final Throwable throwable) { new Runnable() { @Override public void run() { - listener.onEvent( - null, - throwable instanceof FirestoreException - ? (FirestoreException) throwable - : FirestoreException.forApiException( - new ApiException( - throwable, - GrpcStatusCode.of(getStatus(throwable).getCode()), - false))); + if (throwable instanceof FirestoreException) { + listener.onEvent(null, (FirestoreException) throwable); + } else { + Status status = getStatus(throwable); + FirestoreException firestoreException = + FirestoreException.forApiException( + new ApiException( + throwable, + GrpcStatusCode.of(status != null ? status.getCode() : Code.UNKNOWN), + false)); + listener.onEvent(null, firestoreException); + } } }); } @@ -383,31 +386,36 @@ private void initStream() { new Runnable() { @Override public void run() { - if (!isActive.get()) { - return; - } - - synchronized (Watch.this) { + try { if (!isActive.get()) { return; } - Preconditions.checkState(stream == null); + synchronized (Watch.this) { + if (!isActive.get()) { + return; + } - current = false; - nextAttempt = backoff.createNextAttempt(nextAttempt); + Preconditions.checkState(stream == null); - Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN); - stream = firestore.streamRequest(Watch.this, firestore.getClient().listenCallable()); + current = false; + nextAttempt = backoff.createNextAttempt(nextAttempt); - ListenRequest.Builder request = ListenRequest.newBuilder(); - request.setDatabase(firestore.getDatabaseName()); - request.setAddTarget(target); - if (resumeToken != null) { - request.getAddTargetBuilder().setResumeToken(resumeToken); - } + Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN); + stream = + firestore.streamRequest(Watch.this, firestore.getClient().listenCallable()); + + ListenRequest.Builder request = ListenRequest.newBuilder(); + request.setDatabase(firestore.getDatabaseName()); + request.setAddTarget(target); + if (resumeToken != null) { + request.getAddTargetBuilder().setResumeToken(resumeToken); + } - stream.onNext(request.build()); + stream.onNext(request.build()); + } + } catch (Throwable throwable) { + onError(throwable); } } }, @@ -549,6 +557,10 @@ private List computeSnapshot(Timestamp readTime) { private static boolean isPermanentError(Throwable throwable) { Status status = getStatus(throwable); + if (status == null) { + return true; + } + switch (status.getCode()) { case CANCELLED: case UNKNOWN: @@ -563,20 +575,20 @@ private static boolean isPermanentError(Throwable throwable) { } } - /** Extracts the GRPC status code if available. Returns UNKNOWN for non-GRPC exceptions. */ + /** Extracts the GRPC status code if available. Returns `null` for non-GRPC exceptions. */ + @Nullable private static Status getStatus(Throwable throwable) { - Status status = Status.UNKNOWN; - if (throwable instanceof StatusRuntimeException) { - status = ((StatusRuntimeException) throwable).getStatus(); + return ((StatusRuntimeException) throwable).getStatus(); } else if (throwable instanceof StatusException) { - status = ((StatusException) throwable).getStatus(); + return ((StatusException) throwable).getStatus(); } - return status; + return null; } /** Determines whether we need to initiate a longer backoff due to system overload. */ private static boolean isResourceExhaustedError(Throwable throwable) { - return getStatus(throwable).getCode().equals(Code.RESOURCE_EXHAUSTED); + Status status = getStatus(throwable); + return status != null && status.getCode().equals(Code.RESOURCE_EXHAUSTED); }; } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/FirestoreRpc.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/FirestoreRpc.java index d31d6557c..c7fbcd3d9 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/FirestoreRpc.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/FirestoreRpc.java @@ -78,4 +78,8 @@ public interface FirestoreRpc extends AutoCloseable, ServiceRpc { /** Returns a bi-directional watch stream. */ BidiStreamingCallable listenCallable(); + + void shutdownNow(); + + void shutdown(); } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/GrpcFirestoreRpc.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/GrpcFirestoreRpc.java index 692cb77b4..edd547130 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/GrpcFirestoreRpc.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/GrpcFirestoreRpc.java @@ -28,7 +28,6 @@ import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.TransportChannel; import com.google.api.gax.rpc.UnaryCallSettings; -import com.google.api.gax.rpc.UnaryCallSettings.Builder; import com.google.api.gax.rpc.UnaryCallable; import com.google.cloud.NoCredentials; import com.google.cloud.ServiceOptions; @@ -127,7 +126,7 @@ public GrpcFirestoreRpc(final FirestoreOptions options) throws IOException { clientContext = ClientContext.create(settingsBuilder.build()); } ApiFunction, Void> retrySettingsSetter = - new ApiFunction, Void>() { + new ApiFunction, Void>() { @Override public Void apply(UnaryCallSettings.Builder builder) { builder.setRetrySettings(options.getRetrySettings()); @@ -145,18 +144,43 @@ public Void apply(UnaryCallSettings.Builder builder) { @Override public void close() throws Exception { + if (!closed) { + firestoreStub.close(); + for (BackgroundResource resource : clientContext.getBackgroundResources()) { + resource.close(); + } + executorFactory.release(executor); + closed = true; + } + for (BackgroundResource resource : clientContext.getBackgroundResources()) { + resource.awaitTermination(1, TimeUnit.SECONDS); + } + } + + @Override + public void shutdown() { if (closed) { return; } - closed = true; - firestoreStub.close(); + firestoreStub.shutdown(); for (BackgroundResource resource : clientContext.getBackgroundResources()) { - resource.close(); + resource.shutdown(); } + executorFactory.release(executor); + closed = true; + } + + @Override + public void shutdownNow() { + if (closed) { + return; + } + firestoreStub.shutdownNow(); for (BackgroundResource resource : clientContext.getBackgroundResources()) { - resource.awaitTermination(1, TimeUnit.SECONDS); + resource.shutdownNow(); } executorFactory.release(executor); + closed = true; } @Override diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java index cb9d69ec1..c053af510 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java @@ -54,9 +54,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -69,21 +68,17 @@ public final class ITQueryWatchTest { private CollectionReference randomColl; - @BeforeClass - public static void beforeClass() { - FirestoreOptions firestoreOptions = FirestoreOptions.newBuilder().build(); - firestore = firestoreOptions.getService(); - } - @Before public void before() { + FirestoreOptions firestoreOptions = FirestoreOptions.newBuilder().build(); + firestore = firestoreOptions.getService(); String autoId = LocalFirestoreHelper.autoId(); String collPath = String.format("java-%s-%s", testName.getMethodName(), autoId); randomColl = firestore.collection(collPath); } - @AfterClass - public static void afterClass() throws Exception { + @After + public void after() throws Exception { Preconditions.checkNotNull( firestore, "Error instantiating Firestore. Check that the service account credentials were properly set."); @@ -384,6 +379,36 @@ public void limitToLast() throws Exception { listenerAssertions.addedIdsIsAnyOf(emptyList(), asList("doc2", "doc3")); } + @Test + public void shutdownNowTerminatesActiveListener() throws Exception { + Query query = randomColl.whereEqualTo("foo", "bar"); + QuerySnapshotEventListener listener = + QuerySnapshotEventListener.builder().setExpectError().build(); + + query.addSnapshotListener(listener); + firestore.shutdownNow(); + + listener.eventsCountDownLatch.awaitError(); + + ListenerAssertions listenerAssertions = listener.assertions(); + listenerAssertions.hasError(); + } + + @Test + public void shutdownNowPreventsAddingNewListener() throws Exception { + Query query = randomColl.whereEqualTo("foo", "bar"); + QuerySnapshotEventListener listener = + QuerySnapshotEventListener.builder().setExpectError().build(); + + firestore.shutdownNow(); + query.addSnapshotListener(listener); + + listener.eventsCountDownLatch.awaitError(); + + ListenerAssertions listenerAssertions = listener.assertions(); + listenerAssertions.hasError(); + } + /** * A tuple class used by {@code #queryWatch}. This class represents an event delivered to the * registered query listener. @@ -402,6 +427,7 @@ private static final class ListenerEvent { private static final class EventsCountDownLatch { private final CountDownLatch initialEventsCountDownLatch; private final int initialEventCount; + private final CountDownLatch errorCountDownLatch; private final EnumMap eventsCounts; private final EnumMap eventsCountDownLatches; @@ -409,9 +435,11 @@ private static final class EventsCountDownLatch { int initialEventCount, int addedInitialCount, int modifiedInitialCount, - int removedInitialCount) { + int removedInitialCount, + int errorCount) { initialEventsCountDownLatch = new CountDownLatch(initialEventCount); this.initialEventCount = initialEventCount; + this.errorCountDownLatch = new CountDownLatch(errorCount); eventsCounts = new EnumMap<>(DocumentChange.Type.class); eventsCounts.put(DocumentChange.Type.ADDED, addedInitialCount); eventsCounts.put(DocumentChange.Type.MODIFIED, modifiedInitialCount); @@ -432,10 +460,18 @@ void countDown(DocumentChange.Type type) { eventsCountDownLatches.get(type).countDown(); } + void countError() { + errorCountDownLatch.countDown(); + } + void awaitInitialEvents() throws InterruptedException { initialEventsCountDownLatch.await(5 * initialEventCount, TimeUnit.SECONDS); } + void awaitError() throws InterruptedException { + errorCountDownLatch.await(5, TimeUnit.SECONDS); + } + void await(DocumentChange.Type type) throws InterruptedException { int count = eventsCounts.get(type); eventsCountDownLatches.get(type).await(5 * count, TimeUnit.SECONDS); @@ -447,11 +483,15 @@ static class QuerySnapshotEventListener implements EventListener final EventsCountDownLatch eventsCountDownLatch; private QuerySnapshotEventListener( - int initialCount, int addedEventCount, int modifiedEventCount, int removedEventCount) { + int initialCount, + int addedEventCount, + int modifiedEventCount, + int removedEventCount, + int errorCount) { this.receivedEvents = Collections.synchronizedList(new ArrayList()); this.eventsCountDownLatch = new EventsCountDownLatch( - initialCount, addedEventCount, modifiedEventCount, removedEventCount); + initialCount, addedEventCount, modifiedEventCount, removedEventCount, errorCount); } @Override @@ -463,6 +503,9 @@ public void onEvent(@Nullable QuerySnapshot value, @Nullable FirestoreException eventsCountDownLatch.countDown(docChange.getType()); } } + if (error != null) { + eventsCountDownLatch.countError(); + } eventsCountDownLatch.countDown(); } @@ -480,6 +523,7 @@ static final class Builder { private int addedEventCount = 0; private int modifiedEventCount = 0; private int removedEventCount = 0; + private int errorCount = 0; private Builder() {} @@ -503,9 +547,14 @@ Builder setRemovedEventCount(int removedEventCount) { return this; } + Builder setExpectError() { + this.errorCount = 1; + return this; + } + public QuerySnapshotEventListener build() { return new QuerySnapshotEventListener( - initialEventCount, addedEventCount, modifiedEventCount, removedEventCount); + initialEventCount, addedEventCount, modifiedEventCount, removedEventCount, errorCount); } } @@ -536,6 +585,20 @@ public boolean apply(ListenerEvent input) { assertWithMessage("snapshotListener received an error").that(anyError).isAbsent(); } + private void hasError() { + final Optional anyError = + events.firstMatch( + new Predicate() { + @Override + public boolean apply(ListenerEvent input) { + return input.error != null; + } + }); + assertWithMessage("snapshotListener did not receive an expected error") + .that(anyError) + .isPresent(); + } + private static List getQuerySnapshots(FluentIterable events) { return events .filter( diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITShutdownTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITShutdownTest.java new file mode 100644 index 000000000..b6e9e87d5 --- /dev/null +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITShutdownTest.java @@ -0,0 +1,122 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.firestore.it; + +import static org.junit.Assert.fail; + +import com.google.api.core.SettableApiFuture; +import com.google.cloud.firestore.EventListener; +import com.google.cloud.firestore.Firestore; +import com.google.cloud.firestore.FirestoreException; +import com.google.cloud.firestore.FirestoreOptions; +import com.google.cloud.firestore.ListenerRegistration; +import com.google.cloud.firestore.LocalFirestoreHelper; +import com.google.cloud.firestore.QuerySnapshot; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; + +public class ITShutdownTest { + @Rule public final Timeout timeout = new Timeout(5, TimeUnit.SECONDS); + @Rule public TestName testName = new TestName(); + + @Test + public void closeSuccess_withListenerRemove() throws Exception { + Firestore fs = FirestoreOptions.getDefaultInstance().getService(); + ListenerRegistration listener = attachListener(fs); + listener.remove(); + fs.close(); + } + + @Test + public void closeFailure_withoutListenerRemove() throws Exception { + final Firestore fs = FirestoreOptions.getDefaultInstance().getService(); + attachListener(fs); + + ExecutorService testExecutorService = Executors.newSingleThreadExecutor(); + final SettableApiFuture result = SettableApiFuture.create(); + testExecutorService.submit( + new Runnable() { + @Override + public void run() { + try { + fs.close(); + result.set(null); + } catch (Throwable throwable) { + result.setException(throwable); + } + } + }); + + try { + result.get(1, TimeUnit.SECONDS); + fail(); + } catch (TimeoutException e) { + // Expected + } finally { + testExecutorService.shutdown(); + } + } + + @Test + public void shutdownNowSuccess_withoutListenerRemove() throws Exception { + Firestore fs = FirestoreOptions.getDefaultInstance().getService(); + attachListener(fs); + fs.shutdownNow(); + } + + @Test + public void shutdownSuccess_withoutListenerRemove() throws Exception { + Firestore fs = FirestoreOptions.getDefaultInstance().getService(); + attachListener(fs); + fs.shutdown(); + } + + @Test + public void closeAndShutdown() throws Exception { + Firestore fs = FirestoreOptions.getDefaultInstance().getService(); + attachListener(fs); + fs.shutdown(); + fs.shutdownNow(); + fs.close(); + } + + private ListenerRegistration attachListener(Firestore fs) throws InterruptedException { + final CountDownLatch cdl = new CountDownLatch(1); + ListenerRegistration listenerRegistration = + fs.collection( + String.format( + "java-%s-%s", testName.getMethodName(), LocalFirestoreHelper.autoId())) + .addSnapshotListener( + new EventListener() { + @Override + public void onEvent( + @Nullable QuerySnapshot value, @Nullable FirestoreException error) { + cdl.countDown(); + } + }); + cdl.await(); + return listenerRegistration; + } +}