From f701c67f9015ca96ca42c40f7e58d95dce70b18e Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Tue, 3 Mar 2020 15:27:56 -0500 Subject: [PATCH] fix: deflake-ify ITSystemTest#queryWatch (#107) queryWatch has been flaky because several things are happening in the single event stream. This fix splits up the different scenarios being tested into their own respective tests and updates the assertions to be more specific to the scenario as well as handling the semantics of the backend better. --- .../cloud/firestore/it/ITQueryWatchTest.java | 641 ++++++++++++++++++ .../cloud/firestore/it/ITSystemTest.java | 237 ------- 2 files changed, 641 insertions(+), 237 deletions(-) create mode 100644 google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java new file mode 100644 index 000000000..644488118 --- /dev/null +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java @@ -0,0 +1,641 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.firestore.it; + +import static com.google.cloud.firestore.LocalFirestoreHelper.map; +import static com.google.common.collect.Sets.newHashSet; +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; +import static java.util.Collections.emptySet; + +import com.google.cloud.firestore.CollectionReference; +import com.google.cloud.firestore.DocumentChange; +import com.google.cloud.firestore.DocumentReference; +import com.google.cloud.firestore.EventListener; +import com.google.cloud.firestore.Firestore; +import com.google.cloud.firestore.FirestoreException; +import com.google.cloud.firestore.FirestoreOptions; +import com.google.cloud.firestore.ListenerRegistration; +import com.google.cloud.firestore.LocalFirestoreHelper; +import com.google.cloud.firestore.Query; +import com.google.cloud.firestore.QueryDocumentSnapshot; +import com.google.cloud.firestore.QuerySnapshot; +import com.google.cloud.firestore.it.ITQueryWatchTest.QuerySnapshotEventListener.ListenerAssertions; +import com.google.common.base.Joiner; +import com.google.common.base.Joiner.MapJoiner; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Range; +import com.google.common.truth.Truth; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public final class ITQueryWatchTest { + + private static Firestore firestore; + + @Rule public TestName testName = new TestName(); + + private CollectionReference randomColl; + + @BeforeClass + public static void beforeClass() { + FirestoreOptions firestoreOptions = FirestoreOptions.newBuilder().build(); + firestore = firestoreOptions.getService(); + } + + @Before + public void before() { + String autoId = LocalFirestoreHelper.autoId(); + String collPath = String.format("java-%s-%s", testName.getMethodName(), autoId); + randomColl = firestore.collection(collPath); + } + + @AfterClass + public static void afterClass() throws Exception { + firestore.close(); + } + + /** + * + * + *
    + *
  1. Attach a listener to a query with empty results. + *
  2. Verify the listener receives an empty event. + *
+ */ + @Test + public void emptyResults() throws InterruptedException { + final Query query = randomColl.whereEqualTo("foo", "bar"); + // register the snapshot listener for the query + QuerySnapshotEventListener listener = + QuerySnapshotEventListener.builder().setInitialEventCount(1).build(); + ListenerRegistration registration = query.addSnapshotListener(listener); + + try { + listener.eventsCountDownLatch.awaitInitialEvents(); + } finally { + registration.remove(); + } + + ListenerAssertions listenerAssertions = listener.assertions(); + listenerAssertions.noError(); + listenerAssertions.eventCountIsAnyOf(Range.closed(1, 1)); + listenerAssertions.addedIdsIsAnyOf(emptySet()); + listenerAssertions.modifiedIdsIsAnyOf(emptySet()); + listenerAssertions.removedIdsIsAnyOf(emptySet()); + } + + /** + * + * + *
    + *
  1. Attach a listener to a query with non-empty results. + *
  2. Verify the listener receives an event including the expected document. + *
+ */ + @Test + public void nonEmptyResults() throws InterruptedException, TimeoutException, ExecutionException { + // create a document in our collection that will match the query + randomColl.document("doc").set(map("foo", "bar")).get(5, TimeUnit.SECONDS); + + final Query query = randomColl.whereEqualTo("foo", "bar"); + QuerySnapshotEventListener listener = + QuerySnapshotEventListener.builder().setInitialEventCount(1).build(); + ListenerRegistration registration = query.addSnapshotListener(listener); + + try { + listener.eventsCountDownLatch.awaitInitialEvents(); + } finally { + registration.remove(); + } + + ListenerAssertions listenerAssertions = listener.assertions(); + listenerAssertions.noError(); + listenerAssertions.eventCountIsAnyOf(Range.closed(1, 1)); + listenerAssertions.addedIdsIsAnyOf(newHashSet("doc")); + listenerAssertions.modifiedIdsIsAnyOf(emptySet()); + listenerAssertions.removedIdsIsAnyOf(emptySet()); + } + + /** + * + * + *
    + *
  1. Attach a listener to a query with empty results. + *
  2. Create a new document that matches the query. + *
  3. Verify newly created document results in an ADDED event. + *
+ */ + @Test + public void emptyResults_newDocument_ADDED() + throws InterruptedException, TimeoutException, ExecutionException { + + final Query query = randomColl.whereEqualTo("foo", "bar"); + QuerySnapshotEventListener listener = + QuerySnapshotEventListener.builder().setInitialEventCount(1).setAddedEventCount(1).build(); + ListenerRegistration registration = query.addSnapshotListener(listener); + + try { + listener.eventsCountDownLatch.awaitInitialEvents(); + randomColl.document("doc").set(map("foo", "bar")).get(5, TimeUnit.SECONDS); + listener.eventsCountDownLatch.await(DocumentChange.Type.ADDED); + } finally { + registration.remove(); + } + + ListenerAssertions listenerAssertions = listener.assertions(); + listenerAssertions.noError(); + listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2)); + listenerAssertions.addedIdsIsAnyOf(emptySet(), newHashSet("doc")); + listenerAssertions.modifiedIdsIsAnyOf(emptySet()); + listenerAssertions.removedIdsIsAnyOf(emptySet()); + } + + /** + * + * + *
    + *
  1. Attach a listener to a query with empty results. + *
  2. Modify an existing document so that it matches the query. + *
  3. Verify newly created document results in an ADDED event. + *
+ */ + @Test + public void emptyResults_modifiedDocument_ADDED() + throws InterruptedException, TimeoutException, ExecutionException { + // create our "existing non-matching document" + randomColl.document("doc").set(map("baz", "baz")).get(5, TimeUnit.SECONDS); + + final Query query = randomColl.whereEqualTo("foo", "bar"); + QuerySnapshotEventListener listener = + QuerySnapshotEventListener.builder().setInitialEventCount(1).setAddedEventCount(1).build(); + List receivedEvents = listener.receivedEvents; + ListenerRegistration registration = query.addSnapshotListener(listener); + + try { + listener.eventsCountDownLatch.awaitInitialEvents(); + randomColl.document("doc").update("foo", "bar").get(5, TimeUnit.SECONDS); + listener.eventsCountDownLatch.await(DocumentChange.Type.ADDED); + } finally { + registration.remove(); + } + + ListenerAssertions listenerAssertions = listener.assertions(); + listenerAssertions.noError(); + listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2)); + listenerAssertions.addedIdsIsAnyOf(emptySet(), newHashSet("doc")); + listenerAssertions.modifiedIdsIsAnyOf(emptySet()); + listenerAssertions.removedIdsIsAnyOf(emptySet()); + + ListenerEvent event = receivedEvents.get(receivedEvents.size() - 1); + //noinspection ConstantConditions guarded by "assertNoError" above + QueryDocumentSnapshot doc = event.value.getDocumentChanges().get(0).getDocument(); + assertThat(doc.get("foo")).isEqualTo("bar"); + assertThat(doc.get("baz")).isEqualTo("baz"); + } + + /** + * + * + *
    + *
  1. Attach a listener to a query with non-empty results. + *
  2. Modify an existing document that is part of the results. + *
  3. Verify modified document results in a MODIFIED event. + *
+ */ + @Test + public void nonEmptyResults_modifiedDocument_MODIFIED() + throws InterruptedException, TimeoutException, ExecutionException { + DocumentReference testDoc = randomColl.document("doc"); + // create our "existing non-matching document" + testDoc.set(map("foo", "bar")).get(5, TimeUnit.SECONDS); + + final Query query = randomColl.whereEqualTo("foo", "bar"); + // register the snapshot listener for the query + QuerySnapshotEventListener listener = + QuerySnapshotEventListener.builder() + .setInitialEventCount(1) + .setModifiedEventCount(1) + .build(); + List receivedEvents = listener.receivedEvents; + ListenerRegistration registration = query.addSnapshotListener(listener); + + try { + listener.eventsCountDownLatch.awaitInitialEvents(); + testDoc.update("baz", "baz").get(5, TimeUnit.SECONDS); + listener.eventsCountDownLatch.await(DocumentChange.Type.MODIFIED); + } finally { + registration.remove(); + } + + ListenerAssertions listenerAssertions = listener.assertions(); + listenerAssertions.noError(); + listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2)); + listenerAssertions.addedIdsIsAnyOf(emptySet(), newHashSet("doc")); + listenerAssertions.modifiedIdsIsAnyOf(emptySet(), newHashSet("doc")); + listenerAssertions.removedIdsIsAnyOf(emptySet()); + + ListenerEvent event = receivedEvents.get(receivedEvents.size() - 1); + //noinspection ConstantConditions guarded by "assertNoError" above + QueryDocumentSnapshot doc = event.value.getDocumentChanges().get(0).getDocument(); + assertThat(doc.get("foo")).isEqualTo("bar"); + assertThat(doc.get("baz")).isEqualTo("baz"); + } + + /** + * + * + *
    + *
  1. Attach a listener to a query with non-empty results. + *
  2. Delete an existing document that is part of the results. + *
  3. Verify deleted document results in a REMOVED event. + *
+ */ + @Test + public void nonEmptyResults_deletedDocument_REMOVED() + throws InterruptedException, TimeoutException, ExecutionException { + DocumentReference testDoc = randomColl.document("doc"); + // create our "existing non-matching document" + testDoc.set(map("foo", "bar")).get(5, TimeUnit.SECONDS); + + final Query query = randomColl.whereEqualTo("foo", "bar"); + // register the snapshot listener for the query + QuerySnapshotEventListener listener = + QuerySnapshotEventListener.builder() + .setInitialEventCount(1) + .setRemovedEventCount(1) + .build(); + List receivedEvents = listener.receivedEvents; + ListenerRegistration registration = query.addSnapshotListener(listener); + + try { + listener.eventsCountDownLatch.awaitInitialEvents(); + testDoc.delete().get(5, TimeUnit.SECONDS); + listener.eventsCountDownLatch.await(DocumentChange.Type.REMOVED); + } finally { + registration.remove(); + } + + ListenerAssertions listenerAssertions = listener.assertions(); + listenerAssertions.noError(); + listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2)); + listenerAssertions.addedIdsIsAnyOf(emptySet(), newHashSet("doc")); + listenerAssertions.modifiedIdsIsAnyOf(emptySet()); + listenerAssertions.removedIdsIsAnyOf(emptySet(), newHashSet("doc")); + + ListenerEvent event = receivedEvents.get(receivedEvents.size() - 1); + //noinspection ConstantConditions guarded by "assertNoError" above + QueryDocumentSnapshot doc = event.value.getDocumentChanges().get(0).getDocument(); + assertThat(doc.get("foo")).isEqualTo("bar"); + } + + /** + * + * + *
    + *
  1. Attach a listener to a query with non-empty results. + *
  2. Modify an existing document that is part of the results to no longer match the query. + *
  3. Verify modified document results in a REMOVED event. + *
+ */ + @Test + public void nonEmptyResults_modifiedDocument_REMOVED() + throws InterruptedException, TimeoutException, ExecutionException { + DocumentReference testDoc = randomColl.document("doc"); + // create our "existing non-matching document" + testDoc.set(map("foo", "bar")).get(5, TimeUnit.SECONDS); + + final Query query = randomColl.whereEqualTo("foo", "bar"); + // register the snapshot listener for the query + QuerySnapshotEventListener listener = + QuerySnapshotEventListener.builder() + .setInitialEventCount(1) + .setRemovedEventCount(1) + .build(); + List receivedEvents = listener.receivedEvents; + ListenerRegistration registration = query.addSnapshotListener(listener); + + try { + listener.eventsCountDownLatch.awaitInitialEvents(); + testDoc.set(map("bar", "foo")).get(5, TimeUnit.SECONDS); + listener.eventsCountDownLatch.await(DocumentChange.Type.REMOVED); + } finally { + registration.remove(); + } + + ListenerAssertions listenerAssertions = listener.assertions(); + listenerAssertions.noError(); + listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2)); + listenerAssertions.addedIdsIsAnyOf(emptySet(), newHashSet("doc")); + listenerAssertions.modifiedIdsIsAnyOf(emptySet()); + listenerAssertions.removedIdsIsAnyOf(emptySet(), newHashSet("doc")); + + ListenerEvent event = receivedEvents.get(receivedEvents.size() - 1); + //noinspection ConstantConditions guarded by "assertNoError" above + QueryDocumentSnapshot doc = event.value.getDocumentChanges().get(0).getDocument(); + assertThat(doc.get("foo")).isEqualTo("bar"); + } + + /** + * A tuple class used by {@code #queryWatch}. This class represents an event delivered to the + * registered query listener. + */ + private static final class ListenerEvent { + + @Nullable private final QuerySnapshot value; + @Nullable private final FirestoreException error; + + ListenerEvent(@Nullable QuerySnapshot value, @Nullable FirestoreException error) { + this.value = value; + this.error = error; + } + } + + private static final class EventsCountDownLatch { + private final CountDownLatch initialEventsCountDownLatch; + private final int initialEventCount; + private final EnumMap eventsCounts; + private final EnumMap eventsCountDownLatches; + + EventsCountDownLatch( + int initialEventCount, + int addedInitialCount, + int modifiedInitialCount, + int removedInitialCount) { + initialEventsCountDownLatch = new CountDownLatch(initialEventCount); + this.initialEventCount = initialEventCount; + eventsCounts = new EnumMap<>(DocumentChange.Type.class); + eventsCounts.put(DocumentChange.Type.ADDED, addedInitialCount); + eventsCounts.put(DocumentChange.Type.MODIFIED, modifiedInitialCount); + eventsCounts.put(DocumentChange.Type.REMOVED, removedInitialCount); + eventsCountDownLatches = new EnumMap<>(DocumentChange.Type.class); + eventsCountDownLatches.put(DocumentChange.Type.ADDED, new CountDownLatch(addedInitialCount)); + eventsCountDownLatches.put( + DocumentChange.Type.MODIFIED, new CountDownLatch(modifiedInitialCount)); + eventsCountDownLatches.put( + DocumentChange.Type.REMOVED, new CountDownLatch(removedInitialCount)); + } + + void countDown() { + initialEventsCountDownLatch.countDown(); + } + + void countDown(DocumentChange.Type type) { + eventsCountDownLatches.get(type).countDown(); + } + + void awaitInitialEvents() throws InterruptedException { + initialEventsCountDownLatch.await(5 * initialEventCount, TimeUnit.SECONDS); + } + + void await(DocumentChange.Type type) throws InterruptedException { + int count = eventsCounts.get(type); + eventsCountDownLatches.get(type).await(5 * count, TimeUnit.SECONDS); + } + } + + static class QuerySnapshotEventListener implements EventListener { + final List receivedEvents; + final EventsCountDownLatch eventsCountDownLatch; + + private QuerySnapshotEventListener( + int initialCount, int addedEventCount, int modifiedEventCount, int removedEventCount) { + this.receivedEvents = Collections.synchronizedList(new ArrayList()); + this.eventsCountDownLatch = + new EventsCountDownLatch( + initialCount, addedEventCount, modifiedEventCount, removedEventCount); + } + + @Override + public void onEvent(@Nullable QuerySnapshot value, @Nullable FirestoreException error) { + receivedEvents.add(new ListenerEvent(value, error)); + if (value != null) { + List documentChanges = value.getDocumentChanges(); + for (DocumentChange docChange : documentChanges) { + eventsCountDownLatch.countDown(docChange.getType()); + } + } + eventsCountDownLatch.countDown(); + } + + ListenerAssertions assertions() { + return new ListenerAssertions(receivedEvents); + } + + static Builder builder() { + return new Builder(); + } + + @SuppressWarnings("SameParameterValue") + static final class Builder { + private int initialEventCount = 0; + private int addedEventCount = 0; + private int modifiedEventCount = 0; + private int removedEventCount = 0; + + private Builder() {} + + Builder setInitialEventCount(int initialEventCount) { + this.initialEventCount = initialEventCount; + return this; + } + + Builder setAddedEventCount(int addedEventCount) { + this.addedEventCount = addedEventCount; + return this; + } + + Builder setModifiedEventCount(int modifiedEventCount) { + this.modifiedEventCount = modifiedEventCount; + return this; + } + + Builder setRemovedEventCount(int removedEventCount) { + this.removedEventCount = removedEventCount; + return this; + } + + public QuerySnapshotEventListener build() { + return new QuerySnapshotEventListener( + initialEventCount, addedEventCount, modifiedEventCount, removedEventCount); + } + } + + static final class ListenerAssertions { + private static final MapJoiner MAP_JOINER = Joiner.on(",").withKeyValueSeparator("="); + private final FluentIterable events; + private final Set addedIds; + private final Set modifiedIds; + private final Set removedIds; + + ListenerAssertions(List receivedEvents) { + events = FluentIterable.from(receivedEvents); + List querySnapshots = getQuerySnapshots(events); + addedIds = getIds(querySnapshots, DocumentChange.Type.ADDED); + modifiedIds = getIds(querySnapshots, DocumentChange.Type.MODIFIED); + removedIds = getIds(querySnapshots, DocumentChange.Type.REMOVED); + } + + private void noError() { + final Optional anyError = + events.firstMatch( + new Predicate() { + @Override + public boolean apply(ListenerEvent input) { + return input.error != null; + } + }); + assertWithMessage("snapshotListener received an error").that(anyError).isAbsent(); + } + + private static List getQuerySnapshots(FluentIterable events) { + return events + .filter( + new Predicate() { + @Override + public boolean apply(ListenerEvent input) { + return input.value != null; + } + }) + .transform( + new com.google.common.base.Function() { + @Override + public QuerySnapshot apply(ListenerEvent input) { + return input.value; + } + }) + .toList(); + } + + private static Set getIds( + List querySnapshots, DocumentChange.Type type) { + final Set documentIds = new HashSet<>(); + for (QuerySnapshot querySnapshot : querySnapshots) { + final List changes = querySnapshot.getDocumentChanges(); + for (DocumentChange change : changes) { + if (change.getType() == type) { + documentIds.add(change.getDocument().getId()); + } + } + } + return documentIds; + } + + void addedIdsIsAnyOf(Set s) { + Truth.assertWithMessage(debugMessage()).that(addedIds).isEqualTo(s); + } + + void addedIdsIsAnyOf(Set s1, Set s2) { + Truth.assertWithMessage(debugMessage()).that(addedIds).isAnyOf(s1, s2); + } + + void modifiedIdsIsAnyOf(Set s) { + Truth.assertWithMessage(debugMessage()).that(modifiedIds).isEqualTo(s); + } + + void modifiedIdsIsAnyOf(Set s1, Set s2) { + Truth.assertWithMessage(debugMessage()).that(modifiedIds).isAnyOf(s1, s2); + } + + void removedIdsIsAnyOf(Set s) { + Truth.assertWithMessage(debugMessage()).that(removedIds).isEqualTo(s); + } + + void removedIdsIsAnyOf(Set s1, Set s2) { + Truth.assertWithMessage(debugMessage()).that(removedIds).isAnyOf(s1, s2); + } + + void eventCountIsAnyOf(Range range) { + Truth.assertWithMessage(debugMessage()).that(events.size()).isIn(range); + } + + private String debugMessage() { + final StringBuilder builder = new StringBuilder(); + builder.append("events[\n"); + for (ListenerEvent receivedEvent : events) { + builder.append("event{"); + builder.append("error=").append(receivedEvent.error); + builder.append(","); + builder.append("value="); + debugMessage(builder, receivedEvent.value); + builder.append("},\n"); + } + builder.append("]"); + return builder.toString(); + } + + private static void debugMessage(StringBuilder builder, QuerySnapshot qs) { + if (qs == null) { + builder.append("null"); + } else { + builder.append("{"); + List documents = qs.getDocuments(); + builder.append("documents["); + for (QueryDocumentSnapshot document : documents) { + debugMessage(builder, document); + } + builder.append("],"); + List changes = qs.getDocumentChanges(); + builder.append("documentChanges["); + for (DocumentChange change : changes) { + debugMessage(builder, change.getDocument()); + } + builder.append("]"); + builder.append("}"); + } + } + + private static void debugMessage( + StringBuilder builder, QueryDocumentSnapshot queryDocumentSnapshot) { + if (queryDocumentSnapshot == null) { + builder.append("null"); + } else { + builder.append("{"); + builder.append("path=").append(queryDocumentSnapshot.getReference().getPath()); + builder.append(","); + builder.append("data="); + debugMessage(builder, queryDocumentSnapshot.getData()); + builder.append("}"); + } + } + + private static void debugMessage(StringBuilder builder, Map data) { + builder.append("{"); + MAP_JOINER.appendTo(builder, data); + builder.append("}"); + } + } + } +} diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java index 764337438..f5f9c1e24 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java @@ -17,8 +17,6 @@ package com.google.cloud.firestore.it; import static com.google.cloud.firestore.LocalFirestoreHelper.map; -import static com.google.common.collect.Sets.newHashSet; -import static com.google.common.truth.Truth.assertWithMessage; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -34,7 +32,6 @@ import com.google.api.gax.rpc.ApiStreamObserver; import com.google.cloud.Timestamp; import com.google.cloud.firestore.CollectionReference; -import com.google.cloud.firestore.DocumentChange; import com.google.cloud.firestore.DocumentReference; import com.google.cloud.firestore.DocumentSnapshot; import com.google.cloud.firestore.EventListener; @@ -57,25 +54,17 @@ import com.google.cloud.firestore.Transaction.Function; import com.google.cloud.firestore.WriteBatch; import com.google.cloud.firestore.WriteResult; -import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; import org.junit.After; @@ -901,203 +890,6 @@ public void onEvent( } } - @Test - public void queryWatch() throws Exception { - // This test has quite a bit of machinery to it, as there are several things going on. - // The main scenario we are test is: 'Given a query, listen for snapshot updates made to that - // query' - // - // To verify this behavior we have the following things happening: - // 1. A defined query with an attached snapshot listener. - // 2. Two different documents that we are making changes to throughout the duration of the - // test. These changes are expected to trigger a number of events that should be delivered - // to the snapshot listener from 1. - // 3. Once all the document updates have been performed, and the expected number of events - // has been delivered to the snapshot listener, the events will be queried for the expected - // series of events and will then be asserted on. - // - // The mechanics of how the test is executed are as follows - // A. All events delivered to the listener in 1 are added to a list of events we receive. - // B. A separate ExecutorService is created and has tasks submitted to it to perform the - // series of document updates mentioned in 2. (Each document gets its own task). - // * Each of these update tasks will wait for the snapshot listener to be active before - // performing the updates. - // C. CountDownLatches are used to keep track of completion of work while the test is running. - // * Every time an event is received by the listener in 1 a CDL is decremented. - // * Each document update task has a CDL that is decremented when all actions have been - // performed - // * Each CDL has a timeout associated with its await (this is important so that in the - // case there is a hang during the test the whole suite isn't taken with it) - // D. After all updates have been performed and the expected number of events have been - // received, assertions on the events will be performed. - // - // Note: There is a potential that this test fails if the Firestore backend chooses not to send - // us individual changes for every document update, but rather merges two or more changes into - // a single event. We have, however, not seen any failures during thousands of test runs. - ListenerRegistration registration = null; - final ExecutorService updatesExecutor = Executors.newCachedThreadPool(); - - final List receivedEvents = - Collections.synchronizedList(new ArrayList()); - - try { - // create our CDLs that are used to track work completion - final CountDownLatch doc1CDL = new CountDownLatch(1); - final CountDownLatch doc2CDL = new CountDownLatch(1); - final CountDownLatch eventsCDL = new CountDownLatch(6); - - // create a CDL that is used to signal the snapshot lister is active. - // if we don't do this, there is a possible race with out document updates happening before - // the listener has received the "empty" event that is expected in the case that the - // result set of the query would be empty (i.e. in the case of an empty collection with new - // documents being created) - final CountDownLatch snapshotListerActive = new CountDownLatch(1); - - final Query query = randomColl.whereEqualTo("foo", "bar"); - // register the snapshot listener for the query - registration = - query.addSnapshotListener( - new EventListener() { - @Override - public void onEvent( - @Nullable QuerySnapshot value, @Nullable FirestoreException error) { - snapshotListerActive.countDown(); - receivedEvents.add(new ListenerEvent(value, error)); - eventsCDL.countDown(); - } - }); - - // Perform a series of operations on some documents in a separate thread - // While we listen for the changes - updatesExecutor.submit( - new Runnable() { - DocumentReference doc1 = randomColl.document("doc1"); - - @Override - public void run() { - try { - snapshotListerActive.await(5, TimeUnit.SECONDS); - // create the first document - doc1.set(map("baz", "foo")).get(); - // update the document - doc1.set(map("foo", "bar")).get(); - // add a field to the document - doc1.set(map("foo", "bar", "bar", "foo")).get(); - // delete the document - doc1.delete().get(); - } catch (InterruptedException | ExecutionException e) { - fail(String.format("Error while processing doc1: %s", e.getMessage())); - } finally { - doc1CDL.countDown(); - } - } - }); - - updatesExecutor.submit( - new Runnable() { - DocumentReference doc2 = randomColl.document("doc2"); - - @Override - public void run() { - try { - snapshotListerActive.await(5, TimeUnit.SECONDS); - // create a second document - doc2.set(map("foo", "bar")).get(); - // update the document - doc2.set(map("foo", "foo")).get(); - } catch (InterruptedException | ExecutionException e) { - fail(String.format("Error while processing doc2: %s", e.getMessage())); - } finally { - doc2CDL.countDown(); - } - } - }); - - // Wait for the document update operations to be performed - final boolean doc1CDLAwait = doc1CDL.await(10, TimeUnit.SECONDS); - assertTrue("all operations for doc1 were not completed in time", doc1CDLAwait); - final boolean doc2CDLAwait = doc2CDL.await(10, TimeUnit.SECONDS); - assertTrue("all operations for doc2 were not completed in time", doc2CDLAwait); - - // Wait for the expected number of update events to be delivered to our listener - eventsCDL.await(30, TimeUnit.SECONDS); - } finally { - // cleanup out listener - if (registration != null) { - registration.remove(); - } - // Shutdown the thread pool used to perform the document updates - updatesExecutor.shutdown(); - } - - // Extract certain events from the list of events we received in out listener - - final FluentIterable events = FluentIterable.from(receivedEvents); - - final Optional anyError = - events.firstMatch( - new Predicate() { - @Override - public boolean apply(ListenerEvent input) { - return input.error != null; - } - }); - assertWithMessage("snapshotListener received an error").that(anyError).isAbsent(); - - final FluentIterable querySnapshots = - events - .filter( - new Predicate() { - @Override - public boolean apply(ListenerEvent input) { - return input.value != null; - } - }) - .transform( - new com.google.common.base.Function() { - @Override - public QuerySnapshot apply(ListenerEvent input) { - return input.value; - } - }); - - final Optional initialEmpty = - querySnapshots.firstMatch( - new Predicate() { - @Override - public boolean apply(QuerySnapshot input) { - return input.isEmpty() && input.getDocumentChanges().size() == 0; - } - }); - final Set addedDocumentIds = getIds(querySnapshots, DocumentChange.Type.ADDED); - final Set modifiedDocumentIds = getIds(querySnapshots, DocumentChange.Type.MODIFIED); - final Set removedDocumentIds = getIds(querySnapshots, DocumentChange.Type.REMOVED); - final Optional finalRemove = - querySnapshots.firstMatch( - new Predicate() { - @Override - public boolean apply(QuerySnapshot input) { - return input.isEmpty() && input.getDocumentChanges().size() == 1; - } - }); - - assertWithMessage("snapshotListener did not receive expected initial empty event") - .that(initialEmpty) - .isPresent(); - assertWithMessage("snapshotListener did not receive expected added events") - .that(addedDocumentIds) - .isEqualTo(newHashSet("doc1", "doc2")); - assertWithMessage("snapshotListener did not receive expected modified events") - .that(modifiedDocumentIds) - .isEqualTo(newHashSet("doc1")); - assertWithMessage("snapshotListener did not receive expected removed events") - .that(removedDocumentIds) - .isEqualTo(newHashSet("doc1", "doc2")); - assertWithMessage("snapshotListener did not receive expected final empty event") - .that(finalRemove) - .isPresent(); - } - private int paginateResults(Query query, List results) throws ExecutionException, InterruptedException { if (!results.isEmpty()) { @@ -1340,35 +1132,6 @@ public void floatIncrement() throws ExecutionException, InterruptedException { assertEquals(3.3, (Double) docSnap.get("sum"), DOUBLE_EPSILON); } - private static Set getIds( - FluentIterable querySnapshots, DocumentChange.Type type) { - final Set documentIds = new HashSet<>(); - for (QuerySnapshot querySnapshot : querySnapshots) { - final List changes = querySnapshot.getDocumentChanges(); - for (DocumentChange change : changes) { - if (change.getType() == type) { - documentIds.add(change.getDocument().getId()); - } - } - } - return documentIds; - } - - /** - * A tuple class used by {@code #queryWatch}. This class represents an event delivered to the - * registered query listener. - */ - private static final class ListenerEvent { - - @Nullable private final QuerySnapshot value; - @Nullable private final FirestoreException error; - - ListenerEvent(@Nullable QuerySnapshot value, @Nullable FirestoreException error) { - this.value = value; - this.error = error; - } - } - @Test public void getAllWithObserver() throws Exception { DocumentReference ref1 = randomColl.document("doc1");