diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java
new file mode 100644
index 000000000..644488118
--- /dev/null
+++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java
@@ -0,0 +1,641 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.firestore.it;
+
+import static com.google.cloud.firestore.LocalFirestoreHelper.map;
+import static com.google.common.collect.Sets.newHashSet;
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.Truth.assertWithMessage;
+import static java.util.Collections.emptySet;
+
+import com.google.cloud.firestore.CollectionReference;
+import com.google.cloud.firestore.DocumentChange;
+import com.google.cloud.firestore.DocumentReference;
+import com.google.cloud.firestore.EventListener;
+import com.google.cloud.firestore.Firestore;
+import com.google.cloud.firestore.FirestoreException;
+import com.google.cloud.firestore.FirestoreOptions;
+import com.google.cloud.firestore.ListenerRegistration;
+import com.google.cloud.firestore.LocalFirestoreHelper;
+import com.google.cloud.firestore.Query;
+import com.google.cloud.firestore.QueryDocumentSnapshot;
+import com.google.cloud.firestore.QuerySnapshot;
+import com.google.cloud.firestore.it.ITQueryWatchTest.QuerySnapshotEventListener.ListenerAssertions;
+import com.google.common.base.Joiner;
+import com.google.common.base.Joiner.MapJoiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Range;
+import com.google.common.truth.Truth;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public final class ITQueryWatchTest {
+
+ private static Firestore firestore;
+
+ @Rule public TestName testName = new TestName();
+
+ private CollectionReference randomColl;
+
+ @BeforeClass
+ public static void beforeClass() {
+ FirestoreOptions firestoreOptions = FirestoreOptions.newBuilder().build();
+ firestore = firestoreOptions.getService();
+ }
+
+ @Before
+ public void before() {
+ String autoId = LocalFirestoreHelper.autoId();
+ String collPath = String.format("java-%s-%s", testName.getMethodName(), autoId);
+ randomColl = firestore.collection(collPath);
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ firestore.close();
+ }
+
+ /**
+ *
+ *
+ *
+ * - Attach a listener to a query with empty results.
+ *
- Verify the listener receives an empty event.
+ *
+ */
+ @Test
+ public void emptyResults() throws InterruptedException {
+ final Query query = randomColl.whereEqualTo("foo", "bar");
+ // register the snapshot listener for the query
+ QuerySnapshotEventListener listener =
+ QuerySnapshotEventListener.builder().setInitialEventCount(1).build();
+ ListenerRegistration registration = query.addSnapshotListener(listener);
+
+ try {
+ listener.eventsCountDownLatch.awaitInitialEvents();
+ } finally {
+ registration.remove();
+ }
+
+ ListenerAssertions listenerAssertions = listener.assertions();
+ listenerAssertions.noError();
+ listenerAssertions.eventCountIsAnyOf(Range.closed(1, 1));
+ listenerAssertions.addedIdsIsAnyOf(emptySet());
+ listenerAssertions.modifiedIdsIsAnyOf(emptySet());
+ listenerAssertions.removedIdsIsAnyOf(emptySet());
+ }
+
+ /**
+ *
+ *
+ *
+ * - Attach a listener to a query with non-empty results.
+ *
- Verify the listener receives an event including the expected document.
+ *
+ */
+ @Test
+ public void nonEmptyResults() throws InterruptedException, TimeoutException, ExecutionException {
+ // create a document in our collection that will match the query
+ randomColl.document("doc").set(map("foo", "bar")).get(5, TimeUnit.SECONDS);
+
+ final Query query = randomColl.whereEqualTo("foo", "bar");
+ QuerySnapshotEventListener listener =
+ QuerySnapshotEventListener.builder().setInitialEventCount(1).build();
+ ListenerRegistration registration = query.addSnapshotListener(listener);
+
+ try {
+ listener.eventsCountDownLatch.awaitInitialEvents();
+ } finally {
+ registration.remove();
+ }
+
+ ListenerAssertions listenerAssertions = listener.assertions();
+ listenerAssertions.noError();
+ listenerAssertions.eventCountIsAnyOf(Range.closed(1, 1));
+ listenerAssertions.addedIdsIsAnyOf(newHashSet("doc"));
+ listenerAssertions.modifiedIdsIsAnyOf(emptySet());
+ listenerAssertions.removedIdsIsAnyOf(emptySet());
+ }
+
+ /**
+ *
+ *
+ *
+ * - Attach a listener to a query with empty results.
+ *
- Create a new document that matches the query.
+ *
- Verify newly created document results in an ADDED event.
+ *
+ */
+ @Test
+ public void emptyResults_newDocument_ADDED()
+ throws InterruptedException, TimeoutException, ExecutionException {
+
+ final Query query = randomColl.whereEqualTo("foo", "bar");
+ QuerySnapshotEventListener listener =
+ QuerySnapshotEventListener.builder().setInitialEventCount(1).setAddedEventCount(1).build();
+ ListenerRegistration registration = query.addSnapshotListener(listener);
+
+ try {
+ listener.eventsCountDownLatch.awaitInitialEvents();
+ randomColl.document("doc").set(map("foo", "bar")).get(5, TimeUnit.SECONDS);
+ listener.eventsCountDownLatch.await(DocumentChange.Type.ADDED);
+ } finally {
+ registration.remove();
+ }
+
+ ListenerAssertions listenerAssertions = listener.assertions();
+ listenerAssertions.noError();
+ listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2));
+ listenerAssertions.addedIdsIsAnyOf(emptySet(), newHashSet("doc"));
+ listenerAssertions.modifiedIdsIsAnyOf(emptySet());
+ listenerAssertions.removedIdsIsAnyOf(emptySet());
+ }
+
+ /**
+ *
+ *
+ *
+ * - Attach a listener to a query with empty results.
+ *
- Modify an existing document so that it matches the query.
+ *
- Verify newly created document results in an ADDED event.
+ *
+ */
+ @Test
+ public void emptyResults_modifiedDocument_ADDED()
+ throws InterruptedException, TimeoutException, ExecutionException {
+ // create our "existing non-matching document"
+ randomColl.document("doc").set(map("baz", "baz")).get(5, TimeUnit.SECONDS);
+
+ final Query query = randomColl.whereEqualTo("foo", "bar");
+ QuerySnapshotEventListener listener =
+ QuerySnapshotEventListener.builder().setInitialEventCount(1).setAddedEventCount(1).build();
+ List receivedEvents = listener.receivedEvents;
+ ListenerRegistration registration = query.addSnapshotListener(listener);
+
+ try {
+ listener.eventsCountDownLatch.awaitInitialEvents();
+ randomColl.document("doc").update("foo", "bar").get(5, TimeUnit.SECONDS);
+ listener.eventsCountDownLatch.await(DocumentChange.Type.ADDED);
+ } finally {
+ registration.remove();
+ }
+
+ ListenerAssertions listenerAssertions = listener.assertions();
+ listenerAssertions.noError();
+ listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2));
+ listenerAssertions.addedIdsIsAnyOf(emptySet(), newHashSet("doc"));
+ listenerAssertions.modifiedIdsIsAnyOf(emptySet());
+ listenerAssertions.removedIdsIsAnyOf(emptySet());
+
+ ListenerEvent event = receivedEvents.get(receivedEvents.size() - 1);
+ //noinspection ConstantConditions guarded by "assertNoError" above
+ QueryDocumentSnapshot doc = event.value.getDocumentChanges().get(0).getDocument();
+ assertThat(doc.get("foo")).isEqualTo("bar");
+ assertThat(doc.get("baz")).isEqualTo("baz");
+ }
+
+ /**
+ *
+ *
+ *
+ * - Attach a listener to a query with non-empty results.
+ *
- Modify an existing document that is part of the results.
+ *
- Verify modified document results in a MODIFIED event.
+ *
+ */
+ @Test
+ public void nonEmptyResults_modifiedDocument_MODIFIED()
+ throws InterruptedException, TimeoutException, ExecutionException {
+ DocumentReference testDoc = randomColl.document("doc");
+ // create our "existing non-matching document"
+ testDoc.set(map("foo", "bar")).get(5, TimeUnit.SECONDS);
+
+ final Query query = randomColl.whereEqualTo("foo", "bar");
+ // register the snapshot listener for the query
+ QuerySnapshotEventListener listener =
+ QuerySnapshotEventListener.builder()
+ .setInitialEventCount(1)
+ .setModifiedEventCount(1)
+ .build();
+ List receivedEvents = listener.receivedEvents;
+ ListenerRegistration registration = query.addSnapshotListener(listener);
+
+ try {
+ listener.eventsCountDownLatch.awaitInitialEvents();
+ testDoc.update("baz", "baz").get(5, TimeUnit.SECONDS);
+ listener.eventsCountDownLatch.await(DocumentChange.Type.MODIFIED);
+ } finally {
+ registration.remove();
+ }
+
+ ListenerAssertions listenerAssertions = listener.assertions();
+ listenerAssertions.noError();
+ listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2));
+ listenerAssertions.addedIdsIsAnyOf(emptySet(), newHashSet("doc"));
+ listenerAssertions.modifiedIdsIsAnyOf(emptySet(), newHashSet("doc"));
+ listenerAssertions.removedIdsIsAnyOf(emptySet());
+
+ ListenerEvent event = receivedEvents.get(receivedEvents.size() - 1);
+ //noinspection ConstantConditions guarded by "assertNoError" above
+ QueryDocumentSnapshot doc = event.value.getDocumentChanges().get(0).getDocument();
+ assertThat(doc.get("foo")).isEqualTo("bar");
+ assertThat(doc.get("baz")).isEqualTo("baz");
+ }
+
+ /**
+ *
+ *
+ *
+ * - Attach a listener to a query with non-empty results.
+ *
- Delete an existing document that is part of the results.
+ *
- Verify deleted document results in a REMOVED event.
+ *
+ */
+ @Test
+ public void nonEmptyResults_deletedDocument_REMOVED()
+ throws InterruptedException, TimeoutException, ExecutionException {
+ DocumentReference testDoc = randomColl.document("doc");
+ // create our "existing non-matching document"
+ testDoc.set(map("foo", "bar")).get(5, TimeUnit.SECONDS);
+
+ final Query query = randomColl.whereEqualTo("foo", "bar");
+ // register the snapshot listener for the query
+ QuerySnapshotEventListener listener =
+ QuerySnapshotEventListener.builder()
+ .setInitialEventCount(1)
+ .setRemovedEventCount(1)
+ .build();
+ List receivedEvents = listener.receivedEvents;
+ ListenerRegistration registration = query.addSnapshotListener(listener);
+
+ try {
+ listener.eventsCountDownLatch.awaitInitialEvents();
+ testDoc.delete().get(5, TimeUnit.SECONDS);
+ listener.eventsCountDownLatch.await(DocumentChange.Type.REMOVED);
+ } finally {
+ registration.remove();
+ }
+
+ ListenerAssertions listenerAssertions = listener.assertions();
+ listenerAssertions.noError();
+ listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2));
+ listenerAssertions.addedIdsIsAnyOf(emptySet(), newHashSet("doc"));
+ listenerAssertions.modifiedIdsIsAnyOf(emptySet());
+ listenerAssertions.removedIdsIsAnyOf(emptySet(), newHashSet("doc"));
+
+ ListenerEvent event = receivedEvents.get(receivedEvents.size() - 1);
+ //noinspection ConstantConditions guarded by "assertNoError" above
+ QueryDocumentSnapshot doc = event.value.getDocumentChanges().get(0).getDocument();
+ assertThat(doc.get("foo")).isEqualTo("bar");
+ }
+
+ /**
+ *
+ *
+ *
+ * - Attach a listener to a query with non-empty results.
+ *
- Modify an existing document that is part of the results to no longer match the query.
+ *
- Verify modified document results in a REMOVED event.
+ *
+ */
+ @Test
+ public void nonEmptyResults_modifiedDocument_REMOVED()
+ throws InterruptedException, TimeoutException, ExecutionException {
+ DocumentReference testDoc = randomColl.document("doc");
+ // create our "existing non-matching document"
+ testDoc.set(map("foo", "bar")).get(5, TimeUnit.SECONDS);
+
+ final Query query = randomColl.whereEqualTo("foo", "bar");
+ // register the snapshot listener for the query
+ QuerySnapshotEventListener listener =
+ QuerySnapshotEventListener.builder()
+ .setInitialEventCount(1)
+ .setRemovedEventCount(1)
+ .build();
+ List receivedEvents = listener.receivedEvents;
+ ListenerRegistration registration = query.addSnapshotListener(listener);
+
+ try {
+ listener.eventsCountDownLatch.awaitInitialEvents();
+ testDoc.set(map("bar", "foo")).get(5, TimeUnit.SECONDS);
+ listener.eventsCountDownLatch.await(DocumentChange.Type.REMOVED);
+ } finally {
+ registration.remove();
+ }
+
+ ListenerAssertions listenerAssertions = listener.assertions();
+ listenerAssertions.noError();
+ listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2));
+ listenerAssertions.addedIdsIsAnyOf(emptySet(), newHashSet("doc"));
+ listenerAssertions.modifiedIdsIsAnyOf(emptySet());
+ listenerAssertions.removedIdsIsAnyOf(emptySet(), newHashSet("doc"));
+
+ ListenerEvent event = receivedEvents.get(receivedEvents.size() - 1);
+ //noinspection ConstantConditions guarded by "assertNoError" above
+ QueryDocumentSnapshot doc = event.value.getDocumentChanges().get(0).getDocument();
+ assertThat(doc.get("foo")).isEqualTo("bar");
+ }
+
+ /**
+ * A tuple class used by {@code #queryWatch}. This class represents an event delivered to the
+ * registered query listener.
+ */
+ private static final class ListenerEvent {
+
+ @Nullable private final QuerySnapshot value;
+ @Nullable private final FirestoreException error;
+
+ ListenerEvent(@Nullable QuerySnapshot value, @Nullable FirestoreException error) {
+ this.value = value;
+ this.error = error;
+ }
+ }
+
+ private static final class EventsCountDownLatch {
+ private final CountDownLatch initialEventsCountDownLatch;
+ private final int initialEventCount;
+ private final EnumMap eventsCounts;
+ private final EnumMap eventsCountDownLatches;
+
+ EventsCountDownLatch(
+ int initialEventCount,
+ int addedInitialCount,
+ int modifiedInitialCount,
+ int removedInitialCount) {
+ initialEventsCountDownLatch = new CountDownLatch(initialEventCount);
+ this.initialEventCount = initialEventCount;
+ eventsCounts = new EnumMap<>(DocumentChange.Type.class);
+ eventsCounts.put(DocumentChange.Type.ADDED, addedInitialCount);
+ eventsCounts.put(DocumentChange.Type.MODIFIED, modifiedInitialCount);
+ eventsCounts.put(DocumentChange.Type.REMOVED, removedInitialCount);
+ eventsCountDownLatches = new EnumMap<>(DocumentChange.Type.class);
+ eventsCountDownLatches.put(DocumentChange.Type.ADDED, new CountDownLatch(addedInitialCount));
+ eventsCountDownLatches.put(
+ DocumentChange.Type.MODIFIED, new CountDownLatch(modifiedInitialCount));
+ eventsCountDownLatches.put(
+ DocumentChange.Type.REMOVED, new CountDownLatch(removedInitialCount));
+ }
+
+ void countDown() {
+ initialEventsCountDownLatch.countDown();
+ }
+
+ void countDown(DocumentChange.Type type) {
+ eventsCountDownLatches.get(type).countDown();
+ }
+
+ void awaitInitialEvents() throws InterruptedException {
+ initialEventsCountDownLatch.await(5 * initialEventCount, TimeUnit.SECONDS);
+ }
+
+ void await(DocumentChange.Type type) throws InterruptedException {
+ int count = eventsCounts.get(type);
+ eventsCountDownLatches.get(type).await(5 * count, TimeUnit.SECONDS);
+ }
+ }
+
+ static class QuerySnapshotEventListener implements EventListener {
+ final List receivedEvents;
+ final EventsCountDownLatch eventsCountDownLatch;
+
+ private QuerySnapshotEventListener(
+ int initialCount, int addedEventCount, int modifiedEventCount, int removedEventCount) {
+ this.receivedEvents = Collections.synchronizedList(new ArrayList());
+ this.eventsCountDownLatch =
+ new EventsCountDownLatch(
+ initialCount, addedEventCount, modifiedEventCount, removedEventCount);
+ }
+
+ @Override
+ public void onEvent(@Nullable QuerySnapshot value, @Nullable FirestoreException error) {
+ receivedEvents.add(new ListenerEvent(value, error));
+ if (value != null) {
+ List documentChanges = value.getDocumentChanges();
+ for (DocumentChange docChange : documentChanges) {
+ eventsCountDownLatch.countDown(docChange.getType());
+ }
+ }
+ eventsCountDownLatch.countDown();
+ }
+
+ ListenerAssertions assertions() {
+ return new ListenerAssertions(receivedEvents);
+ }
+
+ static Builder builder() {
+ return new Builder();
+ }
+
+ @SuppressWarnings("SameParameterValue")
+ static final class Builder {
+ private int initialEventCount = 0;
+ private int addedEventCount = 0;
+ private int modifiedEventCount = 0;
+ private int removedEventCount = 0;
+
+ private Builder() {}
+
+ Builder setInitialEventCount(int initialEventCount) {
+ this.initialEventCount = initialEventCount;
+ return this;
+ }
+
+ Builder setAddedEventCount(int addedEventCount) {
+ this.addedEventCount = addedEventCount;
+ return this;
+ }
+
+ Builder setModifiedEventCount(int modifiedEventCount) {
+ this.modifiedEventCount = modifiedEventCount;
+ return this;
+ }
+
+ Builder setRemovedEventCount(int removedEventCount) {
+ this.removedEventCount = removedEventCount;
+ return this;
+ }
+
+ public QuerySnapshotEventListener build() {
+ return new QuerySnapshotEventListener(
+ initialEventCount, addedEventCount, modifiedEventCount, removedEventCount);
+ }
+ }
+
+ static final class ListenerAssertions {
+ private static final MapJoiner MAP_JOINER = Joiner.on(",").withKeyValueSeparator("=");
+ private final FluentIterable events;
+ private final Set addedIds;
+ private final Set modifiedIds;
+ private final Set removedIds;
+
+ ListenerAssertions(List receivedEvents) {
+ events = FluentIterable.from(receivedEvents);
+ List querySnapshots = getQuerySnapshots(events);
+ addedIds = getIds(querySnapshots, DocumentChange.Type.ADDED);
+ modifiedIds = getIds(querySnapshots, DocumentChange.Type.MODIFIED);
+ removedIds = getIds(querySnapshots, DocumentChange.Type.REMOVED);
+ }
+
+ private void noError() {
+ final Optional anyError =
+ events.firstMatch(
+ new Predicate() {
+ @Override
+ public boolean apply(ListenerEvent input) {
+ return input.error != null;
+ }
+ });
+ assertWithMessage("snapshotListener received an error").that(anyError).isAbsent();
+ }
+
+ private static List getQuerySnapshots(FluentIterable events) {
+ return events
+ .filter(
+ new Predicate() {
+ @Override
+ public boolean apply(ListenerEvent input) {
+ return input.value != null;
+ }
+ })
+ .transform(
+ new com.google.common.base.Function() {
+ @Override
+ public QuerySnapshot apply(ListenerEvent input) {
+ return input.value;
+ }
+ })
+ .toList();
+ }
+
+ private static Set getIds(
+ List querySnapshots, DocumentChange.Type type) {
+ final Set documentIds = new HashSet<>();
+ for (QuerySnapshot querySnapshot : querySnapshots) {
+ final List changes = querySnapshot.getDocumentChanges();
+ for (DocumentChange change : changes) {
+ if (change.getType() == type) {
+ documentIds.add(change.getDocument().getId());
+ }
+ }
+ }
+ return documentIds;
+ }
+
+ void addedIdsIsAnyOf(Set> s) {
+ Truth.assertWithMessage(debugMessage()).that(addedIds).isEqualTo(s);
+ }
+
+ void addedIdsIsAnyOf(Set> s1, Set> s2) {
+ Truth.assertWithMessage(debugMessage()).that(addedIds).isAnyOf(s1, s2);
+ }
+
+ void modifiedIdsIsAnyOf(Set> s) {
+ Truth.assertWithMessage(debugMessage()).that(modifiedIds).isEqualTo(s);
+ }
+
+ void modifiedIdsIsAnyOf(Set> s1, Set> s2) {
+ Truth.assertWithMessage(debugMessage()).that(modifiedIds).isAnyOf(s1, s2);
+ }
+
+ void removedIdsIsAnyOf(Set> s) {
+ Truth.assertWithMessage(debugMessage()).that(removedIds).isEqualTo(s);
+ }
+
+ void removedIdsIsAnyOf(Set> s1, Set> s2) {
+ Truth.assertWithMessage(debugMessage()).that(removedIds).isAnyOf(s1, s2);
+ }
+
+ void eventCountIsAnyOf(Range range) {
+ Truth.assertWithMessage(debugMessage()).that(events.size()).isIn(range);
+ }
+
+ private String debugMessage() {
+ final StringBuilder builder = new StringBuilder();
+ builder.append("events[\n");
+ for (ListenerEvent receivedEvent : events) {
+ builder.append("event{");
+ builder.append("error=").append(receivedEvent.error);
+ builder.append(",");
+ builder.append("value=");
+ debugMessage(builder, receivedEvent.value);
+ builder.append("},\n");
+ }
+ builder.append("]");
+ return builder.toString();
+ }
+
+ private static void debugMessage(StringBuilder builder, QuerySnapshot qs) {
+ if (qs == null) {
+ builder.append("null");
+ } else {
+ builder.append("{");
+ List documents = qs.getDocuments();
+ builder.append("documents[");
+ for (QueryDocumentSnapshot document : documents) {
+ debugMessage(builder, document);
+ }
+ builder.append("],");
+ List changes = qs.getDocumentChanges();
+ builder.append("documentChanges[");
+ for (DocumentChange change : changes) {
+ debugMessage(builder, change.getDocument());
+ }
+ builder.append("]");
+ builder.append("}");
+ }
+ }
+
+ private static void debugMessage(
+ StringBuilder builder, QueryDocumentSnapshot queryDocumentSnapshot) {
+ if (queryDocumentSnapshot == null) {
+ builder.append("null");
+ } else {
+ builder.append("{");
+ builder.append("path=").append(queryDocumentSnapshot.getReference().getPath());
+ builder.append(",");
+ builder.append("data=");
+ debugMessage(builder, queryDocumentSnapshot.getData());
+ builder.append("}");
+ }
+ }
+
+ private static void debugMessage(StringBuilder builder, Map data) {
+ builder.append("{");
+ MAP_JOINER.appendTo(builder, data);
+ builder.append("}");
+ }
+ }
+ }
+}
diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java
index 764337438..f5f9c1e24 100644
--- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java
+++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java
@@ -17,8 +17,6 @@
package com.google.cloud.firestore.it;
import static com.google.cloud.firestore.LocalFirestoreHelper.map;
-import static com.google.common.collect.Sets.newHashSet;
-import static com.google.common.truth.Truth.assertWithMessage;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -34,7 +32,6 @@
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.cloud.Timestamp;
import com.google.cloud.firestore.CollectionReference;
-import com.google.cloud.firestore.DocumentChange;
import com.google.cloud.firestore.DocumentReference;
import com.google.cloud.firestore.DocumentSnapshot;
import com.google.cloud.firestore.EventListener;
@@ -57,25 +54,17 @@
import com.google.cloud.firestore.Transaction.Function;
import com.google.cloud.firestore.WriteBatch;
import com.google.cloud.firestore.WriteResult;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.junit.After;
@@ -901,203 +890,6 @@ public void onEvent(
}
}
- @Test
- public void queryWatch() throws Exception {
- // This test has quite a bit of machinery to it, as there are several things going on.
- // The main scenario we are test is: 'Given a query, listen for snapshot updates made to that
- // query'
- //
- // To verify this behavior we have the following things happening:
- // 1. A defined query with an attached snapshot listener.
- // 2. Two different documents that we are making changes to throughout the duration of the
- // test. These changes are expected to trigger a number of events that should be delivered
- // to the snapshot listener from 1.
- // 3. Once all the document updates have been performed, and the expected number of events
- // has been delivered to the snapshot listener, the events will be queried for the expected
- // series of events and will then be asserted on.
- //
- // The mechanics of how the test is executed are as follows
- // A. All events delivered to the listener in 1 are added to a list of events we receive.
- // B. A separate ExecutorService is created and has tasks submitted to it to perform the
- // series of document updates mentioned in 2. (Each document gets its own task).
- // * Each of these update tasks will wait for the snapshot listener to be active before
- // performing the updates.
- // C. CountDownLatches are used to keep track of completion of work while the test is running.
- // * Every time an event is received by the listener in 1 a CDL is decremented.
- // * Each document update task has a CDL that is decremented when all actions have been
- // performed
- // * Each CDL has a timeout associated with its await (this is important so that in the
- // case there is a hang during the test the whole suite isn't taken with it)
- // D. After all updates have been performed and the expected number of events have been
- // received, assertions on the events will be performed.
- //
- // Note: There is a potential that this test fails if the Firestore backend chooses not to send
- // us individual changes for every document update, but rather merges two or more changes into
- // a single event. We have, however, not seen any failures during thousands of test runs.
- ListenerRegistration registration = null;
- final ExecutorService updatesExecutor = Executors.newCachedThreadPool();
-
- final List receivedEvents =
- Collections.synchronizedList(new ArrayList());
-
- try {
- // create our CDLs that are used to track work completion
- final CountDownLatch doc1CDL = new CountDownLatch(1);
- final CountDownLatch doc2CDL = new CountDownLatch(1);
- final CountDownLatch eventsCDL = new CountDownLatch(6);
-
- // create a CDL that is used to signal the snapshot lister is active.
- // if we don't do this, there is a possible race with out document updates happening before
- // the listener has received the "empty" event that is expected in the case that the
- // result set of the query would be empty (i.e. in the case of an empty collection with new
- // documents being created)
- final CountDownLatch snapshotListerActive = new CountDownLatch(1);
-
- final Query query = randomColl.whereEqualTo("foo", "bar");
- // register the snapshot listener for the query
- registration =
- query.addSnapshotListener(
- new EventListener() {
- @Override
- public void onEvent(
- @Nullable QuerySnapshot value, @Nullable FirestoreException error) {
- snapshotListerActive.countDown();
- receivedEvents.add(new ListenerEvent(value, error));
- eventsCDL.countDown();
- }
- });
-
- // Perform a series of operations on some documents in a separate thread
- // While we listen for the changes
- updatesExecutor.submit(
- new Runnable() {
- DocumentReference doc1 = randomColl.document("doc1");
-
- @Override
- public void run() {
- try {
- snapshotListerActive.await(5, TimeUnit.SECONDS);
- // create the first document
- doc1.set(map("baz", "foo")).get();
- // update the document
- doc1.set(map("foo", "bar")).get();
- // add a field to the document
- doc1.set(map("foo", "bar", "bar", "foo")).get();
- // delete the document
- doc1.delete().get();
- } catch (InterruptedException | ExecutionException e) {
- fail(String.format("Error while processing doc1: %s", e.getMessage()));
- } finally {
- doc1CDL.countDown();
- }
- }
- });
-
- updatesExecutor.submit(
- new Runnable() {
- DocumentReference doc2 = randomColl.document("doc2");
-
- @Override
- public void run() {
- try {
- snapshotListerActive.await(5, TimeUnit.SECONDS);
- // create a second document
- doc2.set(map("foo", "bar")).get();
- // update the document
- doc2.set(map("foo", "foo")).get();
- } catch (InterruptedException | ExecutionException e) {
- fail(String.format("Error while processing doc2: %s", e.getMessage()));
- } finally {
- doc2CDL.countDown();
- }
- }
- });
-
- // Wait for the document update operations to be performed
- final boolean doc1CDLAwait = doc1CDL.await(10, TimeUnit.SECONDS);
- assertTrue("all operations for doc1 were not completed in time", doc1CDLAwait);
- final boolean doc2CDLAwait = doc2CDL.await(10, TimeUnit.SECONDS);
- assertTrue("all operations for doc2 were not completed in time", doc2CDLAwait);
-
- // Wait for the expected number of update events to be delivered to our listener
- eventsCDL.await(30, TimeUnit.SECONDS);
- } finally {
- // cleanup out listener
- if (registration != null) {
- registration.remove();
- }
- // Shutdown the thread pool used to perform the document updates
- updatesExecutor.shutdown();
- }
-
- // Extract certain events from the list of events we received in out listener
-
- final FluentIterable events = FluentIterable.from(receivedEvents);
-
- final Optional anyError =
- events.firstMatch(
- new Predicate() {
- @Override
- public boolean apply(ListenerEvent input) {
- return input.error != null;
- }
- });
- assertWithMessage("snapshotListener received an error").that(anyError).isAbsent();
-
- final FluentIterable querySnapshots =
- events
- .filter(
- new Predicate() {
- @Override
- public boolean apply(ListenerEvent input) {
- return input.value != null;
- }
- })
- .transform(
- new com.google.common.base.Function() {
- @Override
- public QuerySnapshot apply(ListenerEvent input) {
- return input.value;
- }
- });
-
- final Optional initialEmpty =
- querySnapshots.firstMatch(
- new Predicate() {
- @Override
- public boolean apply(QuerySnapshot input) {
- return input.isEmpty() && input.getDocumentChanges().size() == 0;
- }
- });
- final Set addedDocumentIds = getIds(querySnapshots, DocumentChange.Type.ADDED);
- final Set modifiedDocumentIds = getIds(querySnapshots, DocumentChange.Type.MODIFIED);
- final Set removedDocumentIds = getIds(querySnapshots, DocumentChange.Type.REMOVED);
- final Optional finalRemove =
- querySnapshots.firstMatch(
- new Predicate() {
- @Override
- public boolean apply(QuerySnapshot input) {
- return input.isEmpty() && input.getDocumentChanges().size() == 1;
- }
- });
-
- assertWithMessage("snapshotListener did not receive expected initial empty event")
- .that(initialEmpty)
- .isPresent();
- assertWithMessage("snapshotListener did not receive expected added events")
- .that(addedDocumentIds)
- .isEqualTo(newHashSet("doc1", "doc2"));
- assertWithMessage("snapshotListener did not receive expected modified events")
- .that(modifiedDocumentIds)
- .isEqualTo(newHashSet("doc1"));
- assertWithMessage("snapshotListener did not receive expected removed events")
- .that(removedDocumentIds)
- .isEqualTo(newHashSet("doc1", "doc2"));
- assertWithMessage("snapshotListener did not receive expected final empty event")
- .that(finalRemove)
- .isPresent();
- }
-
private int paginateResults(Query query, List results)
throws ExecutionException, InterruptedException {
if (!results.isEmpty()) {
@@ -1340,35 +1132,6 @@ public void floatIncrement() throws ExecutionException, InterruptedException {
assertEquals(3.3, (Double) docSnap.get("sum"), DOUBLE_EPSILON);
}
- private static Set getIds(
- FluentIterable querySnapshots, DocumentChange.Type type) {
- final Set documentIds = new HashSet<>();
- for (QuerySnapshot querySnapshot : querySnapshots) {
- final List changes = querySnapshot.getDocumentChanges();
- for (DocumentChange change : changes) {
- if (change.getType() == type) {
- documentIds.add(change.getDocument().getId());
- }
- }
- }
- return documentIds;
- }
-
- /**
- * A tuple class used by {@code #queryWatch}. This class represents an event delivered to the
- * registered query listener.
- */
- private static final class ListenerEvent {
-
- @Nullable private final QuerySnapshot value;
- @Nullable private final FirestoreException error;
-
- ListenerEvent(@Nullable QuerySnapshot value, @Nullable FirestoreException error) {
- this.value = value;
- this.error = error;
- }
- }
-
@Test
public void getAllWithObserver() throws Exception {
DocumentReference ref1 = randomColl.document("doc1");