
feat: add recursiveDelete() to Firestore #622

Merged: 10 commits into bc/rc-main on May 24, 2021

Conversation


@thebrianchen thebrianchen commented May 5, 2021

Porting from node. Merging to a feature branch until it's good to release.

Several PRs are contained in here: 1, 2, 3.

@thebrianchen thebrianchen requested a review from a team as a code owner May 5, 2021 23:35
@thebrianchen thebrianchen requested a review from a team May 5, 2021 23:35
@product-auto-label product-auto-label bot added the api: firestore Issues related to the googleapis/java-firestore API. label May 5, 2021
@thebrianchen thebrianchen self-assigned this May 5, 2021
@google-cla google-cla bot added the cla: yes This human has signed the Contributor License Agreement. label May 5, 2021
Preconditions.checkState(
documentsPending, "The recursive delete operation has already been completed");

// TODO: figure out if this is safe, or a way to lock this.
Author:

Do we have to expose the lock in BulkWriter and synchronize it here?

Contributor:

We probably need to add a verifyNotClosed that calls verifyNotClosedLocked.

Author:

done, thanks for the suggestion.
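The verifyNotClosed / verifyNotClosedLocked split discussed above can be sketched roughly like this. This is a standalone illustration, not the actual BulkWriter code; the `closed` field and both method names are assumptions for the sake of the example:

```java
// Hypothetical sketch: a public check that takes the lock, delegating to a
// *Locked variant whose contract is that the caller already holds the lock.
public class BulkWriterSketch {
  private final Object lock = new Object();
  private boolean closed = false;

  void verifyNotClosed() {
    synchronized (lock) {
      verifyNotClosedLocked();
    }
  }

  // Caller must hold `lock`.
  private void verifyNotClosedLocked() {
    if (closed) {
      throw new IllegalStateException("BulkWriter has already been closed");
    }
  }

  void close() {
    synchronized (lock) {
      closed = true;
    }
  }

  public static void main(String[] args) {
    BulkWriterSketch writer = new BulkWriterSketch();
    writer.verifyNotClosed(); // no exception while open
    writer.close();
    try {
      writer.verifyNotClosed();
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

The *Locked suffix convention lets internal code that already holds the lock skip re-acquisition while keeping the public entry point safe.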


codecov bot commented May 5, 2021

Codecov Report

❗ No coverage uploaded for pull request base (bc/rc-main@ac3db81). Click here to learn what that means.
The diff coverage is n/a.

❗ Current head 7862601 differs from pull request most recent head 47876e8. Consider uploading reports for the commit 47876e8 to get more accurate results.
Impacted file tree graph

@@              Coverage Diff              @@
##             bc/rc-main     #622   +/-   ##
=============================================
  Coverage              ?   74.59%           
  Complexity            ?     1124           
=============================================
  Files                 ?       68           
  Lines                 ?     6057           
  Branches              ?      737           
=============================================
  Hits                  ?     4518           
  Misses                ?     1309           
  Partials              ?      230           

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update ac3db81...47876e8. Read the comment docs.

Contributor @schmidt-sebastian left a comment:

Test code review coming this afternoon.

*/
@Nonnull
@VisibleForTesting
public ApiFuture<Void> recursiveDelete(
Contributor:

This does not need to be public.

Author:

removed.


public void onError(Throwable throwable) {
lastError =
new FirestoreException("Failed to fetch children documents.", Status.UNAVAILABLE);
Contributor:

This swallows the underlying exception.

Author:

Wrapped the old exception. The status should be UNAVAILABLE regardless of the error, as it is on Node.

query = query.select(FieldPath.documentId()).limit(maxPendingOps);

return query;
}
Contributor:

We should find a way to combine all three methods (by passing ResourcePath and collectionId at each callsite).

Author:

Done.


// Delete the provided document reference if one was provided.
if (documentReference != null) {
ApiFuture<Void> voidDeleteFuture =
Contributor:

Can we call deleteReference? (Not sure)

Author:

Not in this case, since deleteReference() doesn't return the ApiFuture, and we need it here to chain with flushFuture.

Contributor:

You can make it return an ApiFuture.

private ApiFuture<Void> deleteReference(final DocumentReference reference) {
  synchronized (lock) {
    pendingOperationsCount++;
  }

  return ApiFutures.transformAsync(
      catchingDelete(reference),
      new ApiAsyncFunction<WriteResult, Void>() {
        @Override
        public ApiFuture<Void> apply(WriteResult result) {
          synchronized (lock) {
            pendingOperationsCount--;
            // We wait until the previous stream has ended in order to ensure the
            // startAfter document is correct. Starting the next stream while
            // there are pending operations allows Firestore to maximize
            // BulkWriter throughput.
            if (documentsPending
                && !streamInProgress
                && pendingOperationsCount < minPendingOps) {
              streamDescendants();
            }
            return ApiFutures.immediateFuture(null);
          }
        }
      },
      MoreExecutors.directExecutor());
}

Author:

Ah, I see what you mean now, thanks. Done.

new ApiAsyncFunction<WriteResult, Object>() {

public ApiFuture<Object> apply(WriteResult result) {
pendingOperationsCount--;
Contributor:

If BulkWriter sends two requests in parallel, this code might be invoked from two different Gax threads. I think this needs synchronization (same with the error handler).

Author:

Added a lock here and everywhere else that accesses lastError, streamInProgress, documentsPending, and errorCount.

ApiFutures.allAsList(pendingFutures),
new ApiAsyncFunction<List<Void>, Void>() {
public ApiFuture<Void> apply(List<Void> unused) {
if (lastError == null) {
Contributor:

Is this guaranteed to run on the same thread that sets lastError?

Author:

I'm not sure which executor gets used when transforming ApiFutures.allAsList(), but if we assume it can be either of the two futures in the list, we should lock it. Added a lock.
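The uncertainty above about which thread runs the combined future's callback can be reproduced with plain `java.util.concurrent` (a standalone analogy, not the ApiFutures API itself): the callback of a combined future runs on whichever thread completes last, so shared state it reads needs synchronization.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Demonstrates that the combined-future callback runs off the main thread,
// on whichever pool thread finishes the inputs, motivating the lock above.
public class CombinedCallbackThreadSketch {
  public static void main(String[] args) throws Exception {
    AtomicReference<String> callbackThread = new AtomicReference<>();
    CompletableFuture<Void> a = CompletableFuture.runAsync(() -> sleep(50));
    CompletableFuture<Void> b = CompletableFuture.runAsync(() -> sleep(10));
    CompletableFuture.allOf(a, b)
        .thenRun(() -> callbackThread.set(Thread.currentThread().getName()))
        .get();
    // The recorded name is nondeterministic: a pool thread, not necessarily
    // the thread that set any shared field earlier.
    System.out.println("callback thread: " + callbackThread.get());
  }

  static void sleep(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Because the executing thread is not guaranteed, reading lastError under the same lock that guards its writes is the safe choice.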

Contributor @schmidt-sebastian left a comment:

Looks great!

Matchers.<ServerStreamingCallable>any());

// Include dummy response for the deleted reference.
doAnswer(
Contributor:

You could just use a collection delete.

Author:

Deleted the comment (copied from another test). The test is using a collection delete, and the batchWrite mock deletes the document that is streamed (a document needs to be returned for the stream to retry).

final List<Answer<RunQueryResponse>> runQueryResponses =
new ArrayList<>(
Arrays.asList(
queryResponse(mockException, fullDocumentPath("a"), fullDocumentPath("b")),
Contributor:

Wouldn't the exception be the last response rather than the first?

Author:

This test checks that the stream retries work properly (streamed documents are added to the BulkWriter even if the stream is choppy), so we want the exception to be thrown from the start. The last response is a success response to stop the retry attempts.


firestore.recursiveDelete(collectionB).get();
assertEquals(6, countCollectionChildren(randomColl));
assertEquals(0, countCollectionChildren(collectionB));
Contributor:

Should there not still be one document?

Author:

I don't think so. We created a separate collection (collectionB) with only a single document, deleted that document, and verified that the collection with all the people wasn't affected.

Contributor:

Got it. I am glad you made me look at this again though - I think you should wait for "doggo" to be written before invoking the delete.

Author:

good catch, done.

assertTrue(e.getMessage().contains("BulkWriter has already been closed"));
}
}
}
Contributor:

I know it will be pretty tough to test, but we do not have a test verifying that we send the second request when we pass the MIN_PENDING_OPS threshold.

Author:

Modified createsSecondQueryWithCorrectStartAfter() to test this.

@schmidt-sebastian schmidt-sebastian removed their assignment May 7, 2021
Brian Chen and others added 2 commits May 7, 2021 14:46
Co-authored-by: Sebastian Schmidt <mrschmidt@google.com>
Author @thebrianchen left a comment:

thanks for the review!


@@ -255,13 +255,24 @@ public boolean equals(Object o) {

abstract ImmutableList<FieldReference> getFieldProjections();

// Whether to select all documents under `parentPath`. By default, only
// collections that match `collectionId` are selected.
abstract boolean getKindless();
Author:

you can have kindness without being kindless!

// Whether to require consistent documents when restarting the query. By
// default, restarting the query uses the readTime offset of the original
// query to provide consistent results.
abstract boolean getRequireConsistency();
Author:

doesRequireConsistency sounds weird. I'll keep it as is to match the node implementation which is requireConsistency.


public ApiFuture<Void> run() {
Preconditions.checkState(
documentsPending, "The recursive delete operation has already been completed");
Author:

Good point, added an invoked boolean.









@VisibleForTesting
ApiFuture<Void> recursiveDelete(
CollectionReference reference, BulkWriter bulkWriter, int maxLimit, int minLimit) {
RecursiveDelete deleter = new RecursiveDelete(this, bulkWriter, reference, maxLimit, minLimit);
Contributor:

This code flow would be a bit cleaner if you called this method from all overloads. This will ensure that all calls go through the same callsite. If you take parentPath and collectionId as input here, you will also remove some duplication in RecursiveDelete (as you only need one constructor).

Author:

done. RecursiveDelete only has one constructor now.
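The single-callsite refactor agreed on here can be sketched in a standalone form: each public overload derives parentPath and collectionId and delegates to one private method. Plain strings stand in for the real Firestore reference types, and every name below is illustrative rather than the actual API.

```java
// Hypothetical sketch: all recursiveDelete overloads funnel through one method.
public class SingleCallsiteSketch {
  // The one callsite every overload lands on.
  private static String recursiveDelete(String parentPath, String collectionId) {
    return "deleting collectionId='" + collectionId + "' under parentPath='" + parentPath + "'";
  }

  // Collection overload: split the collection path into parent path + id.
  static String recursiveDeleteCollection(String collectionPath) {
    int slash = collectionPath.lastIndexOf('/');
    String parentPath = slash < 0 ? "" : collectionPath.substring(0, slash);
    String collectionId = collectionPath.substring(slash + 1);
    return recursiveDelete(parentPath, collectionId);
  }

  // Document overload: the parent path is the document path itself, and the
  // collection id comes from the document's parent collection (mirroring the
  // parentPath = path / collectionId = path.getParent().getId() logic above).
  static String recursiveDeleteDocument(String documentPath) {
    String parentCollection = documentPath.substring(0, documentPath.lastIndexOf('/'));
    String collectionId = parentCollection.substring(parentCollection.lastIndexOf('/') + 1);
    return recursiveDelete(documentPath, collectionId);
  }

  public static void main(String[] args) {
    System.out.println(recursiveDeleteCollection("users/alice/posts"));
    System.out.println(recursiveDeleteDocument("users/alice"));
  }
}
```

Funneling the overloads this way means RecursiveDelete needs only one constructor, as noted in the reply.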

* The number of pending BulkWriter operations at which RecursiveDelete starts the next limit
* query to fetch descendants.
*/
private int minPendingOps = MIN_PENDING_OPS;
Contributor:

This can be final now.

Author:

done.

private boolean invoked = false;

/** Query limit to use when fetching all descendants. */
private int maxPendingOps = MAX_PENDING_OPS;
Contributor:

This can be final now.

Author:

done.

private boolean streamInProgress = false;

/** Whether run() has been called. */
private boolean invoked = false;
Contributor:

started?

Author:

done.

private final FirestoreRpcContext<?> firestoreRpcContext;
private final BulkWriter writer;

@Nullable private final DocumentReference documentReference;
Contributor:

It might be cleaner to replace this with a documentId string. Then you would have parentPath, collectionId and documentId below. What do you think?

Author:

I think trying to rehydrate the DocumentReference from the documentId is more bug-prone, since we'd be setting it to an empty string for collections, and the document path for documents.

I renamed documentReference to rootDocumentReference and added documentation. This way, we don't have to add extra code to rehydrate, and it's pretty clear to see the purpose of the variable at a glance.


public void onCompleted() {
synchronized (lock) {
streamInProgress = false;
Contributor:

We should probably reduce the scope of the lock to just this line (also for the onError callback, which only needs to set lastError under lock). This improves readability, as it makes the intention clearer and also answers the question of why onNext does not use locks.

Author:

Done.
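The narrowed lock scope agreed on here can be sketched standalone: onCompleted and onError hold the lock only for the single shared-field write, which makes it visually obvious why onNext needs no lock. All names below are illustrative, not the real RecursiveDelete fields.

```java
// Hypothetical sketch of minimal lock scope in the stream observer callbacks.
public class NarrowLockSketch {
  private final Object lock = new Object();
  private boolean streamInProgress = true;
  private Throwable lastError = null;

  void onNext(String document) {
    // Touches no shared mutable state, so no lock is needed here.
    System.out.println("buffering " + document);
  }

  void onError(Throwable throwable) {
    // Lock scope reduced to just the lastError assignment.
    synchronized (lock) {
      lastError = throwable;
    }
  }

  void onCompleted() {
    // Lock scope reduced to just the streamInProgress flip.
    synchronized (lock) {
      streamInProgress = false;
    }
  }

  public static void main(String[] args) {
    NarrowLockSketch sketch = new NarrowLockSketch();
    sketch.onNext("users/alice");
    sketch.onError(new RuntimeException("transient stream error"));
    sketch.onCompleted();
    synchronized (sketch.lock) {
      System.out.println("streamInProgress=" + sketch.streamInProgress
          + " lastError=" + sketch.lastError.getMessage());
    }
  }
}
```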



if (numDeletesBuffered[0] < cutoff) {
numDeletesBuffered[0] += batchWriteResponse.size();
return ApiFutures.transformAsync(
bufferFuture,
Contributor:

I don't quite understand why we need to block on bufferFuture here. I thought the idea was that we should finish the first 4000 deletes and then ask for the next batch. The current test seems to indicate that we do not wait for the deletes to finish executing.

Author:

I originally had bufferFuture to ensure that the stream completes before batchWrite responses do in the tests. Without the bufferFuture logic, we run into a race condition where the first 4000 writes complete before the stream is able to process all the documents. Since the test waits for secondQueryFuture before completing the remaining writes, the test gets stuck.

Here's what is happening broken down (using the normal max/min pending ops parameters, rather than the simplified test params):

Intended order of events:

  1. Stream 5000 documents.
  2. BulkWriter mock completes 4000 documents. The delete completion handler detects we are under the minPendingOps threshold to start a new stream.
  3. A second query streamed (testing startAfter), completing secondQueryFuture.
  4. The remaining 1000 batchWrites are completed, ending the test.

Race condition order without bufferFuture:

  1. Stream 4XXXX documents.
  2. BulkWriter mock completes 4000 documents. However, since the stream is still in progress, a new stream is not started.
  3. The stream completes, but since we are waiting on the remaining 1000 pending operations, we do not start the next stream.
  4. Test stalls because the last 1000 mock writes are waiting for the second query to be run first to complete `secondQueryFuture`. (Normally, the next batch isn't waiting on a future, and would trigger the next stream.)

tl;dr: Delete completions are still being waited on; bufferFuture is just here to make sure that the stream completes first in this kinda hacky test.

Contributor:

Thanks for explaining this. I am pretty sure I understand this now. If you can add a small comment in the test I might even still understand tomorrow (optional).

Author:

Added a comment.



private boolean documentsPending = true;

/** A deferred promise that resolves when the recursive delete operation is completed. */
private final SettableApiFuture<Void> completionFuture = SettableApiFuture.create();

/** Whether a query stream is currently in progress. Only one stream can be run at a time. */
@GuardedBy("lock")
private boolean streamInProgress = false;
Contributor:

You might be able to combine streamInProgress and documentsPending with some ! magic.

Author:

Played around with it a bit, but I couldn't think of a way to do this cleanly with good readability.

onQueryEnd();
synchronized (lock) {
String message = "Failed to fetch children documents. " + throwable.getMessage();
lastError = new FirestoreException(message, Status.UNAVAILABLE);
Contributor:

Java exception "bubbling" usually involves passing the original exception as the cause to the wrapping exception.

Author:

Changed to include the throwable and FirestoreException.forServerRejection().
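The cause-passing convention the reviewer describes can be shown in a small standalone form: the original Throwable goes in as the wrapping exception's cause, so the full stack trace survives the wrap. RuntimeException stands in here for FirestoreException, whose constructors are not shown in this thread.

```java
// Illustration of Java exception "bubbling" via the cause parameter.
public class WrapCauseSketch {
  public static void main(String[] args) {
    Throwable original = new RuntimeException("RST_STREAM closed stream");
    // Message carries context; cause preserves the original stack trace.
    RuntimeException wrapped = new RuntimeException(
        "Failed to fetch children documents. " + original.getMessage(), original);
    System.out.println(wrapped.getMessage());
    System.out.println("cause: " + wrapped.getCause().getMessage());
  }
}
```

Without the cause parameter, the underlying failure is swallowed and only the generic message reaches callers, which was the original concern.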

// deleting a document, the parent is the path of that document.
parentPath = path;
collectionId = path.getParent().getId();
Preconditions.checkState(collectionId != null, "Parent of a document should not be null.");
Contributor:

I would drop this check - every document has a collection ID. If you do keep it, the check should be on getParent(), which is nullable and not on getId().

BTW, I really like this cleanup.

Author:

Changed check to parent.

I definitely like this version more, thanks for pushing through on your vision :) Having all the collectionId/parentPath logic inside getAllDescendantsQuery() makes so much more sense, and makes the rest of the code much cleaner!


@thebrianchen thebrianchen merged commit 7564620 into bc/rc-main May 24, 2021
@thebrianchen thebrianchen deleted the bc/rc-base branch May 24, 2021 20:07