Skip to content

Commit

Permalink
fix: recursive delete: backporting changes from Java (#1514)
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Chen committed May 26, 2021
1 parent 245c3a9 commit 92ea651
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 39 deletions.
37 changes: 35 additions & 2 deletions dev/src/index.ts
Expand Up @@ -76,7 +76,11 @@ const serviceConfig = interfaces['google.firestore.v1.Firestore'];

import api = google.firestore.v1;
import {CollectionGroup} from './collection-group';
import {RecursiveDelete} from './recursive-delete';
import {
RECURSIVE_DELETE_MAX_PENDING_OPS,
RECURSIVE_DELETE_MIN_PENDING_OPS,
RecursiveDelete,
} from './recursive-delete';

export {
CollectionReference,
Expand Down Expand Up @@ -1250,9 +1254,38 @@ export class Firestore implements firestore.Firestore {
| firestore.CollectionReference<unknown>
| firestore.DocumentReference<unknown>,
bulkWriter?: BulkWriter
): Promise<void> {
return this._recursiveDelete(
ref,
RECURSIVE_DELETE_MAX_PENDING_OPS,
RECURSIVE_DELETE_MIN_PENDING_OPS,
bulkWriter
);
}

/**
* This overload is not private so that tests can exercise query resumption
* with startAfter() once the RecursiveDelete instance has MAX_PENDING_OPS
* operations pending.
*
* @private
*/
// Visible for testing
_recursiveDelete(
ref:
| firestore.CollectionReference<unknown>
| firestore.DocumentReference<unknown>,
maxPendingOps: number,
minPendingOps: number,
bulkWriter?: BulkWriter
): Promise<void> {
const writer = bulkWriter ?? this.getBulkWriter();
const deleter = new RecursiveDelete(this, writer, ref);
const deleter = new RecursiveDelete(
this,
writer,
ref,
maxPendingOps,
minPendingOps
);
return deleter.run();
}

Expand Down
54 changes: 37 additions & 17 deletions dev/src/recursive-delete.ts
Expand Up @@ -48,7 +48,7 @@ export const REFERENCE_NAME_MIN_ID = '__id-9223372036854775808__';
* from streaming documents faster than Firestore can delete.
*/
// Visible for testing.
export const MAX_PENDING_OPS = 5000;
export const RECURSIVE_DELETE_MAX_PENDING_OPS = 5000;

/**
* The number of pending BulkWriter operations at which RecursiveDelete
Expand All @@ -57,7 +57,7 @@ export const MAX_PENDING_OPS = 5000;
* throughput. This helps prevent BulkWriter from idling while Firestore
* fetches the next query.
*/
const MIN_PENDING_OPS = 1000;
export const RECURSIVE_DELETE_MIN_PENDING_OPS = 1000;

/**
* Class used to store state required for running a recursive delete operation.
Expand All @@ -84,6 +84,25 @@ export class RecursiveDelete {
*/
private documentsPending = true;

/**
* Whether run() has been called.
* @private
*/
private started = false;

/**
* Query limit to use when fetching all descendants.
* @private
*/
private readonly maxPendingOps: number;

/**
* The number of pending BulkWriter operations at which RecursiveDelete
* starts the next limit query to fetch descendants.
* @private
*/
private readonly minPendingOps: number;

/**
* A deferred promise that resolves when the recursive delete operation
* is completed.
Expand Down Expand Up @@ -119,25 +138,30 @@ export class RecursiveDelete {
* @param firestore The Firestore instance to use.
* @param writer The BulkWriter instance to use for delete operations.
* @param ref The document or collection reference to recursively delete.
* @param maxLimit The query limit to use when fetching descendants.
* @param minLimit The number of pending BulkWriter operations at which
* RecursiveDelete starts the next limit query to fetch descendants.
*/
constructor(
private readonly firestore: Firestore,
private readonly writer: BulkWriter,
private readonly ref:
| firestore.CollectionReference<unknown>
| firestore.DocumentReference<unknown>
) {}
| firestore.DocumentReference<unknown>,
private readonly maxLimit: number,
private readonly minLimit: number
) {
this.maxPendingOps = maxLimit;
this.minPendingOps = minLimit;
}

/**
* Recursively deletes the reference provided in the class constructor.
* Returns a promise that resolves when all descendants have been deleted, or
* if an error occurs.
*/
run(): Promise<void> {
assert(
this.documentsPending,
'The recursive delete operation has already been completed.'
);
assert(!this.started, 'RecursiveDelete.run() should only be called once.');

// Capture the error stack to preserve stack tracing across async calls.
this.errorStack = Error().stack!;
Expand All @@ -152,12 +176,10 @@ export class RecursiveDelete {
* @private
*/
private setupStream(): void {
const limit = MAX_PENDING_OPS;
const stream = this.getAllDescendants(
this.ref instanceof CollectionReference
? (this.ref as CollectionReference<unknown>)
: (this.ref as DocumentReference<unknown>),
limit
: (this.ref as DocumentReference<unknown>)
);
this.streamInProgress = true;
let streamedDocsCount = 0;
Expand All @@ -177,7 +199,7 @@ export class RecursiveDelete {
this.streamInProgress = false;
// If there are fewer than the number of documents specified in the
// limit() field, we know that the query is complete.
if (streamedDocsCount < limit) {
if (streamedDocsCount < this.minPendingOps) {
this.onQueryEnd();
} else if (this.pendingOpsCount === 0) {
this.setupStream();
Expand All @@ -188,13 +210,11 @@ export class RecursiveDelete {
/**
* Retrieves all descendant documents nested under the provided reference.
* @param ref The reference to fetch all descendants for.
* @param limit The number of descendants to fetch in the query.
* @private
* @return {Stream<QueryDocumentSnapshot>} Stream of descendant documents.
*/
private getAllDescendants(
ref: CollectionReference<unknown> | DocumentReference<unknown>,
limit: number
ref: CollectionReference<unknown> | DocumentReference<unknown>
): NodeJS.ReadableStream {
// The parent is the closest ancestor document to the location we're
// deleting. If we are deleting a document, the parent is the path of that
Expand All @@ -220,7 +240,7 @@ export class RecursiveDelete {
);

// Query for names only to fetch empty snapshots.
query = query.select(FieldPath.documentId()).limit(limit);
query = query.select(FieldPath.documentId()).limit(this.maxPendingOps);

if (ref instanceof CollectionReference) {
// To find all descendants of a collection reference, we need to use a
Expand Down Expand Up @@ -300,7 +320,7 @@ export class RecursiveDelete {
if (
this.documentsPending &&
!this.streamInProgress &&
this.pendingOpsCount < MIN_PENDING_OPS
this.pendingOpsCount < this.minPendingOps
) {
this.setupStream();
}
Expand Down
86 changes: 66 additions & 20 deletions dev/test/recursive-delete.ts
Expand Up @@ -50,7 +50,11 @@ import {
import {MAX_REQUEST_RETRIES} from '../src';

import api = google.firestore.v1;
import {MAX_PENDING_OPS, REFERENCE_NAME_MIN_ID} from '../src/recursive-delete';
import {
RECURSIVE_DELETE_MAX_PENDING_OPS,
REFERENCE_NAME_MIN_ID,
} from '../src/recursive-delete';
import {Deferred} from '../src/util';

const PROJECT_ID = 'test-project';
const DATABASE_ROOT = `projects/${PROJECT_ID}/databases/(default)`;
Expand Down Expand Up @@ -140,7 +144,7 @@ describe('recursiveDelete() method:', () => {
'LESS_THAN',
endAt('root')
),
limit(MAX_PENDING_OPS)
limit(RECURSIVE_DELETE_MAX_PENDING_OPS)
);
return stream();
},
Expand All @@ -165,7 +169,7 @@ describe('recursiveDelete() method:', () => {
'LESS_THAN',
endAt('root/doc/nestedCol')
),
limit(MAX_PENDING_OPS)
limit(RECURSIVE_DELETE_MAX_PENDING_OPS)
);
return stream();
},
Expand All @@ -184,7 +188,7 @@ describe('recursiveDelete() method:', () => {
'root/doc',
select('__name__'),
allDescendants(/* kindless= */ true),
limit(MAX_PENDING_OPS)
limit(RECURSIVE_DELETE_MAX_PENDING_OPS)
);
return stream();
},
Expand Down Expand Up @@ -222,7 +226,7 @@ describe('recursiveDelete() method:', () => {
'LESS_THAN',
endAt('root')
),
limit(MAX_PENDING_OPS)
limit(RECURSIVE_DELETE_MAX_PENDING_OPS)
);
return stream();
}
Expand All @@ -235,8 +239,32 @@ describe('recursiveDelete() method:', () => {
});

it('creates a second query with the correct startAfter', async () => {
const firstStream = Array.from(Array(MAX_PENDING_OPS).keys()).map(
(_, i) => result('doc' + i)
// This test checks that the second query is created with the correct
// startAfter() once the RecursiveDelete instance's pending operation
// count drops below the MIN_PENDING_OPS threshold. Use lower limits than
// the actual RecursiveDelete class in order to make this test run fast.
const maxPendingOps = 100;
const minPendingOps = 11;
const maxBatchSize = 10;
const cutoff = maxPendingOps - minPendingOps;
let numDeletesBuffered = 0;

// This deferred promise is used to delay the BatchWriteResponses from
// returning in order to create the situation where the number of pending
// operations is less than `minPendingOps`.
const bufferDeferred = new Deferred<void>();

// This deferred completes when the second query is run.
const secondQueryDeferred = new Deferred<void>();

const nLengthArray = (n: number): number[] => Array.from(Array(n).keys());

const firstStream = nLengthArray(maxPendingOps).map((_, i) =>
result('doc' + i)
);

const batchWriteResponse = mergeResponses(
nLengthArray(maxBatchSize).map(() => successResponse(1))
);

// Use an array to store that the queryEquals() method succeeded, since
Expand All @@ -257,7 +285,7 @@ describe('recursiveDelete() method:', () => {
'LESS_THAN',
endAt('root')
),
limit(MAX_PENDING_OPS)
limit(maxPendingOps)
);
called.push(1);
return stream(...firstStream);
Expand All @@ -279,34 +307,52 @@ describe('recursiveDelete() method:', () => {
referenceValue:
`projects/${PROJECT_ID}/databases/(default)/` +
'documents/collectionId/doc' +
(MAX_PENDING_OPS - 1),
(maxPendingOps - 1),
}),
limit(MAX_PENDING_OPS)
limit(maxPendingOps)
);
called.push(2);
secondQueryDeferred.resolve();
return stream();
} else {
called.push(3);
return stream();
}
},
batchWrite: () => {
const responses = mergeResponses(
Array.from(Array(500).keys()).map(() => successResponse(1))
);
return response({
writeResults: responses.writeResults,
status: responses.status,
const returnedResponse = response({
writeResults: batchWriteResponse.writeResults,
status: batchWriteResponse.status,
});
if (numDeletesBuffered < cutoff) {
numDeletesBuffered += batchWriteResponse.writeResults!.length;

// By waiting for `bufferDeferred` to complete, we can guarantee that
// the writes complete after all documents are streamed. Without
// this deferred, the test can race and complete the writes before
// the stream is finished, which is a different scenario from the one
// this test is meant to cover.
return bufferDeferred.promise.then(() => returnedResponse);
} else {
// Once there are `cutoff` pending deletes, resolving the deferred
// allows enough responses to be returned such that the number of
// pending deletes should be less than `minPendingOps`. This allows
// us to test that the second query is made.
bufferDeferred.resolve();
return secondQueryDeferred.promise.then(() => returnedResponse);
}
},
};
const firestore = await createInstance(overrides);

// Use a custom batch size with BulkWriter to simplify the dummy
// batchWrite() response logic.
const bulkWriter = firestore.bulkWriter();
bulkWriter._maxBatchSize = 500;
await firestore.recursiveDelete(firestore.collection('root'), bulkWriter);
bulkWriter._maxBatchSize = maxBatchSize;
await firestore._recursiveDelete(
firestore.collection('root'),
maxPendingOps,
minPendingOps,
bulkWriter
);
expect(called).to.deep.equal([1, 2]);
});
});
Expand Down

0 comments on commit 92ea651

Please sign in to comment.