diff --git a/google-cloud-firestore/clirr-ignored-differences.xml b/google-cloud-firestore/clirr-ignored-differences.xml
index 4a4d5d4c0..d32596ef0 100644
--- a/google-cloud-firestore/clirr-ignored-differences.xml
+++ b/google-cloud-firestore/clirr-ignored-differences.xml
@@ -252,4 +252,10 @@
*
+
+
+ 7012
+ com/google/cloud/firestore/Firestore
+ com.google.api.core.ApiFuture recursiveDelete(*)
+
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java
index bb9d75090..b927dd6a5 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java
@@ -805,7 +805,17 @@ public void close() throws InterruptedException, ExecutionException {
flushFuture.get();
}
- private void verifyNotClosedLocked() {
+ /**
+ * Used for verifying that the BulkWriter instance isn't closed when calling from outside this
+ * class.
+ */
+ void verifyNotClosed() {
+ synchronized (lock) {
+ verifyNotClosedLocked();
+ }
+ }
+
+ void verifyNotClosedLocked() {
if (this.closed) {
throw new IllegalStateException("BulkWriter has already been closed.");
}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java
index a1c3a5f29..2cd897b49 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java
@@ -193,6 +193,84 @@ void getAll(
@Nonnull
BulkWriter bulkWriter(BulkWriterOptions options);
+ /**
+ * Recursively deletes all documents and subcollections at and under the specified level.
+ *
+ *
If any delete fails, the ApiFuture contains an error with an error message containing the
+ * number of failed deletes and the stack trace of the last failed delete. The provided reference
+ * is deleted regardless of whether all deletes succeeded.
+ *
+ *
recursiveDelete() uses a {@link BulkWriter} instance with default settings to perform the
+ * deletes. To customize throttling rates or add success/error callbacks, pass in a custom
+ * BulkWriter instance.
+ *
+ * @param reference The reference of the collection to delete.
+ * @return An ApiFuture that completes when all deletes have been performed. The future fails with
+ * an error if any of the deletes fail.
+ */
+ @BetaApi
+ @Nonnull
+ ApiFuture recursiveDelete(CollectionReference reference);
+
+ /**
+ * Recursively deletes all documents and subcollections at and under the specified level.
+ *
+ * If any delete fails, the ApiFuture contains an error with an error message containing the
+ * number of failed deletes and the stack trace of the last failed delete. The provided reference
+ * is deleted regardless of whether all deletes succeeded.
+ *
+ *
recursiveDelete() uses a {@link BulkWriter} instance with default settings to perform the
+ * deletes. To customize throttling rates or add success/error callbacks, pass in a custom
+ * BulkWriter instance.
+ *
+ * @param reference The reference of the collection to delete.
+ * @param bulkWriter A custom BulkWriter instance used to perform the deletes.
+ * @return An ApiFuture that completes when all deletes have been performed. The future fails with
+ * an error if any of the deletes fail.
+ */
+ @BetaApi
+ @Nonnull
+ ApiFuture recursiveDelete(CollectionReference reference, BulkWriter bulkWriter);
+
+ /**
+ * Recursively deletes all documents and subcollections at and under the specified level.
+ *
+ * If any delete fails, the ApiFuture contains an error with an error message containing the
+ * number of failed deletes and the stack trace of the last failed delete. The provided reference
+ * is deleted regardless of whether all deletes succeeded.
+ *
+ *
recursiveDelete() uses a {@link BulkWriter} instance with default settings to perform the
+ * deletes. To customize throttling rates or add success/error callbacks, pass in a custom
+ * BulkWriter instance.
+ *
+ * @param reference The reference of the document to delete.
+ * @return An ApiFuture that completes when all deletes have been performed. The future fails with
+ * an error if any of the deletes fail.
+ */
+ @BetaApi
+ @Nonnull
+ ApiFuture recursiveDelete(DocumentReference reference);
+
+ /**
+ * Recursively deletes all documents and subcollections at and under the specified level.
+ *
+ * If any delete fails, the ApiFuture contains an error with an error message containing the
+ * number of failed deletes and the stack trace of the last failed delete. The provided reference
+ * is deleted regardless of whether all deletes succeeded.
+ *
+ *
recursiveDelete() uses a {@link BulkWriter} instance with default settings to perform the
+ * deletes. To customize throttling rates or add success/error callbacks, pass in a custom
+ * BulkWriter instance.
+ *
+ * @param reference The reference of the document to delete.
+ * @param bulkWriter A custom BulkWriter instance used to perform the deletes.
+ * @return An ApiFuture that completes when all deletes have been performed. The future fails with
+ * an error if any of the deletes fail.
+ */
+ @BetaApi
+ @Nonnull
+ ApiFuture recursiveDelete(DocumentReference reference, BulkWriter bulkWriter);
+
/**
* Returns a FirestoreBundle.Builder {@link FirestoreBundle.Builder} instance using an
* automatically generated bundle ID. When loaded on clients, client SDKs use the bundle ID and
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java
index f92b7e3d6..7a93e711d 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java
@@ -24,6 +24,7 @@
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.Timestamp;
import com.google.cloud.firestore.spi.v1.FirestoreRpc;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.firestore.v1.BatchGetDocumentsRequest;
@@ -59,6 +60,12 @@ class FirestoreImpl implements Firestore, FirestoreRpcContext {
private final FirestoreOptions firestoreOptions;
private final ResourcePath databasePath;
+ /**
+ * A lazy-loaded BulkWriter instance to be used with recursiveDelete() if no BulkWriter instance
+ * is provided.
+ */
+ @Nullable private BulkWriter bulkWriterInstance;
+
private boolean closed;
FirestoreImpl(FirestoreOptions options) {
@@ -76,6 +83,14 @@ class FirestoreImpl implements Firestore, FirestoreRpcContext {
ResourcePath.create(DatabaseRootName.of(options.getProjectId(), options.getDatabaseId()));
}
+ /** Lazy-load the Firestore's default BulkWriter. */
+ private BulkWriter getBulkWriter() {
+ if (bulkWriterInstance == null) {
+ bulkWriterInstance = bulkWriter();
+ }
+ return bulkWriterInstance;
+ }
+
/** Creates a pseudo-random 20-character ID that can be used for Firestore documents. */
static String autoId() {
StringBuilder builder = new StringBuilder();
@@ -102,6 +117,47 @@ public BulkWriter bulkWriter(BulkWriterOptions options) {
return new BulkWriter(this, options);
}
+ @Nonnull
+ public ApiFuture recursiveDelete(CollectionReference reference) {
+ BulkWriter writer = getBulkWriter();
+ return recursiveDelete(reference.getResourcePath(), writer);
+ }
+
+ @Nonnull
+ public ApiFuture recursiveDelete(CollectionReference reference, BulkWriter bulkWriter) {
+ return recursiveDelete(reference.getResourcePath(), bulkWriter);
+ }
+
+ @Nonnull
+ public ApiFuture recursiveDelete(DocumentReference reference) {
+ BulkWriter writer = getBulkWriter();
+ return recursiveDelete(reference.getResourcePath(), writer);
+ }
+
+ @Nonnull
+ public ApiFuture recursiveDelete(
+ DocumentReference reference, @Nonnull BulkWriter bulkWriter) {
+ return recursiveDelete(reference.getResourcePath(), bulkWriter);
+ }
+
+ @Nonnull
+ public ApiFuture recursiveDelete(ResourcePath path, BulkWriter bulkWriter) {
+ return recursiveDelete(
+ path, bulkWriter, RecursiveDelete.MAX_PENDING_OPS, RecursiveDelete.MIN_PENDING_OPS);
+ }
+
+ /**
+ * This overload is not private in order to test the query resumption with startAfter() once the
+ * RecursiveDelete instance has MAX_PENDING_OPS pending.
+ */
+ @Nonnull
+ @VisibleForTesting
+ ApiFuture recursiveDelete(
+ ResourcePath path, @Nonnull BulkWriter bulkWriter, int maxLimit, int minLimit) {
+ RecursiveDelete deleter = new RecursiveDelete(this, bulkWriter, path, maxLimit, minLimit);
+ return deleter.run();
+ }
+
@Nonnull
@Override
public CollectionReference collection(@Nonnull String collectionPath) {
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java
index cc6c2ab34..b1f5fa0bf 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java
@@ -255,13 +255,24 @@ abstract static class QueryOptions {
abstract ImmutableList getFieldProjections();
+ // Whether to select all documents under `parentPath`. By default, only
+ // collections that match `collectionId` are selected.
+ abstract boolean isKindless();
+
+ // Whether to require consistent documents when restarting the query. By
+ // default, restarting the query uses the readTime offset of the original
+ // query to provide consistent results.
+ abstract boolean getRequireConsistency();
+
static Builder builder() {
return new AutoValue_Query_QueryOptions.Builder()
.setAllDescendants(false)
.setLimitType(LimitType.First)
.setFieldOrders(ImmutableList.of())
.setFieldFilters(ImmutableList.of())
- .setFieldProjections(ImmutableList.of());
+ .setFieldProjections(ImmutableList.of())
+ .setKindless(false)
+ .setRequireConsistency(true);
}
abstract Builder toBuilder();
@@ -290,6 +301,10 @@ abstract static class Builder {
abstract Builder setFieldProjections(ImmutableList value);
+ abstract Builder setKindless(boolean value);
+
+ abstract Builder setRequireConsistency(boolean value);
+
abstract QueryOptions build();
}
}
@@ -327,21 +342,21 @@ private static boolean isUnaryComparison(@Nullable Object value) {
/** Computes the backend ordering semantics for DocumentSnapshot cursors. */
private ImmutableList createImplicitOrderBy() {
List implicitOrders = new ArrayList<>(options.getFieldOrders());
- boolean hasDocumentId = false;
+ // If no explicit ordering is specified, use the first inequality to define an implicit order.
if (implicitOrders.isEmpty()) {
- // If no explicit ordering is specified, use the first inequality to define an implicit order.
for (FieldFilter fieldFilter : options.getFieldFilters()) {
if (fieldFilter.isInequalityFilter()) {
implicitOrders.add(new FieldOrder(fieldFilter.fieldReference, Direction.ASCENDING));
break;
}
}
- } else {
- for (FieldOrder fieldOrder : options.getFieldOrders()) {
- if (FieldPath.isDocumentId(fieldOrder.fieldReference.getFieldPath())) {
- hasDocumentId = true;
- }
+ }
+
+ boolean hasDocumentId = false;
+ for (FieldOrder fieldOrder : implicitOrders) {
+ if (FieldPath.isDocumentId(fieldOrder.fieldReference.getFieldPath())) {
+ hasDocumentId = true;
}
}
@@ -1237,7 +1252,12 @@ BundledQuery toBundledQuery() {
private StructuredQuery.Builder buildWithoutClientTranslation() {
StructuredQuery.Builder structuredQuery = StructuredQuery.newBuilder();
CollectionSelector.Builder collectionSelector = CollectionSelector.newBuilder();
- collectionSelector.setCollectionId(options.getCollectionId());
+
+ // Kindless queries select all descendant documents, so we don't add the collectionId field.
+ if (!options.isKindless()) {
+ collectionSelector.setCollectionId(options.getCollectionId());
+ }
+
collectionSelector.setAllDescendants(options.getAllDescendants());
structuredQuery.addFrom(collectionSelector);
@@ -1525,10 +1545,17 @@ public void onError(Throwable throwable) {
// since we are requiring at least a single document result.
QueryDocumentSnapshot cursor = lastReceivedDocument.get();
if (cursor != null) {
- Query.this
- .startAfter(cursor)
- .internalStream(
- documentObserver, /* transactionId= */ null, cursor.getReadTime());
+ if (options.getRequireConsistency()) {
+ Query.this
+ .startAfter(cursor)
+ .internalStream(
+ documentObserver, /* transactionId= */ null, cursor.getReadTime());
+ } else {
+ Query.this
+ .startAfter(cursor)
+ .internalStream(
+ documentObserver, /* transactionId= */ null, /* readTime= */ null);
+ }
}
} else {
Tracing.getTracer().getCurrentSpan().addAnnotation("Firestore.Query: Error");
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RecursiveDelete.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RecursiveDelete.java
new file mode 100644
index 000000000..b14eb1983
--- /dev/null
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RecursiveDelete.java
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.firestore;
+
+import com.google.api.core.ApiAsyncFunction;
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.BetaApi;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.rpc.ApiStreamObserver;
+import com.google.cloud.firestore.Query.QueryOptions;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.grpc.Status;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * Class used to store state required for running a recursive delete operation. Each recursive
+ * delete call should use a new instance of the class.
+ */
+@BetaApi
+public final class RecursiveDelete {
+ /**
+ * Datastore allowed numeric IDs where Firestore only allows strings. Numeric IDs are exposed to
+ * Firestore as __idNUM__, so this is the lowest possible negative numeric value expressed in that
+ * format.
+ *
+ * This constant is used to specify startAt/endAt values when querying for all descendants in a
+ * single collection.
+ */
+ public static final String REFERENCE_NAME_MIN_ID = "__id-9223372036854775808__";
+
+ /**
+ * The query limit used for recursive deletes when fetching all descendants of the specified
+ * reference to delete. This is done to prevent the query stream from streaming documents faster
+ * than Firestore can delete.
+ */
+ public static final int MAX_PENDING_OPS = 5000;
+
+ /**
+ * The number of pending BulkWriter operations at which RecursiveDelete starts the next limit
+ * query to fetch descendants. By starting the query while there are pending operations, Firestore
+ * can improve BulkWriter throughput. This helps prevent BulkWriter from idling while Firestore
+ * fetches the next query.
+ */
+ public static final int MIN_PENDING_OPS = 1000;
+
+ private final FirestoreRpcContext> firestoreRpcContext;
+ private final BulkWriter writer;
+
+ /** The resource path of the reference to recursively delete */
+ private final ResourcePath path;
+
+ /** Lock object for all BulkWriter operations and callbacks. */
+ private final Object lock = new Object();
+
+ /** The number of deletes that failed with a permanent error. */
+ @GuardedBy("lock")
+ private int errorCount = 0;
+
+ /**
+ * The most recently thrown error. Used to populate the developer-facing error message when the
+ * recursive delete operation completes.
+ */
+ @GuardedBy("lock")
+ @Nullable
+ private Throwable lastError;
+
+ /** Whether there are still documents to delete that still need to be fetched. */
+ @GuardedBy("lock")
+ private boolean documentsPending = true;
+
+ /** A deferred promise that resolves when the recursive delete operation is completed. */
+ private final SettableApiFuture completionFuture = SettableApiFuture.create();
+
+ /** Whether a query stream is currently in progress. Only one stream can be run at a time. */
+ @GuardedBy("lock")
+ private boolean streamInProgress = false;
+
+ /** Whether run() has been called. */
+ private boolean started = false;
+
+ /** Query limit to use when fetching all descendants. */
+ private final int maxPendingOps;
+
+ /**
+ * The number of pending BulkWriter operations at which RecursiveDelete starts the next limit
+ * query to fetch descendants.
+ */
+ private final int minPendingOps;
+
+ /**
+ * The last document snapshot returned by the stream. Used to set the startAfter() field in the
+ * subsequent stream.
+ */
+ @Nullable private DocumentSnapshot lastDocumentSnap;
+
+ /**
+ * The number of pending BulkWriter operations. Used to determine when the next query can be run.
+ */
+ private int pendingOperationsCount = 0;
+
+ RecursiveDelete(
+ FirestoreRpcContext> firestoreRpcContext,
+ BulkWriter writer,
+ ResourcePath path,
+ int maxLimit,
+ int minLimit) {
+ this.firestoreRpcContext = firestoreRpcContext;
+ this.writer = writer;
+ this.path = path;
+ this.maxPendingOps = maxLimit;
+ this.minPendingOps = minLimit;
+ }
+
+ public ApiFuture run() {
+ Preconditions.checkState(!started, "RecursiveDelete.run() should only be called once");
+ started = true;
+
+ writer.verifyNotClosed();
+
+ streamDescendants();
+ return completionFuture;
+ }
+
+ private void streamDescendants() {
+ Query query = getAllDescendantsQuery();
+ synchronized (lock) {
+ streamInProgress = true;
+ }
+ final int[] streamedDocsCount = {0};
+ final ApiStreamObserver responseObserver =
+ new ApiStreamObserver() {
+ public void onNext(DocumentSnapshot snapshot) {
+ streamedDocsCount[0]++;
+ lastDocumentSnap = snapshot;
+ deleteReference(snapshot.getReference());
+ }
+
+ public void onError(Throwable throwable) {
+ String message = "Failed to fetch children documents";
+ synchronized (lock) {
+ lastError =
+ FirestoreException.forServerRejection(Status.UNAVAILABLE, throwable, message);
+ }
+ onQueryEnd();
+ }
+
+ public void onCompleted() {
+ synchronized (lock) {
+ streamInProgress = false;
+ }
+ // If there are fewer than the number of documents specified in the limit() field, we
+ // know that the query is complete.
+ if (streamedDocsCount[0] < maxPendingOps) {
+ onQueryEnd();
+ } else if (pendingOperationsCount == 0) {
+ // Start a new stream if all documents from this stream were deleted before the
+ // `onCompleted()` handler was called.
+ streamDescendants();
+ }
+ }
+ };
+
+ query.stream(responseObserver);
+ }
+
+ private Query getAllDescendantsQuery() {
+ ResourcePath parentPath;
+ String collectionId;
+ if (path.isDocument()) {
+ // The parent is the closest ancestor document to the location we're deleting. Since we are
+ // deleting a document, the parent is the path of that document.
+ parentPath = path;
+ Preconditions.checkState(
+ path.getParent() != null, "Parent of a document should not be null.");
+ collectionId = path.getParent().getId();
+ } else {
+ // The parent is the closest ancestor document to the location we're deleting. Since we are
+ // deleting a collection, the parent is the path of the document containing that collection
+ // (or the database root, if it is a root collection).
+ parentPath = path.popLast();
+ collectionId = path.getId();
+ }
+
+ Query query =
+ new Query(
+ firestoreRpcContext,
+ QueryOptions.builder()
+ .setParentPath(parentPath)
+ .setCollectionId(collectionId)
+ .setAllDescendants(true)
+ .setKindless(true)
+ .setRequireConsistency(false)
+ .build());
+
+ // Query for names only to fetch empty snapshots.
+ query = query.select(FieldPath.documentId()).limit(maxPendingOps);
+
+ // To find all descendants of a collection reference, we need to use a
+ // composite filter that captures all documents that start with the
+ // collection prefix. The MIN_ID constant represents the minimum key in
+ // this collection, and a null byte + the MIN_ID represents the minimum
+ // key is the next possible collection.
+ if (path.isCollection()) {
+ char nullChar = '\0';
+ String startAt = collectionId + "/" + REFERENCE_NAME_MIN_ID;
+ String endAt = collectionId + nullChar + "/" + REFERENCE_NAME_MIN_ID;
+ query =
+ query
+ .whereGreaterThanOrEqualTo(FieldPath.documentId(), startAt)
+ .whereLessThan(FieldPath.documentId(), endAt);
+ }
+
+ // startAfter() needs to be added after the where() filters since it creates an implicit
+ // orderBy.
+ if (lastDocumentSnap != null) {
+ query = query.startAfter(lastDocumentSnap);
+ }
+
+ return query;
+ }
+
+ /**
+ * Called when all descendants of the provided reference have been streamed or if a permanent
+ * error occurs during the stream. Deletes the developer provided reference and wraps any errors
+ * that occurred.
+ */
+ private void onQueryEnd() {
+ synchronized (lock) {
+ documentsPending = false;
+ }
+
+ // Used to aggregate flushFuture and deleteFuture to use with ApiFutures.allAsList(), in order
+ // to ensure that the delete catchingAsync() callback is run before the flushFuture callback.
+ List> pendingFutures = new ArrayList<>();
+
+ // Delete the provided document reference if one was provided.
+ if (path.isDocument()) {
+ pendingFutures.add(deleteReference(new DocumentReference(firestoreRpcContext, path)));
+ }
+
+ pendingFutures.add(writer.flush());
+
+ // Completes the future returned by run() and sets the exception if an error occurred.
+ ApiFutures.transformAsync(
+ ApiFutures.allAsList(pendingFutures),
+ new ApiAsyncFunction, Void>() {
+ public ApiFuture apply(List unused) {
+ synchronized (lock) {
+ if (lastError == null) {
+ completionFuture.set(null);
+ } else {
+ String message =
+ errorCount
+ + (errorCount != 1 ? " deletes" : " delete")
+ + " failed. "
+ + lastError.getMessage();
+ if (lastError instanceof FirestoreException) {
+ lastError =
+ new FirestoreException(message, ((FirestoreException) lastError).getStatus());
+ } else {
+ lastError = new Throwable(message, lastError);
+ }
+ completionFuture.setException(lastError);
+ }
+ return ApiFutures.immediateFuture(null);
+ }
+ }
+ },
+ MoreExecutors.directExecutor());
+ }
+
+ /** Deletes the provided reference and starts the next stream if conditions are met. */
+ private ApiFuture deleteReference(final DocumentReference reference) {
+ synchronized (lock) {
+ pendingOperationsCount++;
+ }
+
+ ApiFuture catchingDeleteFuture =
+ ApiFutures.catchingAsync(
+ writer.delete(reference),
+ Throwable.class,
+ new ApiAsyncFunction() {
+ public ApiFuture apply(Throwable e) {
+ synchronized (lock) {
+ errorCount++;
+ lastError = e;
+ return ApiFutures.immediateFuture(null);
+ }
+ }
+ },
+ MoreExecutors.directExecutor());
+
+ return ApiFutures.transformAsync(
+ catchingDeleteFuture,
+ new ApiAsyncFunction() {
+
+ public ApiFuture apply(WriteResult result) {
+ synchronized (lock) {
+ pendingOperationsCount--;
+ // We wait until the previous stream has ended in order to ensure the
+ // startAfter document is correct. Starting the next stream while
+ // there are pending operations allows Firestore to maximize
+ // BulkWriter throughput.
+ if (documentsPending && !streamInProgress && pendingOperationsCount < minPendingOps) {
+ streamDescendants();
+ }
+ return ApiFutures.immediateFuture(null);
+ }
+ }
+ },
+ MoreExecutors.directExecutor());
+ }
+}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ResourcePath.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ResourcePath.java
index 7f305475f..ee44cf4f3 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ResourcePath.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ResourcePath.java
@@ -157,6 +157,18 @@ public int compareTo(@Nonnull ResourcePath other) {
return super.compareTo(other);
}
+ /**
+ * Pops the last segment from this `ResourcePath` and returns a newly constructed `ResourcePath`
+ * without the last segment. This does not change the ResourcePath, since `ResourcePath` is
+ * immutable.
+ *
+ * @return The newly created Path.
+ */
+ ResourcePath popLast() {
+ ImmutableList segments = getSegments();
+ return createPathWithSegments(segments.subList(0, segments.size() - 1));
+ }
+
@Override
public String toString() {
return getName();
diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java
index daf90171c..5df76ff61 100644
--- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java
+++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java
@@ -77,7 +77,7 @@
@RunWith(MockitoJUnitRunner.class)
public class BulkWriterTest {
- private static final ApiFuture FAILED_FUTURE =
+ public static final ApiFuture FAILED_FUTURE =
ApiFutures.immediateFailedFuture(
new ApiException(
new IllegalStateException("Mock batchWrite failed in test"),
@@ -125,14 +125,14 @@ public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit)
private DocumentReference doc1;
private DocumentReference doc2;
- private ApiFuture successResponse(int updateTimeSeconds) {
+ public static ApiFuture successResponse(int updateTimeSeconds) {
BatchWriteResponse.Builder response = BatchWriteResponse.newBuilder();
response.addWriteResultsBuilder().getUpdateTimeBuilder().setSeconds(updateTimeSeconds).build();
response.addStatusBuilder().build();
return ApiFutures.immediateFuture(response.build());
}
- private ApiFuture failedResponse(int code) {
+ public static ApiFuture failedResponse(int code) {
BatchWriteResponse.Builder response = BatchWriteResponse.newBuilder();
response.addWriteResultsBuilder().build();
response.addStatusBuilder().setCode(code).build();
@@ -143,8 +143,8 @@ private ApiFuture failedResponse() {
return failedResponse(Code.DEADLINE_EXCEEDED_VALUE);
}
- private ApiFuture mergeResponses(ApiFuture... responses)
- throws Exception {
+ public static ApiFuture mergeResponses(
+ ApiFuture... responses) throws Exception {
BatchWriteResponse.Builder response = BatchWriteResponse.newBuilder();
for (ApiFuture future : responses) {
BatchWriteResponse res = future.get();
diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/LocalFirestoreHelper.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/LocalFirestoreHelper.java
index 2ac9c033b..d7b03b7ee 100644
--- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/LocalFirestoreHelper.java
+++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/LocalFirestoreHelper.java
@@ -282,6 +282,11 @@ public static ApiFuture rollbackResponse() {
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}
+ public static Answer emptyQueryResponse() {
+ RunQueryResponse[] response = new RunQueryResponse[0];
+ return streamingResponse(response, null);
+ }
+
public static Answer queryResponse() {
return queryResponse(DOCUMENT_NAME);
}
@@ -573,16 +578,36 @@ public static RunQueryRequest query(StructuredQuery... query) {
public static RunQueryRequest query(
@Nullable String transactionId, boolean allDescendants, StructuredQuery... query) {
+ return query(transactionId, /* parent= */ "", allDescendants, /* kindless= */ false, query);
+ }
+
+ public static RunQueryRequest query(
+ @Nullable String transactionId,
+ boolean allDescendants,
+ boolean kindless,
+ StructuredQuery... query) {
+ return query(transactionId, /* parent= */ "", allDescendants, kindless, query);
+ }
+
+ public static RunQueryRequest query(
+ @Nullable String transactionId,
+ String parent,
+ boolean allDescendants,
+ boolean kindless,
+ StructuredQuery... query) {
RunQueryRequest.Builder request = RunQueryRequest.newBuilder();
- request.setParent(LocalFirestoreHelper.DATABASE_NAME + "/documents");
+ if (!parent.equals("")) {
+ parent = '/' + parent;
+ }
+ request.setParent(LocalFirestoreHelper.DATABASE_NAME + "/documents" + parent);
StructuredQuery.Builder structuredQuery = request.getStructuredQueryBuilder();
- CollectionSelector collectionSelector =
- CollectionSelector.newBuilder()
- .setCollectionId("coll")
- .setAllDescendants(allDescendants)
- .build();
- structuredQuery.addFrom(collectionSelector);
+ CollectionSelector.Builder builder =
+ CollectionSelector.newBuilder().setAllDescendants(allDescendants);
+ if (!kindless) {
+ builder.setCollectionId("coll");
+ }
+ structuredQuery.addFrom(builder);
for (StructuredQuery option : query) {
structuredQuery.mergeFrom(option);
@@ -630,6 +655,10 @@ public static BatchGetDocumentsRequest getAll(
return request.build();
}
+ public static StructuredQuery order(FieldPath fieldPath, StructuredQuery.Direction direction) {
+ return order(fieldPath.getEncodedPath(), direction);
+ }
+
public static StructuredQuery order(String fieldPath, StructuredQuery.Direction direction) {
StructuredQuery.Builder structuredQuery = StructuredQuery.newBuilder();
structuredQuery.addOrderByBuilder().setField(field(fieldPath)).setDirection(direction);
diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/RecursiveDeleteTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/RecursiveDeleteTest.java
new file mode 100644
index 000000000..a83842779
--- /dev/null
+++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/RecursiveDeleteTest.java
@@ -0,0 +1,736 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.firestore;
+
+import static com.google.cloud.firestore.BulkWriterTest.failedResponse;
+import static com.google.cloud.firestore.BulkWriterTest.mergeResponses;
+import static com.google.cloud.firestore.BulkWriterTest.successResponse;
+import static com.google.cloud.firestore.LocalFirestoreHelper.COLLECTION_ID;
+import static com.google.cloud.firestore.LocalFirestoreHelper.DATABASE_NAME;
+import static com.google.cloud.firestore.LocalFirestoreHelper.DOCUMENT_ROOT;
+import static com.google.cloud.firestore.LocalFirestoreHelper.batchWrite;
+import static com.google.cloud.firestore.LocalFirestoreHelper.delete;
+import static com.google.cloud.firestore.LocalFirestoreHelper.emptyQueryResponse;
+import static com.google.cloud.firestore.LocalFirestoreHelper.filter;
+import static com.google.cloud.firestore.LocalFirestoreHelper.limit;
+import static com.google.cloud.firestore.LocalFirestoreHelper.order;
+import static com.google.cloud.firestore.LocalFirestoreHelper.query;
+import static com.google.cloud.firestore.LocalFirestoreHelper.queryResponse;
+import static com.google.cloud.firestore.LocalFirestoreHelper.reference;
+import static com.google.cloud.firestore.LocalFirestoreHelper.select;
+import static com.google.cloud.firestore.LocalFirestoreHelper.startAt;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+
+import com.google.api.core.ApiAsyncFunction;
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.rpc.ApiStreamObserver;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.BulkWriter.WriteErrorCallback;
+import com.google.cloud.firestore.BulkWriter.WriteResultCallback;
+import com.google.cloud.firestore.LocalFirestoreHelper.ResponseStubber;
+import com.google.cloud.firestore.spi.v1.FirestoreRpc;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.BatchWriteResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldFilter.Operator;
+import com.google.firestore.v1.Value;
+import com.google.firestore.v1.Write;
+import com.google.rpc.Code;
+import io.grpc.Status;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RecursiveDeleteTest {
+ @Rule public Timeout timeout = new Timeout(5, TimeUnit.SECONDS);
+
+ @Spy private final FirestoreRpc firestoreRpc = Mockito.mock(FirestoreRpc.class);
+
+ /** Executor that executes delayed tasks without delay. */
+ private final ScheduledExecutorService immediateExecutor =
+ new ScheduledThreadPoolExecutor(1) {
+ @Override
+ @Nonnull
+ public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) {
+ return super.schedule(command, 0, TimeUnit.MILLISECONDS);
+ }
+ };
+
+ @Spy
+ private final FirestoreImpl firestoreMock =
+ new FirestoreImpl(
+ FirestoreOptions.newBuilder().setProjectId("test-project").build(), firestoreRpc);
+
+ @Captor private ArgumentCaptor batchWriteCapture;
+ @Captor private ArgumentCaptor runQueryCapture;
+ @Captor private ArgumentCaptor streamObserverCapture;
+
+ private BulkWriter bulkWriter;
+ private ResponseStubber responseStubber;
+
+ @Before
+ public void before() {
+ doReturn(immediateExecutor).when(firestoreRpc).getExecutor();
+
+ final ScheduledExecutorService timeoutExecutor =
+ new ScheduledThreadPoolExecutor(1) {
+ @Override
+ @Nonnull
+ public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) {
+ return super.schedule(command, 0, TimeUnit.MILLISECONDS);
+ }
+ };
+ }
+
+ private Value recursiveDeleteStartAt(String name) {
+ return Value.newBuilder()
+ .setReferenceValue(
+ DATABASE_NAME + "/documents/" + name + "/" + RecursiveDelete.REFERENCE_NAME_MIN_ID)
+ .build();
+ }
+
+ private Value recursiveDeleteEndAt(String name) {
+ return Value.newBuilder()
+ .setReferenceValue(
+ DOCUMENT_ROOT + name + '\0' + "/" + RecursiveDelete.REFERENCE_NAME_MIN_ID)
+ .build();
+ }
+
+ private String fullDocumentPath(String name) {
+ return DOCUMENT_ROOT + COLLECTION_ID + "/" + name;
+ }
+
+ private void setupMocks(List childrenDocuments) throws Exception {
+ setupMocks(childrenDocuments, /* deleteDocRef= */ "");
+ }
+
+ private void setupMocks(List childrenDocuments, String deleteDocRef) throws Exception {
+ setupMocks(childrenDocuments, deleteDocRef, null);
+ }
+
+ private void setupMocks(
+ List childrenDocuments,
+ final String deleteDocRef,
+ @Nullable final ApiFuture customResponse)
+ throws Exception {
+
+ List fullDocumentPaths = new ArrayList<>();
+ for (String documentName : childrenDocuments) {
+ fullDocumentPaths.add(fullDocumentPath(documentName));
+ }
+
+ doAnswer(queryResponse(fullDocumentPaths.toArray(new String[0])))
+ .when(firestoreMock)
+ .streamRequest(
+ runQueryCapture.capture(),
+ streamObserverCapture.capture(),
+ Matchers.any());
+
+ if (!deleteDocRef.equals("")) {
+ childrenDocuments = new ArrayList<>(childrenDocuments);
+ childrenDocuments.add(deleteDocRef);
+ }
+
+ final List expectedRequests = new ArrayList<>();
+ final List> returnedResponse = new ArrayList<>();
+ for (String documentPath : childrenDocuments) {
+ expectedRequests.add(delete(COLLECTION_ID + "/" + documentPath));
+ returnedResponse.add(successResponse(1));
+ }
+
+ final ApiFuture finalResponse =
+ customResponse != null
+ ? customResponse
+ : mergeResponses(returnedResponse.toArray(new ApiFuture[0]));
+
+ responseStubber =
+ new ResponseStubber() {
+ {
+ put(batchWrite(expectedRequests.toArray(new Write[0])), finalResponse);
+ }
+ };
+ responseStubber.initializeStub(batchWriteCapture, firestoreMock);
+ }
+
+ @Test
+ public void getAllDescendantsRootLevelCollection() throws Exception {
+ doAnswer(emptyQueryResponse())
+ .when(firestoreMock)
+ .streamRequest(
+ runQueryCapture.capture(),
+ streamObserverCapture.capture(),
+ Matchers.any());
+
+ RunQueryRequest expectedRequest =
+ query(
+ null,
+ /* allDescendants= */ true,
+ /* kindless= */ true,
+ select(FieldPath.documentId()),
+ filter(
+ Operator.GREATER_THAN_OR_EQUAL,
+ FieldPath.documentId().toString(),
+ recursiveDeleteStartAt("root")),
+ filter(
+ Operator.LESS_THAN,
+ FieldPath.documentId().toString(),
+ recursiveDeleteEndAt("root")),
+ limit(RecursiveDelete.MAX_PENDING_OPS));
+
+ firestoreMock.recursiveDelete(firestoreMock.collection("root")).get();
+ assertEquals(expectedRequest, runQueryCapture.getValue());
+ }
+
+ @Test
+ public void getAllDescendantsNestedCollection() throws Exception {
+ doAnswer(emptyQueryResponse())
+ .when(firestoreMock)
+ .streamRequest(
+ runQueryCapture.capture(),
+ streamObserverCapture.capture(),
+ Matchers.any());
+
+ RunQueryRequest expectedRequest =
+ query(
+ null,
+ /* parent= */ "root/doc",
+ /* allDescendants= */ true,
+ /* kindless= */ true,
+ select(FieldPath.documentId()),
+ filter(
+ Operator.GREATER_THAN_OR_EQUAL,
+ FieldPath.documentId().toString(),
+ recursiveDeleteStartAt("root/doc/nestedCol")),
+ filter(
+ Operator.LESS_THAN,
+ FieldPath.documentId().toString(),
+ recursiveDeleteEndAt("root/doc/nestedCol")),
+ limit(RecursiveDelete.MAX_PENDING_OPS));
+
+ firestoreMock.recursiveDelete(firestoreMock.collection("root/doc/nestedCol")).get();
+ assertEquals(expectedRequest, runQueryCapture.getValue());
+ }
+
+ @Test
+ public void getAllDescendantsDocument() throws Exception {
+ doAnswer(emptyQueryResponse())
+ .when(firestoreMock)
+ .streamRequest(
+ runQueryCapture.capture(),
+ streamObserverCapture.capture(),
+ Matchers.any());
+
+ // Include dummy response for the deleted fullDocumentPath reference.
+ doAnswer(
+ new Answer>() {
+ public ApiFuture answer(InvocationOnMock mock) {
+ return successResponse(1);
+ }
+ })
+ .when(firestoreMock)
+ .sendRequest(
+ batchWriteCapture.capture(),
+ Matchers.>any());
+
+ RunQueryRequest expectedRequest =
+ query(
+ null,
+ /* parent= */ "root/doc",
+ /* allDescendants= */ true,
+ /* kindless= */ true,
+ select(FieldPath.documentId()),
+ limit(RecursiveDelete.MAX_PENDING_OPS));
+
+ firestoreMock.recursiveDelete(firestoreMock.document("root/doc")).get();
+ assertEquals(expectedRequest, runQueryCapture.getValue());
+ }
+
+ @Test
+ public void createsRetryQueryAfterStreamExceptionWithLastReceivedDoc() throws Exception {
+ doAnswer(
+ queryResponse(
+ new FirestoreException("Mock runQuery() failed in test", Status.UNAVAILABLE),
+ fullDocumentPath(("doc1"))))
+ .doAnswer(emptyQueryResponse())
+ .when(firestoreMock)
+ .streamRequest(
+ runQueryCapture.capture(),
+ streamObserverCapture.capture(),
+ Matchers.any());
+
+ doAnswer(
+ new Answer>() {
+ public ApiFuture answer(InvocationOnMock mock) {
+ return successResponse(1);
+ }
+ })
+ .when(firestoreMock)
+ .sendRequest(
+ batchWriteCapture.capture(),
+ Matchers.>any());
+
+ RunQueryRequest expectedRequest =
+ query(
+ null,
+ /* allDescendants= */ true,
+ /* kindless= */ true,
+ select(FieldPath.documentId()),
+ order(FieldPath.documentId(), Direction.ASCENDING),
+ startAt(reference(fullDocumentPath("doc1")), /* before= */ false),
+ filter(
+ Operator.GREATER_THAN_OR_EQUAL,
+ FieldPath.documentId().toString(),
+ recursiveDeleteStartAt("coll")),
+ filter(
+ Operator.LESS_THAN,
+ FieldPath.documentId().toString(),
+ recursiveDeleteEndAt("coll")),
+ limit(RecursiveDelete.MAX_PENDING_OPS));
+
+ firestoreMock.recursiveDelete(firestoreMock.collection("coll")).get();
+ assertEquals(expectedRequest, runQueryCapture.getAllValues().get(1));
+ }
+
+ @Test
+ public void createsSecondQueryWithCorrectStartAfter() throws Exception {
+ // This test checks that the second query is created with the correct startAfter() once the
+ // RecursiveDelete instance is below the MIN_PENDING_OPS threshold to send the next batch.
+
+ // Use lower limits than the actual RecursiveDelete class in order to make this test run fast.
+ int maxPendingOps = 100;
+ int minPendingOps = 11;
+ int maxBatchSize = 10;
+
+ final int cutoff = maxPendingOps - minPendingOps;
+ final int[] numDeletesBuffered = {0};
+
+ // This future is used to delay the BatchWriteResponses from returning in order to create the
+ // situation where the number of pending operations is less than `minPendingOps`.
+ final SettableApiFuture bufferFuture = SettableApiFuture.create();
+
+ // This future completes when the second query is run.
+ final SettableApiFuture secondQueryFuture = SettableApiFuture.create();
+
+ List firstStream = new ArrayList<>();
+ final List> batchWriteResponse = new ArrayList<>();
+ for (int i = 0; i < maxPendingOps; i++) {
+ firstStream.add(fullDocumentPath("doc" + i));
+ }
+
+ for (int i = 0; i < maxBatchSize; i++) {
+ batchWriteResponse.add(successResponse(1));
+ }
+
+ doAnswer(queryResponse(firstStream.toArray(new String[0])))
+ .doAnswer(
+ new Answer() {
+ public RunQueryResponse answer(InvocationOnMock invocation) throws Throwable {
+ secondQueryFuture.set(null);
+ Object[] args = invocation.getArguments();
+ ApiStreamObserver observer =
+ (ApiStreamObserver) args[1];
+ observer.onCompleted();
+ return null;
+ }
+ })
+ .when(firestoreMock)
+ .streamRequest(
+ runQueryCapture.capture(),
+ streamObserverCapture.capture(),
+ Matchers.any());
+
+ doAnswer(
+ new Answer>() {
+ public ApiFuture answer(InvocationOnMock mock) throws Exception {
+ if (numDeletesBuffered[0] < cutoff) {
+ numDeletesBuffered[0] += batchWriteResponse.size();
+ // By waiting for `bufferFuture` to complete, we can guarantee that the writes
+ // complete after all documents are streamed. Without this future, the test can
+ // race and complete the writes before the stream is finished, which is a
+ // different scenario this test is not for.
+ return ApiFutures.transformAsync(
+ bufferFuture,
+ new ApiAsyncFunction() {
+ public ApiFuture apply(Void unused) throws Exception {
+ return mergeResponses(batchWriteResponse.toArray(new ApiFuture[0]));
+ }
+ },
+ MoreExecutors.directExecutor());
+ } else {
+ // Once there are `cutoff` pending deletes, completing the future allows enough
+ // responses to be returned such that the number of pending deletes should be less
+ // than `minPendingOps`. This allows us to test that the second query is made.
+ bufferFuture.set(null);
+ return ApiFutures.transformAsync(
+ secondQueryFuture,
+ new ApiAsyncFunction() {
+ public ApiFuture apply(Void unused) throws Exception {
+ return mergeResponses(batchWriteResponse.toArray(new ApiFuture[0]));
+ }
+ },
+ MoreExecutors.directExecutor());
+ }
+ }
+ })
+ .when(firestoreMock)
+ .sendRequest(
+ batchWriteCapture.capture(),
+ Matchers.>any());
+
+ BulkWriter bulkWriter = firestoreMock.bulkWriter();
+ bulkWriter.setMaxBatchSize(maxBatchSize);
+
+ RunQueryRequest expectedRequest =
+ query(
+ null,
+ /* allDescendants= */ true,
+ /* kindless= */ true,
+ select(FieldPath.documentId()),
+ order(FieldPath.documentId(), Direction.ASCENDING),
+ startAt(reference(fullDocumentPath("doc" + (maxPendingOps - 1))), /* before= */ false),
+ filter(
+ Operator.GREATER_THAN_OR_EQUAL,
+ FieldPath.documentId().toString(),
+ recursiveDeleteStartAt("coll")),
+ filter(
+ Operator.LESS_THAN,
+ FieldPath.documentId().toString(),
+ recursiveDeleteEndAt("coll")),
+ limit(maxPendingOps));
+
+ firestoreMock
+ .recursiveDelete(
+ firestoreMock.collection("coll").getResourcePath(),
+ bulkWriter,
+ maxPendingOps,
+ minPendingOps)
+ .get();
+ assertEquals(2, runQueryCapture.getAllValues().size());
+ assertEquals(expectedRequest, runQueryCapture.getAllValues().get(1));
+ }
+
+ @Test
+ public void deletesCollection() throws Exception {
+ List documents =
+ Arrays.asList("anna", "bob", "bob/children/charlie", "bob/children/daniel");
+ setupMocks(documents);
+ firestoreMock.recursiveDelete(firestoreMock.collection(COLLECTION_ID)).get();
+ responseStubber.verifyAllRequestsSent();
+ }
+
+ @Test
+ public void deletesDocumentAndReference() throws Exception {
+ List documents =
+ Arrays.asList("bob/children/brian", "bob/children/charlie", "bob/children/daniel");
+ setupMocks(documents, "bob");
+ firestoreMock.recursiveDelete(firestoreMock.collection(COLLECTION_ID).document("bob")).get();
+ responseStubber.verifyAllRequestsSent();
+ }
+
+ @Test
+ public void exceptionContainsLastErrorCodeIfWritesFail() throws Exception {
+ List documents =
+ Arrays.asList("bob/children/brian", "bob/children/charlie", "bob/children/daniel");
+ ApiFuture customResponse =
+ mergeResponses(
+ successResponse(1),
+ failedResponse(Code.CANCELLED_VALUE),
+ failedResponse(Code.PERMISSION_DENIED_VALUE),
+ successResponse(1));
+ setupMocks(documents, "bob", customResponse);
+ try {
+ ApiFuture future =
+ firestoreMock.recursiveDelete(firestoreMock.collection(COLLECTION_ID).document("bob"));
+ future.get();
+ fail("Operation should have failed in test");
+ } catch (Exception e) {
+ assertEquals(Status.PERMISSION_DENIED, ((FirestoreException) e.getCause()).getStatus());
+ assertTrue(e.getMessage().contains("2 deletes failed"));
+ }
+ }
+
+ @Test
+ public void exceptionThrownIfBulkWriterSuccessHandlerFails() throws Exception {
+ List documents = Arrays.asList("bob/children/brian");
+ setupMocks(documents, "bob");
+ bulkWriter = firestoreMock.bulkWriter();
+ bulkWriter.addWriteResultListener(
+ new WriteResultCallback() {
+ public void onResult(DocumentReference documentReference, WriteResult result) {
+ throw new UnsupportedOperationException(
+ "Test code threw UnsupportedOperationException");
+ }
+ });
+ try {
+ ApiFuture future =
+ firestoreMock.recursiveDelete(
+ firestoreMock.collection(COLLECTION_ID).document("bob"), bulkWriter);
+ future.get();
+ fail("Operation should have failed in test");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("2 deletes failed"));
+ }
+ }
+
+ @Test
+ public void successHandlerProvidesCorrectReferencesAndResults() throws Exception {
+ List documents = Arrays.asList("bob/children/brian", "bob/children/charlie");
+ ApiFuture customResponse =
+ mergeResponses(successResponse(1), successResponse(2), successResponse(3));
+ setupMocks(documents, "bob", customResponse);
+
+ final List results = new ArrayList<>();
+ final List references = new ArrayList<>();
+ bulkWriter = firestoreMock.bulkWriter();
+ bulkWriter.addWriteResultListener(
+ new WriteResultCallback() {
+ public void onResult(DocumentReference documentReference, WriteResult result) {
+ results.add((int) result.getUpdateTime().getSeconds());
+ references.add(documentReference.getPath());
+ }
+ });
+ ApiFuture future =
+ firestoreMock.recursiveDelete(
+ firestoreMock.collection(COLLECTION_ID).document("bob"), bulkWriter);
+ future.get();
+ assertArrayEquals(new Integer[] {1, 2, 3}, results.toArray());
+ assertArrayEquals(
+ new String[] {
+ "coll/bob/children/brian", "coll/bob/children/charlie", "coll/bob",
+ },
+ references.toArray());
+ }
+
+ @Test
+ public void errorHandlerProvidesCorrectInformation() throws Exception {
+ List documents = Arrays.asList("bob/children/brian", "bob/children/charlie");
+ ApiFuture customResponse =
+ mergeResponses(
+ failedResponse(Code.PERMISSION_DENIED_VALUE),
+ failedResponse(Code.UNAVAILABLE_VALUE),
+ failedResponse(Code.INTERNAL_VALUE));
+ setupMocks(documents, "bob", customResponse);
+
+ final List codes = new ArrayList<>();
+ final List references = new ArrayList<>();
+ bulkWriter = firestoreMock.bulkWriter();
+ bulkWriter.addWriteErrorListener(
+ new WriteErrorCallback() {
+ public boolean onError(BulkWriterException error) {
+ codes.add(error.getStatus());
+ references.add(error.getDocumentReference().getPath());
+ return false;
+ }
+ });
+ ApiFuture future =
+ firestoreMock.recursiveDelete(
+ firestoreMock.collection(COLLECTION_ID).document("bob"), bulkWriter);
+ try {
+ future.get();
+ fail("Operation should have failed");
+ } catch (Exception e) {
+ assertArrayEquals(
+ new Status[] {Status.PERMISSION_DENIED, Status.UNAVAILABLE, Status.INTERNAL},
+ codes.toArray());
+ assertArrayEquals(
+ new String[] {
+ "coll/bob/children/brian", "coll/bob/children/charlie", "coll/bob",
+ },
+ references.toArray());
+ }
+ }
+
+ @Test
+ public void exceptionThrownIfProvidedReferenceWasNotDeleted() throws Exception {
+ doAnswer(emptyQueryResponse())
+ .when(firestoreMock)
+ .streamRequest(
+ runQueryCapture.capture(),
+ streamObserverCapture.capture(),
+ Matchers.any());
+ doReturn(BulkWriterTest.FAILED_FUTURE)
+ .when(firestoreMock)
+ .sendRequest(
+ batchWriteCapture.capture(),
+ Matchers.>any());
+
+ ApiFuture future = firestoreMock.recursiveDelete(firestoreMock.document("root/doc"));
+ try {
+ future.get();
+ fail("Operation should have failed in test");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Mock batchWrite failed in test"));
+ }
+ }
+
+ @Test
+ public void handlesSuccessfulStreamErrorRetries() throws Exception {
+ FirestoreException mockException =
+ new FirestoreException("runQuery() failed in test", Status.UNAVAILABLE);
+ // Note that these mock responses differ from the Node implementation since Node retries
+ // queries that fail without streaming any documents in the SDK, whereas Java handles these
+ // retries at the GAX level. The Java SDK does not retry unless a document was streamed in the
+ // query.
+ final List> runQueryResponses =
+ new ArrayList<>(
+ Arrays.asList(
+ queryResponse(mockException, fullDocumentPath("a"), fullDocumentPath("b")),
+ queryResponse(mockException, fullDocumentPath("c")),
+ queryResponse(mockException, fullDocumentPath("d")),
+ queryResponse(fullDocumentPath("e"))));
+
+ doAnswer(runQueryResponses.get(0))
+ .doAnswer(runQueryResponses.get(1))
+ .doAnswer(runQueryResponses.get(2))
+ .doAnswer(runQueryResponses.get(3))
+ .when(firestoreMock)
+ .streamRequest(
+ runQueryCapture.capture(),
+ streamObserverCapture.capture(),
+ Matchers.any());
+
+ ResponseStubber responseStubber =
+ new ResponseStubber() {
+ {
+ put(
+ batchWrite(
+ delete("coll/a"),
+ delete("coll/b"),
+ delete("coll/c"),
+ delete("coll/d"),
+ delete("coll/e")),
+ mergeResponses(
+ successResponse(1),
+ successResponse(1),
+ successResponse(1),
+ successResponse(1),
+ successResponse(1)));
+ }
+ };
+ responseStubber.initializeStub(batchWriteCapture, firestoreMock);
+
+ firestoreMock.recursiveDelete(firestoreMock.collection(COLLECTION_ID)).get();
+ responseStubber.verifyAllRequestsSent();
+ };
+
+ @Test
+ public void handlesMultipleCallsToRecursiveDelete() throws Exception {
+ final List> runQueryResponses =
+ new ArrayList<>(
+ Arrays.asList(
+ queryResponse(fullDocumentPath("a")),
+ queryResponse(fullDocumentPath("b")),
+ queryResponse(fullDocumentPath("c"))));
+
+ doAnswer(runQueryResponses.get(0))
+ .doAnswer(runQueryResponses.get(1))
+ .doAnswer(runQueryResponses.get(2))
+ .when(firestoreMock)
+ .streamRequest(
+ runQueryCapture.capture(),
+ streamObserverCapture.capture(),
+ Matchers.any());
+
+ ResponseStubber responseStubber =
+ new ResponseStubber() {
+ {
+ put(batchWrite(delete("coll/a")), successResponse(1));
+ put(batchWrite(delete("coll/b")), successResponse(2));
+ put(batchWrite(delete("coll/c")), successResponse(3));
+ }
+ };
+ responseStubber.initializeStub(batchWriteCapture, firestoreMock);
+
+ firestoreMock.recursiveDelete(firestoreMock.collection("a")).get();
+ firestoreMock.recursiveDelete(firestoreMock.collection("b")).get();
+ firestoreMock.recursiveDelete(firestoreMock.collection("c")).get();
+ responseStubber.verifyAllRequestsSent();
+ }
+
+ @Test
+ public void usesSameBulkWriterInstanceAcrossCalls() throws Exception {
+ doAnswer(emptyQueryResponse())
+ .when(firestoreMock)
+ .streamRequest(
+ runQueryCapture.capture(),
+ streamObserverCapture.capture(),
+ Matchers.any());
+
+ final int[] callCount = {0};
+ final BulkWriter bulkWriter = firestoreMock.bulkWriter();
+ doAnswer(
+ new Answer() {
+ public BulkWriter answer(InvocationOnMock mock) throws Throwable {
+ callCount[0]++;
+ return bulkWriter;
+ }
+ })
+ .when(firestoreMock)
+ .bulkWriter();
+
+ firestoreMock.recursiveDelete(firestoreMock.collection("foo")).get();
+ firestoreMock.recursiveDelete(firestoreMock.collection("boo")).get();
+ firestoreMock.recursiveDelete(firestoreMock.collection("moo")).get();
+
+ // Only the first recursiveDelete() call should have called the constructor. Subsequent calls
+ // should have used the same bulkWriter.
+ assertEquals(1, callCount[0]);
+ }
+
+ @Test
+ public void throwsErrorIfBulkWriterInstanceIsClosed() throws Exception {
+ bulkWriter = firestoreMock.bulkWriter();
+ bulkWriter.close();
+ try {
+ firestoreMock.recursiveDelete(firestoreMock.collection("foo"), bulkWriter);
+ fail("Operation should have failed in test");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("BulkWriter has already been closed"));
+ }
+ }
+}
diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java
index 11e53804c..da94de92a 100644
--- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java
+++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java
@@ -41,6 +41,8 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.cloud.Timestamp;
+import com.google.cloud.firestore.BulkWriter;
+import com.google.cloud.firestore.BulkWriter.WriteResultCallback;
import com.google.cloud.firestore.CollectionReference;
import com.google.cloud.firestore.DocumentReference;
import com.google.cloud.firestore.DocumentSnapshot;
@@ -1686,4 +1688,110 @@ public void testBuildingBundleWithLimitToLastQuery() throws Exception {
randomColl.document("doc2").get().get(),
limitToLastQuerySnap.getReadTime().toProto());
}
+
+ private int countDocumentChildren(DocumentReference reference) {
+ int count = 0;
+ Iterable collections = reference.listCollections();
+ for (CollectionReference collectionReference : collections) {
+ count += countCollectionChildren(collectionReference);
+ }
+ return count;
+ }
+
+ private int countCollectionChildren(CollectionReference reference) {
+ int count = 0;
+ Iterable documents = reference.listDocuments();
+ for (DocumentReference documentReference : documents) {
+ count += countDocumentChildren(documentReference) + 1;
+ }
+ return count;
+ }
+
+ private void setupRecursiveDeleteTest() throws Exception {
+ // ROOT-DB
+ // └── randomCol
+ // ├── anna
+ // └── bob
+ // └── parentsCol
+ // ├── charlie
+ // └── daniel
+ // └── childCol
+ // ├── ernie
+ // └── francis
+ WriteBatch batch = firestore.batch();
+ batch.set(randomColl.document("anna"), map("name", "anna"));
+ batch.set(randomColl.document("bob"), map("name", "bob"));
+ batch.set(randomColl.document("bob/parentsCol/charlie"), map("name", "charlie"));
+ batch.set(randomColl.document("bob/parentsCol/daniel"), map("name", "daniel"));
+ batch.set(randomColl.document("bob/parentsCol/daniel/childCol/ernie"), map("name", "ernie"));
+ batch.set(
+ randomColl.document("bob/parentsCol/daniel/childCol/francis"), map("name", "francis"));
+ batch.commit().get();
+ }
+
+ @Test
+ public void testRecursiveDeleteTopLevelCollection() throws Exception {
+ setupRecursiveDeleteTest();
+ firestore.recursiveDelete(randomColl).get();
+ assertEquals(0, countCollectionChildren(randomColl));
+ }
+
+ @Test
+ public void testRecursiveDeleteNestedCollection() throws Exception {
+ setupRecursiveDeleteTest();
+ firestore.recursiveDelete(randomColl.document("bob").collection("parentsCol")).get();
+ assertEquals(2, countCollectionChildren(randomColl));
+ }
+
+ @Test
+ public void testRecursiveDeleteNestedDocument() throws Exception {
+ setupRecursiveDeleteTest();
+ DocumentReference document = randomColl.document("bob/parentsCol/daniel");
+ firestore.recursiveDelete(document).get();
+ DocumentSnapshot snap = document.get().get();
+ assertFalse(snap.exists());
+ assertEquals(1, countDocumentChildren(randomColl.document("bob")));
+ assertEquals(3, countCollectionChildren(randomColl));
+ }
+
+ @Test
+ public void testRecursiveDeleteLeafDocument() throws Exception {
+ setupRecursiveDeleteTest();
+ DocumentReference document = randomColl.document("bob/parentsCol/daniel/childCol/ernie");
+ firestore.recursiveDelete(document).get();
+ DocumentSnapshot snap = document.get().get();
+ assertFalse(snap.exists());
+ assertEquals(5, countCollectionChildren(randomColl));
+ }
+
+ @Test
+ public void testRecursiveDeleteDoesNotAffectOtherCollections() throws Exception {
+ setupRecursiveDeleteTest();
+
+ // Add another nested collection that shouldn't be deleted.
+ CollectionReference collectionB = firestore.collection("doggos");
+ collectionB.document("doggo").set(map("name", "goodboi")).get();
+
+ firestore.recursiveDelete(collectionB).get();
+ assertEquals(6, countCollectionChildren(randomColl));
+ assertEquals(0, countCollectionChildren(collectionB));
+ }
+
+ @Test
+ public void testRecursiveDeleteWithCustomBulkWriterInstance() throws Exception {
+ setupRecursiveDeleteTest();
+
+ BulkWriter bulkWriter = firestore.bulkWriter();
+ final int[] callbackCount = {0};
+ bulkWriter.addWriteResultListener(
+ new WriteResultCallback() {
+ public void onResult(DocumentReference documentReference, WriteResult result) {
+ callbackCount[0]++;
+ }
+ });
+
+ firestore.recursiveDelete(randomColl, bulkWriter).get();
+ assertEquals(0, countCollectionChildren(randomColl));
+ assertEquals(6, callbackCount[0]);
+ }
}