From 9ff2f41b765c8878c3b3fb7df962f6f1ed537f05 Mon Sep 17 00:00:00 2001 From: Brian Chen Date: Tue, 25 May 2021 10:30:08 -0500 Subject: [PATCH] feat: add recursiveDelete() to Firestore (#622) (#649) --- .../clirr-ignored-differences.xml | 6 + .../google/cloud/firestore/BulkWriter.java | 12 +- .../com/google/cloud/firestore/Firestore.java | 78 ++ .../google/cloud/firestore/FirestoreImpl.java | 56 ++ .../com/google/cloud/firestore/Query.java | 53 +- .../cloud/firestore/RecursiveDelete.java | 332 ++++++++ .../google/cloud/firestore/ResourcePath.java | 12 + .../cloud/firestore/BulkWriterTest.java | 10 +- .../cloud/firestore/LocalFirestoreHelper.java | 43 +- .../cloud/firestore/RecursiveDeleteTest.java | 736 ++++++++++++++++++ .../cloud/firestore/it/ITSystemTest.java | 108 +++ 11 files changed, 1420 insertions(+), 26 deletions(-) create mode 100644 google-cloud-firestore/src/main/java/com/google/cloud/firestore/RecursiveDelete.java create mode 100644 google-cloud-firestore/src/test/java/com/google/cloud/firestore/RecursiveDeleteTest.java diff --git a/google-cloud-firestore/clirr-ignored-differences.xml b/google-cloud-firestore/clirr-ignored-differences.xml index 4a4d5d4c0..d32596ef0 100644 --- a/google-cloud-firestore/clirr-ignored-differences.xml +++ b/google-cloud-firestore/clirr-ignored-differences.xml @@ -252,4 +252,10 @@ * + + + 7012 + com/google/cloud/firestore/Firestore + com.google.api.core.ApiFuture recursiveDelete(*) + diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java index bb9d75090..b927dd6a5 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java @@ -805,7 +805,17 @@ public void close() throws InterruptedException, ExecutionException { flushFuture.get(); } - private void verifyNotClosedLocked() { + /** + * Used for verifying that the BulkWriter instance isn't closed when calling from outside this + * class. + */ + void verifyNotClosed() { + synchronized (lock) { + verifyNotClosedLocked(); + } + } + + void verifyNotClosedLocked() { if (this.closed) { throw new IllegalStateException("BulkWriter has already been closed."); } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java index a1c3a5f29..2cd897b49 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java @@ -193,6 +193,84 @@ void getAll( @Nonnull BulkWriter bulkWriter(BulkWriterOptions options); + /** + * Recursively deletes all documents and subcollections at and under the specified level. + * + *

If any delete fails, the ApiFuture contains an error with an error message containing the + * number of failed deletes and the stack trace of the last failed delete. The provided reference + * is deleted regardless of whether all deletes succeeded. + * + *

recursiveDelete() uses a {@link BulkWriter} instance with default settings to perform the + * deletes. To customize throttling rates or add success/error callbacks, pass in a custom + * BulkWriter instance. + * + * @param reference The reference of the collection to delete. + * @return An ApiFuture that completes when all deletes have been performed. The future fails with + * an error if any of the deletes fail. + */ + @BetaApi + @Nonnull + ApiFuture recursiveDelete(CollectionReference reference); + + /** + * Recursively deletes all documents and subcollections at and under the specified level. + * + *

If any delete fails, the ApiFuture contains an error with an error message containing the + * number of failed deletes and the stack trace of the last failed delete. The provided reference + * is deleted regardless of whether all deletes succeeded. + * + *

recursiveDelete() uses a {@link BulkWriter} instance with default settings to perform the + * deletes. To customize throttling rates or add success/error callbacks, pass in a custom + * BulkWriter instance. + * + * @param reference The reference of the collection to delete. + * @param bulkWriter A custom BulkWriter instance used to perform the deletes. + * @return An ApiFuture that completes when all deletes have been performed. The future fails with + * an error if any of the deletes fail. + */ + @BetaApi + @Nonnull + ApiFuture recursiveDelete(CollectionReference reference, BulkWriter bulkWriter); + + /** + * Recursively deletes all documents and subcollections at and under the specified level. + * + *

If any delete fails, the ApiFuture contains an error with an error message containing the + * number of failed deletes and the stack trace of the last failed delete. The provided reference + * is deleted regardless of whether all deletes succeeded. + * + *

recursiveDelete() uses a {@link BulkWriter} instance with default settings to perform the + * deletes. To customize throttling rates or add success/error callbacks, pass in a custom + * BulkWriter instance. + * + * @param reference The reference of the document to delete. + * @return An ApiFuture that completes when all deletes have been performed. The future fails with + * an error if any of the deletes fail. + */ + @BetaApi + @Nonnull + ApiFuture recursiveDelete(DocumentReference reference); + + /** + * Recursively deletes all documents and subcollections at and under the specified level. + * + *

If any delete fails, the ApiFuture contains an error with an error message containing the + * number of failed deletes and the stack trace of the last failed delete. The provided reference + * is deleted regardless of whether all deletes succeeded. + * + *

recursiveDelete() uses a {@link BulkWriter} instance with default settings to perform the + * deletes. To customize throttling rates or add success/error callbacks, pass in a custom + * BulkWriter instance. + * + * @param reference The reference of the document to delete. + * @param bulkWriter A custom BulkWriter instance used to perform the deletes. + * @return An ApiFuture that completes when all deletes have been performed. The future fails with + * an error if any of the deletes fail. + */ + @BetaApi + @Nonnull + ApiFuture recursiveDelete(DocumentReference reference, BulkWriter bulkWriter); + /** * Returns a FirestoreBundle.Builder {@link FirestoreBundle.Builder} instance using an * automatically generated bundle ID. When loaded on clients, client SDKs use the bundle ID and diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java index f92b7e3d6..7a93e711d 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java @@ -24,6 +24,7 @@ import com.google.api.gax.rpc.UnaryCallable; import com.google.cloud.Timestamp; import com.google.cloud.firestore.spi.v1.FirestoreRpc; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.firestore.v1.BatchGetDocumentsRequest; @@ -59,6 +60,12 @@ class FirestoreImpl implements Firestore, FirestoreRpcContext { private final FirestoreOptions firestoreOptions; private final ResourcePath databasePath; + /** + * A lazy-loaded BulkWriter instance to be used with recursiveDelete() if no BulkWriter instance + * is provided. + */ + @Nullable private BulkWriter bulkWriterInstance; + private boolean closed; FirestoreImpl(FirestoreOptions options) { @@ -76,6 +83,14 @@ class FirestoreImpl implements Firestore, FirestoreRpcContext { ResourcePath.create(DatabaseRootName.of(options.getProjectId(), options.getDatabaseId())); } + /** Lazy-load the Firestore's default BulkWriter. */ + private BulkWriter getBulkWriter() { + if (bulkWriterInstance == null) { + bulkWriterInstance = bulkWriter(); + } + return bulkWriterInstance; + } + /** Creates a pseudo-random 20-character ID that can be used for Firestore documents. */ static String autoId() { StringBuilder builder = new StringBuilder(); @@ -102,6 +117,47 @@ public BulkWriter bulkWriter(BulkWriterOptions options) { return new BulkWriter(this, options); } + @Nonnull + public ApiFuture recursiveDelete(CollectionReference reference) { + BulkWriter writer = getBulkWriter(); + return recursiveDelete(reference.getResourcePath(), writer); + } + + @Nonnull + public ApiFuture recursiveDelete(CollectionReference reference, BulkWriter bulkWriter) { + return recursiveDelete(reference.getResourcePath(), bulkWriter); + } + + @Nonnull + public ApiFuture recursiveDelete(DocumentReference reference) { + BulkWriter writer = getBulkWriter(); + return recursiveDelete(reference.getResourcePath(), writer); + } + + @Nonnull + public ApiFuture recursiveDelete( + DocumentReference reference, @Nonnull BulkWriter bulkWriter) { + return recursiveDelete(reference.getResourcePath(), bulkWriter); + } + + @Nonnull + public ApiFuture recursiveDelete(ResourcePath path, BulkWriter bulkWriter) { + return recursiveDelete( + path, bulkWriter, RecursiveDelete.MAX_PENDING_OPS, RecursiveDelete.MIN_PENDING_OPS); + } + + /** + * This overload is not private in order to test the query resumption with startAfter() once the + * RecursiveDelete instance has MAX_PENDING_OPS pending. + */ + @Nonnull + @VisibleForTesting + ApiFuture recursiveDelete( + ResourcePath path, @Nonnull BulkWriter bulkWriter, int maxLimit, int minLimit) { + RecursiveDelete deleter = new RecursiveDelete(this, bulkWriter, path, maxLimit, minLimit); + return deleter.run(); + } + @Nonnull @Override public CollectionReference collection(@Nonnull String collectionPath) { diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java index cc6c2ab34..b1f5fa0bf 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java @@ -255,13 +255,24 @@ abstract static class QueryOptions { abstract ImmutableList getFieldProjections(); + // Whether to select all documents under `parentPath`. By default, only + // collections that match `collectionId` are selected. + abstract boolean isKindless(); + + // Whether to require consistent documents when restarting the query. By + // default, restarting the query uses the readTime offset of the original + // query to provide consistent results. + abstract boolean getRequireConsistency(); + static Builder builder() { return new AutoValue_Query_QueryOptions.Builder() .setAllDescendants(false) .setLimitType(LimitType.First) .setFieldOrders(ImmutableList.of()) .setFieldFilters(ImmutableList.of()) - .setFieldProjections(ImmutableList.of()); + .setFieldProjections(ImmutableList.of()) + .setKindless(false) + .setRequireConsistency(true); } abstract Builder toBuilder(); @@ -290,6 +301,10 @@ abstract static class Builder { abstract Builder setFieldProjections(ImmutableList value); + abstract Builder setKindless(boolean value); + + abstract Builder setRequireConsistency(boolean value); + abstract QueryOptions build(); } } @@ -327,21 +342,21 @@ private static boolean isUnaryComparison(@Nullable Object value) { /** Computes the backend ordering semantics for DocumentSnapshot cursors. */ private ImmutableList createImplicitOrderBy() { List implicitOrders = new ArrayList<>(options.getFieldOrders()); - boolean hasDocumentId = false; + // If no explicit ordering is specified, use the first inequality to define an implicit order. if (implicitOrders.isEmpty()) { - // If no explicit ordering is specified, use the first inequality to define an implicit order. for (FieldFilter fieldFilter : options.getFieldFilters()) { if (fieldFilter.isInequalityFilter()) { implicitOrders.add(new FieldOrder(fieldFilter.fieldReference, Direction.ASCENDING)); break; } } - } else { - for (FieldOrder fieldOrder : options.getFieldOrders()) { - if (FieldPath.isDocumentId(fieldOrder.fieldReference.getFieldPath())) { - hasDocumentId = true; - } + } + + boolean hasDocumentId = false; + for (FieldOrder fieldOrder : implicitOrders) { + if (FieldPath.isDocumentId(fieldOrder.fieldReference.getFieldPath())) { + hasDocumentId = true; } } @@ -1237,7 +1252,12 @@ BundledQuery toBundledQuery() { private StructuredQuery.Builder buildWithoutClientTranslation() { StructuredQuery.Builder structuredQuery = StructuredQuery.newBuilder(); CollectionSelector.Builder collectionSelector = CollectionSelector.newBuilder(); - collectionSelector.setCollectionId(options.getCollectionId()); + + // Kindless queries select all descendant documents, so we don't add the collectionId field. + if (!options.isKindless()) { + collectionSelector.setCollectionId(options.getCollectionId()); + } + collectionSelector.setAllDescendants(options.getAllDescendants()); structuredQuery.addFrom(collectionSelector); @@ -1525,10 +1545,17 @@ public void onError(Throwable throwable) { // since we are requiring at least a single document result. QueryDocumentSnapshot cursor = lastReceivedDocument.get(); if (cursor != null) { - Query.this - .startAfter(cursor) - .internalStream( - documentObserver, /* transactionId= */ null, cursor.getReadTime()); + if (options.getRequireConsistency()) { + Query.this + .startAfter(cursor) + .internalStream( + documentObserver, /* transactionId= */ null, cursor.getReadTime()); + } else { + Query.this + .startAfter(cursor) + .internalStream( + documentObserver, /* transactionId= */ null, /* readTime= */ null); + } } } else { Tracing.getTracer().getCurrentSpan().addAnnotation("Firestore.Query: Error"); diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RecursiveDelete.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RecursiveDelete.java new file mode 100644 index 000000000..b14eb1983 --- /dev/null +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RecursiveDelete.java @@ -0,0 +1,332 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.firestore; + +import com.google.api.core.ApiAsyncFunction; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.core.BetaApi; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.ApiStreamObserver; +import com.google.cloud.firestore.Query.QueryOptions; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Status; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +/** + * Class used to store state required for running a recursive delete operation. Each recursive + * delete call should use a new instance of the class. + */ +@BetaApi +public final class RecursiveDelete { + /** + * Datastore allowed numeric IDs where Firestore only allows strings. Numeric IDs are exposed to + * Firestore as __idNUM__, so this is the lowest possible negative numeric value expressed in that + * format. + * + *

This constant is used to specify startAt/endAt values when querying for all descendants in a + * single collection. + */ + public static final String REFERENCE_NAME_MIN_ID = "__id-9223372036854775808__"; + + /** + * The query limit used for recursive deletes when fetching all descendants of the specified + * reference to delete. This is done to prevent the query stream from streaming documents faster + * than Firestore can delete. + */ + public static final int MAX_PENDING_OPS = 5000; + + /** + * The number of pending BulkWriter operations at which RecursiveDelete starts the next limit + * query to fetch descendants. By starting the query while there are pending operations, Firestore + * can improve BulkWriter throughput. This helps prevent BulkWriter from idling while Firestore + * fetches the next query. + */ + public static final int MIN_PENDING_OPS = 1000; + + private final FirestoreRpcContext firestoreRpcContext; + private final BulkWriter writer; + + /** The resource path of the reference to recursively delete */ + private final ResourcePath path; + + /** Lock object for all BulkWriter operations and callbacks. */ + private final Object lock = new Object(); + + /** The number of deletes that failed with a permanent error. */ + @GuardedBy("lock") + private int errorCount = 0; + + /** + * The most recently thrown error. Used to populate the developer-facing error message when the + * recursive delete operation completes. + */ + @GuardedBy("lock") + @Nullable + private Throwable lastError; + + /** Whether there are still documents to delete that still need to be fetched. */ + @GuardedBy("lock") + private boolean documentsPending = true; + + /** A deferred promise that resolves when the recursive delete operation is completed. */ + private final SettableApiFuture completionFuture = SettableApiFuture.create(); + + /** Whether a query stream is currently in progress. Only one stream can be run at a time. */ + @GuardedBy("lock") + private boolean streamInProgress = false; + + /** Whether run() has been called. */ + private boolean started = false; + + /** Query limit to use when fetching all descendants. */ + private final int maxPendingOps; + + /** + * The number of pending BulkWriter operations at which RecursiveDelete starts the next limit + * query to fetch descendants. + */ + private final int minPendingOps; + + /** + * The last document snapshot returned by the stream. Used to set the startAfter() field in the + * subsequent stream. + */ + @Nullable private DocumentSnapshot lastDocumentSnap; + + /** + * The number of pending BulkWriter operations. Used to determine when the next query can be run. + */ + private int pendingOperationsCount = 0; + + RecursiveDelete( + FirestoreRpcContext firestoreRpcContext, + BulkWriter writer, + ResourcePath path, + int maxLimit, + int minLimit) { + this.firestoreRpcContext = firestoreRpcContext; + this.writer = writer; + this.path = path; + this.maxPendingOps = maxLimit; + this.minPendingOps = minLimit; + } + + public ApiFuture run() { + Preconditions.checkState(!started, "RecursiveDelete.run() should only be called once"); + started = true; + + writer.verifyNotClosed(); + + streamDescendants(); + return completionFuture; + } + + private void streamDescendants() { + Query query = getAllDescendantsQuery(); + synchronized (lock) { + streamInProgress = true; + } + final int[] streamedDocsCount = {0}; + final ApiStreamObserver responseObserver = + new ApiStreamObserver() { + public void onNext(DocumentSnapshot snapshot) { + streamedDocsCount[0]++; + lastDocumentSnap = snapshot; + deleteReference(snapshot.getReference()); + } + + public void onError(Throwable throwable) { + String message = "Failed to fetch children documents"; + synchronized (lock) { + lastError = + FirestoreException.forServerRejection(Status.UNAVAILABLE, throwable, message); + } + onQueryEnd(); + } + + public void onCompleted() { + synchronized (lock) { + streamInProgress = false; + } + // If there are fewer than the number of documents specified in the limit() field, we + // know that the query is complete. + if (streamedDocsCount[0] < maxPendingOps) { + onQueryEnd(); + } else if (pendingOperationsCount == 0) { + // Start a new stream if all documents from this stream were deleted before the + // `onCompleted()` handler was called. + streamDescendants(); + } + } + }; + + query.stream(responseObserver); + } + + private Query getAllDescendantsQuery() { + ResourcePath parentPath; + String collectionId; + if (path.isDocument()) { + // The parent is the closest ancestor document to the location we're deleting. Since we are + // deleting a document, the parent is the path of that document. + parentPath = path; + Preconditions.checkState( + path.getParent() != null, "Parent of a document should not be null."); + collectionId = path.getParent().getId(); + } else { + // The parent is the closest ancestor document to the location we're deleting. Since we are + // deleting a collection, the parent is the path of the document containing that collection + // (or the database root, if it is a root collection). + parentPath = path.popLast(); + collectionId = path.getId(); + } + + Query query = + new Query( + firestoreRpcContext, + QueryOptions.builder() + .setParentPath(parentPath) + .setCollectionId(collectionId) + .setAllDescendants(true) + .setKindless(true) + .setRequireConsistency(false) + .build()); + + // Query for names only to fetch empty snapshots. + query = query.select(FieldPath.documentId()).limit(maxPendingOps); + + // To find all descendants of a collection reference, we need to use a + // composite filter that captures all documents that start with the + // collection prefix. The MIN_ID constant represents the minimum key in + // this collection, and a null byte + the MIN_ID represents the minimum + // key is the next possible collection. + if (path.isCollection()) { + char nullChar = '\0'; + String startAt = collectionId + "/" + REFERENCE_NAME_MIN_ID; + String endAt = collectionId + nullChar + "/" + REFERENCE_NAME_MIN_ID; + query = + query + .whereGreaterThanOrEqualTo(FieldPath.documentId(), startAt) + .whereLessThan(FieldPath.documentId(), endAt); + } + + // startAfter() needs to be added after the where() filters since it creates an implicit + // orderBy. + if (lastDocumentSnap != null) { + query = query.startAfter(lastDocumentSnap); + } + + return query; + } + + /** + * Called when all descendants of the provided reference have been streamed or if a permanent + * error occurs during the stream. Deletes the developer provided reference and wraps any errors + * that occurred. + */ + private void onQueryEnd() { + synchronized (lock) { + documentsPending = false; + } + + // Used to aggregate flushFuture and deleteFuture to use with ApiFutures.allAsList(), in order + // to ensure that the delete catchingAsync() callback is run before the flushFuture callback. + List> pendingFutures = new ArrayList<>(); + + // Delete the provided document reference if one was provided. + if (path.isDocument()) { + pendingFutures.add(deleteReference(new DocumentReference(firestoreRpcContext, path))); + } + + pendingFutures.add(writer.flush()); + + // Completes the future returned by run() and sets the exception if an error occurred. + ApiFutures.transformAsync( + ApiFutures.allAsList(pendingFutures), + new ApiAsyncFunction, Void>() { + public ApiFuture apply(List unused) { + synchronized (lock) { + if (lastError == null) { + completionFuture.set(null); + } else { + String message = + errorCount + + (errorCount != 1 ? " deletes" : " delete") + + " failed. " + + lastError.getMessage(); + if (lastError instanceof FirestoreException) { + lastError = + new FirestoreException(message, ((FirestoreException) lastError).getStatus()); + } else { + lastError = new Throwable(message, lastError); + } + completionFuture.setException(lastError); + } + return ApiFutures.immediateFuture(null); + } + } + }, + MoreExecutors.directExecutor()); + } + + /** Deletes the provided reference and starts the next stream if conditions are met. */ + private ApiFuture deleteReference(final DocumentReference reference) { + synchronized (lock) { + pendingOperationsCount++; + } + + ApiFuture catchingDeleteFuture = + ApiFutures.catchingAsync( + writer.delete(reference), + Throwable.class, + new ApiAsyncFunction() { + public ApiFuture apply(Throwable e) { + synchronized (lock) { + errorCount++; + lastError = e; + return ApiFutures.immediateFuture(null); + } + } + }, + MoreExecutors.directExecutor()); + + return ApiFutures.transformAsync( + catchingDeleteFuture, + new ApiAsyncFunction() { + + public ApiFuture apply(WriteResult result) { + synchronized (lock) { + pendingOperationsCount--; + // We wait until the previous stream has ended in order to ensure the + // startAfter document is correct. Starting the next stream while + // there are pending operations allows Firestore to maximize + // BulkWriter throughput. + if (documentsPending && !streamInProgress && pendingOperationsCount < minPendingOps) { + streamDescendants(); + } + return ApiFutures.immediateFuture(null); + } + } + }, + MoreExecutors.directExecutor()); + } +} diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ResourcePath.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ResourcePath.java index 7f305475f..ee44cf4f3 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ResourcePath.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ResourcePath.java @@ -157,6 +157,18 @@ public int compareTo(@Nonnull ResourcePath other) { return super.compareTo(other); } + /** + * Pops the last segment from this `ResourcePath` and returns a newly constructed `ResourcePath` + * without the last segment. This does not change the ResourcePath, since `ResourcePath` is + * immutable. + * + * @return The newly created Path. + */ + ResourcePath popLast() { + ImmutableList segments = getSegments(); + return createPathWithSegments(segments.subList(0, segments.size() - 1)); + } + @Override public String toString() { return getName(); diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java index daf90171c..5df76ff61 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java @@ -77,7 +77,7 @@ @RunWith(MockitoJUnitRunner.class) public class BulkWriterTest { - private static final ApiFuture FAILED_FUTURE = + public static final ApiFuture FAILED_FUTURE = ApiFutures.immediateFailedFuture( new ApiException( new IllegalStateException("Mock batchWrite failed in test"), @@ -125,14 +125,14 @@ public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) private DocumentReference doc1; private DocumentReference doc2; - private ApiFuture successResponse(int updateTimeSeconds) { + public static ApiFuture successResponse(int updateTimeSeconds) { BatchWriteResponse.Builder response = BatchWriteResponse.newBuilder(); response.addWriteResultsBuilder().getUpdateTimeBuilder().setSeconds(updateTimeSeconds).build(); response.addStatusBuilder().build(); return ApiFutures.immediateFuture(response.build()); } - private ApiFuture failedResponse(int code) { + public static ApiFuture failedResponse(int code) { BatchWriteResponse.Builder response = BatchWriteResponse.newBuilder(); response.addWriteResultsBuilder().build(); response.addStatusBuilder().setCode(code).build(); @@ -143,8 +143,8 @@ private ApiFuture failedResponse() { return failedResponse(Code.DEADLINE_EXCEEDED_VALUE); } - private ApiFuture mergeResponses(ApiFuture... responses) - throws Exception { + public static ApiFuture mergeResponses( + ApiFuture... responses) throws Exception { BatchWriteResponse.Builder response = BatchWriteResponse.newBuilder(); for (ApiFuture future : responses) { BatchWriteResponse res = future.get(); diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/LocalFirestoreHelper.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/LocalFirestoreHelper.java index 2ac9c033b..d7b03b7ee 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/LocalFirestoreHelper.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/LocalFirestoreHelper.java @@ -282,6 +282,11 @@ public static ApiFuture rollbackResponse() { return ApiFutures.immediateFuture(Empty.getDefaultInstance()); } + public static Answer emptyQueryResponse() { + RunQueryResponse[] response = new RunQueryResponse[0]; + return streamingResponse(response, null); + } + public static Answer queryResponse() { return queryResponse(DOCUMENT_NAME); } @@ -573,16 +578,36 @@ public static RunQueryRequest query(StructuredQuery... query) { public static RunQueryRequest query( @Nullable String transactionId, boolean allDescendants, StructuredQuery... query) { + return query(transactionId, /* parent= */ "", allDescendants, /* kindless= */ false, query); + } + + public static RunQueryRequest query( + @Nullable String transactionId, + boolean allDescendants, + boolean kindless, + StructuredQuery... query) { + return query(transactionId, /* parent= */ "", allDescendants, kindless, query); + } + + public static RunQueryRequest query( + @Nullable String transactionId, + String parent, + boolean allDescendants, + boolean kindless, + StructuredQuery... query) { RunQueryRequest.Builder request = RunQueryRequest.newBuilder(); - request.setParent(LocalFirestoreHelper.DATABASE_NAME + "/documents"); + if (!parent.equals("")) { + parent = '/' + parent; + } + request.setParent(LocalFirestoreHelper.DATABASE_NAME + "/documents" + parent); StructuredQuery.Builder structuredQuery = request.getStructuredQueryBuilder(); - CollectionSelector collectionSelector = - CollectionSelector.newBuilder() - .setCollectionId("coll") - .setAllDescendants(allDescendants) - .build(); - structuredQuery.addFrom(collectionSelector); + CollectionSelector.Builder builder = + CollectionSelector.newBuilder().setAllDescendants(allDescendants); + if (!kindless) { + builder.setCollectionId("coll"); + } + structuredQuery.addFrom(builder); for (StructuredQuery option : query) { structuredQuery.mergeFrom(option); @@ -630,6 +655,10 @@ public static BatchGetDocumentsRequest getAll( return request.build(); } + public static StructuredQuery order(FieldPath fieldPath, StructuredQuery.Direction direction) { + return order(fieldPath.getEncodedPath(), direction); + } + public static StructuredQuery order(String fieldPath, StructuredQuery.Direction direction) { StructuredQuery.Builder structuredQuery = StructuredQuery.newBuilder(); structuredQuery.addOrderByBuilder().setField(field(fieldPath)).setDirection(direction); diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/RecursiveDeleteTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/RecursiveDeleteTest.java new file mode 100644 index 000000000..a83842779 --- /dev/null +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/RecursiveDeleteTest.java @@ -0,0 +1,736 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.firestore; + +import static com.google.cloud.firestore.BulkWriterTest.failedResponse; +import static com.google.cloud.firestore.BulkWriterTest.mergeResponses; +import static com.google.cloud.firestore.BulkWriterTest.successResponse; +import static com.google.cloud.firestore.LocalFirestoreHelper.COLLECTION_ID; +import static com.google.cloud.firestore.LocalFirestoreHelper.DATABASE_NAME; +import static com.google.cloud.firestore.LocalFirestoreHelper.DOCUMENT_ROOT; +import static com.google.cloud.firestore.LocalFirestoreHelper.batchWrite; +import static com.google.cloud.firestore.LocalFirestoreHelper.delete; +import static com.google.cloud.firestore.LocalFirestoreHelper.emptyQueryResponse; +import static com.google.cloud.firestore.LocalFirestoreHelper.filter; +import static com.google.cloud.firestore.LocalFirestoreHelper.limit; +import static com.google.cloud.firestore.LocalFirestoreHelper.order; +import static com.google.cloud.firestore.LocalFirestoreHelper.query; +import static com.google.cloud.firestore.LocalFirestoreHelper.queryResponse; +import static com.google.cloud.firestore.LocalFirestoreHelper.reference; +import static com.google.cloud.firestore.LocalFirestoreHelper.select; +import static com.google.cloud.firestore.LocalFirestoreHelper.startAt; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; + +import com.google.api.core.ApiAsyncFunction; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.ApiStreamObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.cloud.firestore.BulkWriter.WriteErrorCallback; +import com.google.cloud.firestore.BulkWriter.WriteResultCallback; +import com.google.cloud.firestore.LocalFirestoreHelper.ResponseStubber; +import com.google.cloud.firestore.spi.v1.FirestoreRpc; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.firestore.v1.BatchWriteRequest; +import com.google.firestore.v1.BatchWriteResponse; +import com.google.firestore.v1.RunQueryRequest; +import com.google.firestore.v1.RunQueryResponse; +import com.google.firestore.v1.StructuredQuery.Direction; +import com.google.firestore.v1.StructuredQuery.FieldFilter.Operator; +import com.google.firestore.v1.Value; +import com.google.firestore.v1.Write; +import com.google.rpc.Code; +import io.grpc.Status; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Matchers; +import org.mockito.Mockito; +import org.mockito.Spy; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +@RunWith(MockitoJUnitRunner.class) +public class RecursiveDeleteTest { + @Rule public Timeout timeout = new Timeout(5, TimeUnit.SECONDS); + + @Spy private final FirestoreRpc firestoreRpc = Mockito.mock(FirestoreRpc.class); + + /** Executor that executes delayed tasks without delay. */ + private final ScheduledExecutorService immediateExecutor = + new ScheduledThreadPoolExecutor(1) { + @Override + @Nonnull + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return super.schedule(command, 0, TimeUnit.MILLISECONDS); + } + }; + + @Spy + private final FirestoreImpl firestoreMock = + new FirestoreImpl( + FirestoreOptions.newBuilder().setProjectId("test-project").build(), firestoreRpc); + + @Captor private ArgumentCaptor batchWriteCapture; + @Captor private ArgumentCaptor runQueryCapture; + @Captor private ArgumentCaptor streamObserverCapture; + + private BulkWriter bulkWriter; + private ResponseStubber responseStubber; + + @Before + public void before() { + doReturn(immediateExecutor).when(firestoreRpc).getExecutor(); + + final ScheduledExecutorService timeoutExecutor = + new ScheduledThreadPoolExecutor(1) { + @Override + @Nonnull + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return super.schedule(command, 0, TimeUnit.MILLISECONDS); + } + }; + } + + private Value recursiveDeleteStartAt(String name) { + return Value.newBuilder() + .setReferenceValue( + DATABASE_NAME + "/documents/" + name + "/" + RecursiveDelete.REFERENCE_NAME_MIN_ID) + .build(); + } + + private Value recursiveDeleteEndAt(String name) { + return Value.newBuilder() + .setReferenceValue( + DOCUMENT_ROOT + name + '\0' + "/" + RecursiveDelete.REFERENCE_NAME_MIN_ID) + .build(); + } + + private String fullDocumentPath(String name) { + return DOCUMENT_ROOT + COLLECTION_ID + "/" + name; + } + + private void setupMocks(List childrenDocuments) throws Exception { + setupMocks(childrenDocuments, /* deleteDocRef= */ ""); + } + + private void setupMocks(List childrenDocuments, String deleteDocRef) throws Exception { + setupMocks(childrenDocuments, deleteDocRef, null); + } + + private void setupMocks( + List childrenDocuments, + final String deleteDocRef, + @Nullable final ApiFuture customResponse) + throws Exception { + + List fullDocumentPaths = new ArrayList<>(); + for (String documentName : childrenDocuments) { + fullDocumentPaths.add(fullDocumentPath(documentName)); + } + + doAnswer(queryResponse(fullDocumentPaths.toArray(new String[0]))) + .when(firestoreMock) + .streamRequest( + runQueryCapture.capture(), + streamObserverCapture.capture(), + Matchers.any()); + + if (!deleteDocRef.equals("")) { + childrenDocuments = new ArrayList<>(childrenDocuments); + childrenDocuments.add(deleteDocRef); + } + + final List expectedRequests = new ArrayList<>(); + final List> returnedResponse = new ArrayList<>(); + for (String documentPath : childrenDocuments) { + expectedRequests.add(delete(COLLECTION_ID + "/" + documentPath)); + returnedResponse.add(successResponse(1)); + } + + final ApiFuture finalResponse = + customResponse != null + ? customResponse + : mergeResponses(returnedResponse.toArray(new ApiFuture[0])); + + responseStubber = + new ResponseStubber() { + { + put(batchWrite(expectedRequests.toArray(new Write[0])), finalResponse); + } + }; + responseStubber.initializeStub(batchWriteCapture, firestoreMock); + } + + @Test + public void getAllDescendantsRootLevelCollection() throws Exception { + doAnswer(emptyQueryResponse()) + .when(firestoreMock) + .streamRequest( + runQueryCapture.capture(), + streamObserverCapture.capture(), + Matchers.any()); + + RunQueryRequest expectedRequest = + query( + null, + /* allDescendants= */ true, + /* kindless= */ true, + select(FieldPath.documentId()), + filter( + Operator.GREATER_THAN_OR_EQUAL, + FieldPath.documentId().toString(), + recursiveDeleteStartAt("root")), + filter( + Operator.LESS_THAN, + FieldPath.documentId().toString(), + recursiveDeleteEndAt("root")), + limit(RecursiveDelete.MAX_PENDING_OPS)); + + firestoreMock.recursiveDelete(firestoreMock.collection("root")).get(); + assertEquals(expectedRequest, runQueryCapture.getValue()); + } + + @Test + public void getAllDescendantsNestedCollection() throws Exception { + doAnswer(emptyQueryResponse()) + .when(firestoreMock) + .streamRequest( + runQueryCapture.capture(), + streamObserverCapture.capture(), + Matchers.any()); + + RunQueryRequest expectedRequest = + query( + null, + /* parent= */ "root/doc", + /* allDescendants= */ true, + /* kindless= */ true, + select(FieldPath.documentId()), + filter( + Operator.GREATER_THAN_OR_EQUAL, + FieldPath.documentId().toString(), + recursiveDeleteStartAt("root/doc/nestedCol")), + filter( + Operator.LESS_THAN, + FieldPath.documentId().toString(), + recursiveDeleteEndAt("root/doc/nestedCol")), + limit(RecursiveDelete.MAX_PENDING_OPS)); + + firestoreMock.recursiveDelete(firestoreMock.collection("root/doc/nestedCol")).get(); + assertEquals(expectedRequest, runQueryCapture.getValue()); + } + + @Test + public void getAllDescendantsDocument() throws Exception { + doAnswer(emptyQueryResponse()) + .when(firestoreMock) + .streamRequest( + runQueryCapture.capture(), + streamObserverCapture.capture(), + Matchers.any()); + + // Include dummy response for the deleted fullDocumentPath reference. + doAnswer( + new Answer>() { + public ApiFuture answer(InvocationOnMock mock) { + return successResponse(1); + } + }) + .when(firestoreMock) + .sendRequest( + batchWriteCapture.capture(), + Matchers.>any()); + + RunQueryRequest expectedRequest = + query( + null, + /* parent= */ "root/doc", + /* allDescendants= */ true, + /* kindless= */ true, + select(FieldPath.documentId()), + limit(RecursiveDelete.MAX_PENDING_OPS)); + + firestoreMock.recursiveDelete(firestoreMock.document("root/doc")).get(); + assertEquals(expectedRequest, runQueryCapture.getValue()); + } + + @Test + public void createsRetryQueryAfterStreamExceptionWithLastReceivedDoc() throws Exception { + doAnswer( + queryResponse( + new FirestoreException("Mock runQuery() failed in test", Status.UNAVAILABLE), + fullDocumentPath(("doc1")))) + .doAnswer(emptyQueryResponse()) + .when(firestoreMock) + .streamRequest( + runQueryCapture.capture(), + streamObserverCapture.capture(), + Matchers.any()); + + doAnswer( + new Answer>() { + public ApiFuture answer(InvocationOnMock mock) { + return successResponse(1); + } + }) + .when(firestoreMock) + .sendRequest( + batchWriteCapture.capture(), + Matchers.>any()); + + RunQueryRequest expectedRequest = + query( + null, + /* allDescendants= */ true, + /* kindless= */ true, + select(FieldPath.documentId()), + order(FieldPath.documentId(), Direction.ASCENDING), + startAt(reference(fullDocumentPath("doc1")), /* before= */ false), + filter( + Operator.GREATER_THAN_OR_EQUAL, + FieldPath.documentId().toString(), + recursiveDeleteStartAt("coll")), + filter( + Operator.LESS_THAN, + FieldPath.documentId().toString(), + recursiveDeleteEndAt("coll")), + limit(RecursiveDelete.MAX_PENDING_OPS)); + + firestoreMock.recursiveDelete(firestoreMock.collection("coll")).get(); + assertEquals(expectedRequest, runQueryCapture.getAllValues().get(1)); + } + + @Test + public void createsSecondQueryWithCorrectStartAfter() throws Exception { + // This test checks that the second query is created with the correct startAfter() once the + // RecursiveDelete instance is below the MIN_PENDING_OPS threshold to send the next batch. + + // Use lower limits than the actual RecursiveDelete class in order to make this test run fast. + int maxPendingOps = 100; + int minPendingOps = 11; + int maxBatchSize = 10; + + final int cutoff = maxPendingOps - minPendingOps; + final int[] numDeletesBuffered = {0}; + + // This future is used to delay the BatchWriteResponses from returning in order to create the + // situation where the number of pending operations is less than `minPendingOps`. + final SettableApiFuture bufferFuture = SettableApiFuture.create(); + + // This future completes when the second query is run. + final SettableApiFuture secondQueryFuture = SettableApiFuture.create(); + + List firstStream = new ArrayList<>(); + final List> batchWriteResponse = new ArrayList<>(); + for (int i = 0; i < maxPendingOps; i++) { + firstStream.add(fullDocumentPath("doc" + i)); + } + + for (int i = 0; i < maxBatchSize; i++) { + batchWriteResponse.add(successResponse(1)); + } + + doAnswer(queryResponse(firstStream.toArray(new String[0]))) + .doAnswer( + new Answer() { + public RunQueryResponse answer(InvocationOnMock invocation) throws Throwable { + secondQueryFuture.set(null); + Object[] args = invocation.getArguments(); + ApiStreamObserver observer = + (ApiStreamObserver) args[1]; + observer.onCompleted(); + return null; + } + }) + .when(firestoreMock) + .streamRequest( + runQueryCapture.capture(), + streamObserverCapture.capture(), + Matchers.any()); + + doAnswer( + new Answer>() { + public ApiFuture answer(InvocationOnMock mock) throws Exception { + if (numDeletesBuffered[0] < cutoff) { + numDeletesBuffered[0] += batchWriteResponse.size(); + // By waiting for `bufferFuture` to complete, we can guarantee that the writes + // complete after all documents are streamed. Without this future, the test can + // race and complete the writes before the stream is finished, which is a + // different scenario this test is not for. + return ApiFutures.transformAsync( + bufferFuture, + new ApiAsyncFunction() { + public ApiFuture apply(Void unused) throws Exception { + return mergeResponses(batchWriteResponse.toArray(new ApiFuture[0])); + } + }, + MoreExecutors.directExecutor()); + } else { + // Once there are `cutoff` pending deletes, completing the future allows enough + // responses to be returned such that the number of pending deletes should be less + // than `minPendingOps`. This allows us to test that the second query is made. + bufferFuture.set(null); + return ApiFutures.transformAsync( + secondQueryFuture, + new ApiAsyncFunction() { + public ApiFuture apply(Void unused) throws Exception { + return mergeResponses(batchWriteResponse.toArray(new ApiFuture[0])); + } + }, + MoreExecutors.directExecutor()); + } + } + }) + .when(firestoreMock) + .sendRequest( + batchWriteCapture.capture(), + Matchers.>any()); + + BulkWriter bulkWriter = firestoreMock.bulkWriter(); + bulkWriter.setMaxBatchSize(maxBatchSize); + + RunQueryRequest expectedRequest = + query( + null, + /* allDescendants= */ true, + /* kindless= */ true, + select(FieldPath.documentId()), + order(FieldPath.documentId(), Direction.ASCENDING), + startAt(reference(fullDocumentPath("doc" + (maxPendingOps - 1))), /* before= */ false), + filter( + Operator.GREATER_THAN_OR_EQUAL, + FieldPath.documentId().toString(), + recursiveDeleteStartAt("coll")), + filter( + Operator.LESS_THAN, + FieldPath.documentId().toString(), + recursiveDeleteEndAt("coll")), + limit(maxPendingOps)); + + firestoreMock + .recursiveDelete( + firestoreMock.collection("coll").getResourcePath(), + bulkWriter, + maxPendingOps, + minPendingOps) + .get(); + assertEquals(2, runQueryCapture.getAllValues().size()); + assertEquals(expectedRequest, runQueryCapture.getAllValues().get(1)); + } + + @Test + public void deletesCollection() throws Exception { + List documents = + Arrays.asList("anna", "bob", "bob/children/charlie", "bob/children/daniel"); + setupMocks(documents); + firestoreMock.recursiveDelete(firestoreMock.collection(COLLECTION_ID)).get(); + responseStubber.verifyAllRequestsSent(); + } + + @Test + public void deletesDocumentAndReference() throws Exception { + List documents = + Arrays.asList("bob/children/brian", "bob/children/charlie", "bob/children/daniel"); + setupMocks(documents, "bob"); + firestoreMock.recursiveDelete(firestoreMock.collection(COLLECTION_ID).document("bob")).get(); + responseStubber.verifyAllRequestsSent(); + } + + @Test + public void exceptionContainsLastErrorCodeIfWritesFail() throws Exception { + List documents = + Arrays.asList("bob/children/brian", "bob/children/charlie", "bob/children/daniel"); + ApiFuture customResponse = + mergeResponses( + successResponse(1), + failedResponse(Code.CANCELLED_VALUE), + failedResponse(Code.PERMISSION_DENIED_VALUE), + successResponse(1)); + setupMocks(documents, "bob", customResponse); + try { + ApiFuture future = + firestoreMock.recursiveDelete(firestoreMock.collection(COLLECTION_ID).document("bob")); + future.get(); + fail("Operation should have failed in test"); + } catch (Exception e) { + assertEquals(Status.PERMISSION_DENIED, ((FirestoreException) e.getCause()).getStatus()); + assertTrue(e.getMessage().contains("2 deletes failed")); + } + } + + @Test + public void exceptionThrownIfBulkWriterSuccessHandlerFails() throws Exception { + List documents = Arrays.asList("bob/children/brian"); + setupMocks(documents, "bob"); + bulkWriter = firestoreMock.bulkWriter(); + bulkWriter.addWriteResultListener( + new WriteResultCallback() { + public void onResult(DocumentReference documentReference, WriteResult result) { + throw new UnsupportedOperationException( + "Test code threw UnsupportedOperationException"); + } + }); + try { + ApiFuture future = + firestoreMock.recursiveDelete( + firestoreMock.collection(COLLECTION_ID).document("bob"), bulkWriter); + future.get(); + fail("Operation should have failed in test"); + } catch (Exception e) { + assertTrue(e.getMessage().contains("2 deletes failed")); + } + } + + @Test + public void successHandlerProvidesCorrectReferencesAndResults() throws Exception { + List documents = Arrays.asList("bob/children/brian", "bob/children/charlie"); + ApiFuture customResponse = + mergeResponses(successResponse(1), successResponse(2), successResponse(3)); + setupMocks(documents, "bob", customResponse); + + final List results = new ArrayList<>(); + final List references = new ArrayList<>(); + bulkWriter = firestoreMock.bulkWriter(); + bulkWriter.addWriteResultListener( + new WriteResultCallback() { + public void onResult(DocumentReference documentReference, WriteResult result) { + results.add((int) result.getUpdateTime().getSeconds()); + references.add(documentReference.getPath()); + } + }); + ApiFuture future = + firestoreMock.recursiveDelete( + firestoreMock.collection(COLLECTION_ID).document("bob"), bulkWriter); + future.get(); + assertArrayEquals(new Integer[] {1, 2, 3}, results.toArray()); + assertArrayEquals( + new String[] { + "coll/bob/children/brian", "coll/bob/children/charlie", "coll/bob", + }, + references.toArray()); + } + + @Test + public void errorHandlerProvidesCorrectInformation() throws Exception { + List documents = Arrays.asList("bob/children/brian", "bob/children/charlie"); + ApiFuture customResponse = + mergeResponses( + failedResponse(Code.PERMISSION_DENIED_VALUE), + failedResponse(Code.UNAVAILABLE_VALUE), + failedResponse(Code.INTERNAL_VALUE)); + setupMocks(documents, "bob", customResponse); + + final List codes = new ArrayList<>(); + final List references = new ArrayList<>(); + bulkWriter = firestoreMock.bulkWriter(); + bulkWriter.addWriteErrorListener( + new WriteErrorCallback() { + public boolean onError(BulkWriterException error) { + codes.add(error.getStatus()); + references.add(error.getDocumentReference().getPath()); + return false; + } + }); + ApiFuture future = + firestoreMock.recursiveDelete( + firestoreMock.collection(COLLECTION_ID).document("bob"), bulkWriter); + try { + future.get(); + fail("Operation should have failed"); + } catch (Exception e) { + assertArrayEquals( + new Status[] {Status.PERMISSION_DENIED, Status.UNAVAILABLE, Status.INTERNAL}, + codes.toArray()); + assertArrayEquals( + new String[] { + "coll/bob/children/brian", "coll/bob/children/charlie", "coll/bob", + }, + references.toArray()); + } + } + + @Test + public void exceptionThrownIfProvidedReferenceWasNotDeleted() throws Exception { + doAnswer(emptyQueryResponse()) + .when(firestoreMock) + .streamRequest( + runQueryCapture.capture(), + streamObserverCapture.capture(), + Matchers.any()); + doReturn(BulkWriterTest.FAILED_FUTURE) + .when(firestoreMock) + .sendRequest( + batchWriteCapture.capture(), + Matchers.>any()); + + ApiFuture future = firestoreMock.recursiveDelete(firestoreMock.document("root/doc")); + try { + future.get(); + fail("Operation should have failed in test"); + } catch (Exception e) { + assertTrue(e.getMessage().contains("Mock batchWrite failed in test")); + } + } + + @Test + public void handlesSuccessfulStreamErrorRetries() throws Exception { + FirestoreException mockException = + new FirestoreException("runQuery() failed in test", Status.UNAVAILABLE); + // Note that these mock responses differ from the Node implementation since Node retries + // queries that fail without streaming any documents in the SDK, whereas Java handles these + // retries at the GAX level. The Java SDK does not retry unless a document was streamed in the + // query. + final List> runQueryResponses = + new ArrayList<>( + Arrays.asList( + queryResponse(mockException, fullDocumentPath("a"), fullDocumentPath("b")), + queryResponse(mockException, fullDocumentPath("c")), + queryResponse(mockException, fullDocumentPath("d")), + queryResponse(fullDocumentPath("e")))); + + doAnswer(runQueryResponses.get(0)) + .doAnswer(runQueryResponses.get(1)) + .doAnswer(runQueryResponses.get(2)) + .doAnswer(runQueryResponses.get(3)) + .when(firestoreMock) + .streamRequest( + runQueryCapture.capture(), + streamObserverCapture.capture(), + Matchers.any()); + + ResponseStubber responseStubber = + new ResponseStubber() { + { + put( + batchWrite( + delete("coll/a"), + delete("coll/b"), + delete("coll/c"), + delete("coll/d"), + delete("coll/e")), + mergeResponses( + successResponse(1), + successResponse(1), + successResponse(1), + successResponse(1), + successResponse(1))); + } + }; + responseStubber.initializeStub(batchWriteCapture, firestoreMock); + + firestoreMock.recursiveDelete(firestoreMock.collection(COLLECTION_ID)).get(); + responseStubber.verifyAllRequestsSent(); + }; + + @Test + public void handlesMultipleCallsToRecursiveDelete() throws Exception { + final List> runQueryResponses = + new ArrayList<>( + Arrays.asList( + queryResponse(fullDocumentPath("a")), + queryResponse(fullDocumentPath("b")), + queryResponse(fullDocumentPath("c")))); + + doAnswer(runQueryResponses.get(0)) + .doAnswer(runQueryResponses.get(1)) + .doAnswer(runQueryResponses.get(2)) + .when(firestoreMock) + .streamRequest( + runQueryCapture.capture(), + streamObserverCapture.capture(), + Matchers.any()); + + ResponseStubber responseStubber = + new ResponseStubber() { + { + put(batchWrite(delete("coll/a")), successResponse(1)); + put(batchWrite(delete("coll/b")), successResponse(2)); + put(batchWrite(delete("coll/c")), successResponse(3)); + } + }; + responseStubber.initializeStub(batchWriteCapture, firestoreMock); + + firestoreMock.recursiveDelete(firestoreMock.collection("a")).get(); + firestoreMock.recursiveDelete(firestoreMock.collection("b")).get(); + firestoreMock.recursiveDelete(firestoreMock.collection("c")).get(); + responseStubber.verifyAllRequestsSent(); + } + + @Test + public void usesSameBulkWriterInstanceAcrossCalls() throws Exception { + doAnswer(emptyQueryResponse()) + .when(firestoreMock) + .streamRequest( + runQueryCapture.capture(), + streamObserverCapture.capture(), + Matchers.any()); + + final int[] callCount = {0}; + final BulkWriter bulkWriter = firestoreMock.bulkWriter(); + doAnswer( + new Answer() { + public BulkWriter answer(InvocationOnMock mock) throws Throwable { + callCount[0]++; + return bulkWriter; + } + }) + .when(firestoreMock) + .bulkWriter(); + + firestoreMock.recursiveDelete(firestoreMock.collection("foo")).get(); + firestoreMock.recursiveDelete(firestoreMock.collection("boo")).get(); + firestoreMock.recursiveDelete(firestoreMock.collection("moo")).get(); + + // Only the first recursiveDelete() call should have called the constructor. Subsequent calls + // should have used the same bulkWriter. + assertEquals(1, callCount[0]); + } + + @Test + public void throwsErrorIfBulkWriterInstanceIsClosed() throws Exception { + bulkWriter = firestoreMock.bulkWriter(); + bulkWriter.close(); + try { + firestoreMock.recursiveDelete(firestoreMock.collection("foo"), bulkWriter); + fail("Operation should have failed in test"); + } catch (Exception e) { + assertTrue(e.getMessage().contains("BulkWriter has already been closed")); + } + } +} diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java index 11e53804c..da94de92a 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java @@ -41,6 +41,8 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.cloud.Timestamp; +import com.google.cloud.firestore.BulkWriter; +import com.google.cloud.firestore.BulkWriter.WriteResultCallback; import com.google.cloud.firestore.CollectionReference; import com.google.cloud.firestore.DocumentReference; import com.google.cloud.firestore.DocumentSnapshot; @@ -1686,4 +1688,110 @@ public void testBuildingBundleWithLimitToLastQuery() throws Exception { randomColl.document("doc2").get().get(), limitToLastQuerySnap.getReadTime().toProto()); } + + private int countDocumentChildren(DocumentReference reference) { + int count = 0; + Iterable collections = reference.listCollections(); + for (CollectionReference collectionReference : collections) { + count += countCollectionChildren(collectionReference); + } + return count; + } + + private int countCollectionChildren(CollectionReference reference) { + int count = 0; + Iterable documents = reference.listDocuments(); + for (DocumentReference documentReference : documents) { + count += countDocumentChildren(documentReference) + 1; + } + return count; + } + + private void setupRecursiveDeleteTest() throws Exception { + // ROOT-DB + // └── randomCol + // ├── anna + // └── bob + // └── parentsCol + // ├── charlie + // └── daniel + // └── childCol + // ├── ernie + // └── francis + WriteBatch batch = firestore.batch(); + batch.set(randomColl.document("anna"), map("name", "anna")); + batch.set(randomColl.document("bob"), map("name", "bob")); + batch.set(randomColl.document("bob/parentsCol/charlie"), map("name", "charlie")); + batch.set(randomColl.document("bob/parentsCol/daniel"), map("name", "daniel")); + batch.set(randomColl.document("bob/parentsCol/daniel/childCol/ernie"), map("name", "ernie")); + batch.set( + randomColl.document("bob/parentsCol/daniel/childCol/francis"), map("name", "francis")); + batch.commit().get(); + } + + @Test + public void testRecursiveDeleteTopLevelCollection() throws Exception { + setupRecursiveDeleteTest(); + firestore.recursiveDelete(randomColl).get(); + assertEquals(0, countCollectionChildren(randomColl)); + } + + @Test + public void testRecursiveDeleteNestedCollection() throws Exception { + setupRecursiveDeleteTest(); + firestore.recursiveDelete(randomColl.document("bob").collection("parentsCol")).get(); + assertEquals(2, countCollectionChildren(randomColl)); + } + + @Test + public void testRecursiveDeleteNestedDocument() throws Exception { + setupRecursiveDeleteTest(); + DocumentReference document = randomColl.document("bob/parentsCol/daniel"); + firestore.recursiveDelete(document).get(); + DocumentSnapshot snap = document.get().get(); + assertFalse(snap.exists()); + assertEquals(1, countDocumentChildren(randomColl.document("bob"))); + assertEquals(3, countCollectionChildren(randomColl)); + } + + @Test + public void testRecursiveDeleteLeafDocument() throws Exception { + setupRecursiveDeleteTest(); + DocumentReference document = randomColl.document("bob/parentsCol/daniel/childCol/ernie"); + firestore.recursiveDelete(document).get(); + DocumentSnapshot snap = document.get().get(); + assertFalse(snap.exists()); + assertEquals(5, countCollectionChildren(randomColl)); + } + + @Test + public void testRecursiveDeleteDoesNotAffectOtherCollections() throws Exception { + setupRecursiveDeleteTest(); + + // Add another nested collection that shouldn't be deleted. + CollectionReference collectionB = firestore.collection("doggos"); + collectionB.document("doggo").set(map("name", "goodboi")).get(); + + firestore.recursiveDelete(collectionB).get(); + assertEquals(6, countCollectionChildren(randomColl)); + assertEquals(0, countCollectionChildren(collectionB)); + } + + @Test + public void testRecursiveDeleteWithCustomBulkWriterInstance() throws Exception { + setupRecursiveDeleteTest(); + + BulkWriter bulkWriter = firestore.bulkWriter(); + final int[] callbackCount = {0}; + bulkWriter.addWriteResultListener( + new WriteResultCallback() { + public void onResult(DocumentReference documentReference, WriteResult result) { + callbackCount[0]++; + } + }); + + firestore.recursiveDelete(randomColl, bulkWriter).get(); + assertEquals(0, countCollectionChildren(randomColl)); + assertEquals(6, callbackCount[0]); + } }