Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: recursive delete: backporting changes from Java #1514

Merged
merged 3 commits into from May 26, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 35 additions & 2 deletions dev/src/index.ts
Expand Up @@ -76,7 +76,11 @@ const serviceConfig = interfaces['google.firestore.v1.Firestore'];

import api = google.firestore.v1;
import {CollectionGroup} from './collection-group';
import {RecursiveDelete} from './recursive-delete';
import {
RECURSIVE_DELETE_MAX_PENDING_OPS,
RECURSIVE_DELETE_MIN_PENDING_OPS,
RecursiveDelete,
} from './recursive-delete';

export {
CollectionReference,
Expand Down Expand Up @@ -1250,9 +1254,38 @@ export class Firestore implements firestore.Firestore {
| firestore.CollectionReference<unknown>
| firestore.DocumentReference<unknown>,
bulkWriter?: BulkWriter
): Promise<void> {
return this._recursiveDelete(
ref,
RECURSIVE_DELETE_MAX_PENDING_OPS,
RECURSIVE_DELETE_MIN_PENDING_OPS,
bulkWriter
);
}

/**
* This overload is not private in order to test the query resumption with
* startAfter() once the RecursiveDelete instance has MAX_PENDING_OPS pending.
*
* @private
*/
// Visible for testing
_recursiveDelete(
ref:
| firestore.CollectionReference<unknown>
| firestore.DocumentReference<unknown>,
maxPendingOps: number,
minPendingOps: number,
bulkWriter?: BulkWriter
): Promise<void> {
const writer = bulkWriter ?? this.getBulkWriter();
const deleter = new RecursiveDelete(this, writer, ref);
const deleter = new RecursiveDelete(
this,
writer,
ref,
maxPendingOps,
minPendingOps
);
return deleter.run();
}

Expand Down
50 changes: 33 additions & 17 deletions dev/src/recursive-delete.ts
Expand Up @@ -48,7 +48,7 @@ export const REFERENCE_NAME_MIN_ID = '__id-9223372036854775808__';
* from streaming documents faster than Firestore can delete.
*/
// Visible for testing.
export const MAX_PENDING_OPS = 5000;
export const RECURSIVE_DELETE_MAX_PENDING_OPS = 5000;

/**
* The number of pending BulkWriter operations at which RecursiveDelete
Expand All @@ -57,7 +57,7 @@ export const MAX_PENDING_OPS = 5000;
* throughput. This helps prevent BulkWriter from idling while Firestore
* fetches the next query.
*/
const MIN_PENDING_OPS = 1000;
export const RECURSIVE_DELETE_MIN_PENDING_OPS = 1000;

/**
* Class used to store state required for running a recursive delete operation.
Expand All @@ -84,6 +84,21 @@ export class RecursiveDelete {
*/
private documentsPending = true;

/**
* Whether run() has been called.
* @private
*/
private started = false;

/** Query limit to use when fetching all descendants. */
private maxPendingOps: number;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be marked "private" (here and below)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching, done.


/**
* The number of pending BulkWriter operations at which RecursiveDelete starts the next limit
* query to fetch descendants.
*/
private minPendingOps: number;

/**
* A deferred promise that resolves when the recursive delete operation
* is completed.
Expand Down Expand Up @@ -119,25 +134,30 @@ export class RecursiveDelete {
* @param firestore The Firestore instance to use.
* @param writer The BulkWriter instance to use for delete operations.
* @param ref The document or collection reference to recursively delete.
* @param maxLimit The query limit to use when fetching descendants
* @param minLimit The number of pending BulkWriter operations at which
* RecursiveDelete starts the next limit query to fetch descendants.
*/
constructor(
private readonly firestore: Firestore,
private readonly writer: BulkWriter,
private readonly ref:
| firestore.CollectionReference<unknown>
| firestore.DocumentReference<unknown>
) {}
| firestore.DocumentReference<unknown>,
private readonly maxLimit: number,
private readonly minLimit: number
) {
this.maxPendingOps = maxLimit;
this.minPendingOps = minLimit;
}

/**
* Recursively deletes the reference provided in the class constructor.
* Returns a promise that resolves when all descendants have been deleted, or
* if an error occurs.
*/
run(): Promise<void> {
assert(
this.documentsPending,
'The recursive delete operation has already been completed.'
);
assert(!this.started, 'RecursiveDelete.run() should only be called once.');

// Capture the error stack to preserve stack tracing across async calls.
this.errorStack = Error().stack!;
Expand All @@ -152,12 +172,10 @@ export class RecursiveDelete {
* @private
*/
private setupStream(): void {
const limit = MAX_PENDING_OPS;
const stream = this.getAllDescendants(
this.ref instanceof CollectionReference
? (this.ref as CollectionReference<unknown>)
: (this.ref as DocumentReference<unknown>),
limit
: (this.ref as DocumentReference<unknown>)
);
this.streamInProgress = true;
let streamedDocsCount = 0;
Expand All @@ -177,7 +195,7 @@ export class RecursiveDelete {
this.streamInProgress = false;
// If there are fewer than the number of documents specified in the
// limit() field, we know that the query is complete.
if (streamedDocsCount < limit) {
if (streamedDocsCount < this.minPendingOps) {
this.onQueryEnd();
} else if (this.pendingOpsCount === 0) {
this.setupStream();
Expand All @@ -188,13 +206,11 @@ export class RecursiveDelete {
/**
* Retrieves all descendant documents nested under the provided reference.
* @param ref The reference to fetch all descendants for.
* @param limit The number of descendants to fetch in the query.
* @private
* @return {Stream<QueryDocumentSnapshot>} Stream of descendant documents.
*/
private getAllDescendants(
ref: CollectionReference<unknown> | DocumentReference<unknown>,
limit: number
ref: CollectionReference<unknown> | DocumentReference<unknown>
): NodeJS.ReadableStream {
// The parent is the closest ancestor document to the location we're
// deleting. If we are deleting a document, the parent is the path of that
Expand All @@ -220,7 +236,7 @@ export class RecursiveDelete {
);

// Query for names only to fetch empty snapshots.
query = query.select(FieldPath.documentId()).limit(limit);
query = query.select(FieldPath.documentId()).limit(this.maxPendingOps);

if (ref instanceof CollectionReference) {
// To find all descendants of a collection reference, we need to use a
Expand Down Expand Up @@ -300,7 +316,7 @@ export class RecursiveDelete {
if (
this.documentsPending &&
!this.streamInProgress &&
this.pendingOpsCount < MIN_PENDING_OPS
this.pendingOpsCount < this.minPendingOps
) {
this.setupStream();
}
Expand Down
84 changes: 64 additions & 20 deletions dev/test/recursive-delete.ts
Expand Up @@ -50,7 +50,11 @@ import {
import {MAX_REQUEST_RETRIES} from '../src';

import api = google.firestore.v1;
import {MAX_PENDING_OPS, REFERENCE_NAME_MIN_ID} from '../src/recursive-delete';
import {
RECURSIVE_DELETE_MAX_PENDING_OPS,
REFERENCE_NAME_MIN_ID,
} from '../src/recursive-delete';
import {Deferred} from '../src/util';

const PROJECT_ID = 'test-project';
const DATABASE_ROOT = `projects/${PROJECT_ID}/databases/(default)`;
Expand Down Expand Up @@ -140,7 +144,7 @@ describe('recursiveDelete() method:', () => {
'LESS_THAN',
endAt('root')
),
limit(MAX_PENDING_OPS)
limit(RECURSIVE_DELETE_MAX_PENDING_OPS)
);
return stream();
},
Expand All @@ -165,7 +169,7 @@ describe('recursiveDelete() method:', () => {
'LESS_THAN',
endAt('root/doc/nestedCol')
),
limit(MAX_PENDING_OPS)
limit(RECURSIVE_DELETE_MAX_PENDING_OPS)
);
return stream();
},
Expand All @@ -184,7 +188,7 @@ describe('recursiveDelete() method:', () => {
'root/doc',
select('__name__'),
allDescendants(/* kindless= */ true),
limit(MAX_PENDING_OPS)
limit(RECURSIVE_DELETE_MAX_PENDING_OPS)
);
return stream();
},
Expand Down Expand Up @@ -222,7 +226,7 @@ describe('recursiveDelete() method:', () => {
'LESS_THAN',
endAt('root')
),
limit(MAX_PENDING_OPS)
limit(RECURSIVE_DELETE_MAX_PENDING_OPS)
);
return stream();
}
Expand All @@ -235,8 +239,30 @@ describe('recursiveDelete() method:', () => {
});

it('creates a second query with the correct startAfter', async () => {
const firstStream = Array.from(Array(MAX_PENDING_OPS).keys()).map(
(_, i) => result('doc' + i)
// This test checks that the second query is created with the correct
// startAfter() once the RecursiveDelete instance is below the
// MIN_PENDING_OPS threshold to send the next batch. Use lower limits
// than the actual RecursiveDelete class in order to make this test run fast.
const maxPendingOps = 100;
const minPendingOps = 11;
const maxBatchSize = 10;
const cutoff = maxPendingOps - minPendingOps;
let numDeletesBuffered = 0;

// This deferred promise is used to delay the BatchWriteResponses from
// returning in order to create the situation where the number of pending
// operations is less than `minPendingOps`.
const bufferDeferred = new Deferred<void>();

// This deferred completes when the second query is run.
const secondQueryDeferred = new Deferred<void>();

const firstStream = Array.from(Array(maxPendingOps).keys()).map((_, i) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you create a helper variable for Array.from(Array(maxPendingOps).keys()) and name it appropriately? It is used twice and not very pretty :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

result('doc' + i)
);

const batchWriteResponse = mergeResponses(
Array.from(Array(maxBatchSize).keys()).map(() => successResponse(1))
);

// Use an array to store that the queryEquals() method succeeded, since
Expand All @@ -257,7 +283,7 @@ describe('recursiveDelete() method:', () => {
'LESS_THAN',
endAt('root')
),
limit(MAX_PENDING_OPS)
limit(maxPendingOps)
);
called.push(1);
return stream(...firstStream);
Expand All @@ -279,34 +305,52 @@ describe('recursiveDelete() method:', () => {
referenceValue:
`projects/${PROJECT_ID}/databases/(default)/` +
'documents/collectionId/doc' +
(MAX_PENDING_OPS - 1),
(maxPendingOps - 1),
}),
limit(MAX_PENDING_OPS)
limit(maxPendingOps)
);
called.push(2);
secondQueryDeferred.resolve();
return stream();
} else {
called.push(3);
return stream();
}
},
batchWrite: () => {
const responses = mergeResponses(
Array.from(Array(500).keys()).map(() => successResponse(1))
);
return response({
writeResults: responses.writeResults,
status: responses.status,
const returnedResponse = response({
writeResults: batchWriteResponse.writeResults,
status: batchWriteResponse.status,
});
if (numDeletesBuffered < cutoff) {
numDeletesBuffered += batchWriteResponse.writeResults!.length;

          // By waiting for `bufferDeferred` to resolve, we can guarantee that
          // the writes complete after all documents are streamed. Without
          // this deferred promise, the test can race and complete the writes
          // before the stream is finished, which is a different scenario this
          // test is not for.
return bufferDeferred.promise.then(() => returnedResponse);
} else {
          // Once there are `cutoff` pending deletes, resolving `bufferDeferred`
          // allows enough responses to be returned such that the number of
          // pending deletes should be less than `minPendingOps`. This allows
          // us to test that the second query is made.
bufferDeferred.resolve();
return secondQueryDeferred.promise.then(() => returnedResponse);
}
},
};
const firestore = await createInstance(overrides);

// Use a custom batch size with BulkWriter to simplify the dummy
// batchWrite() response logic.
const bulkWriter = firestore.bulkWriter();
bulkWriter._maxBatchSize = 500;
await firestore.recursiveDelete(firestore.collection('root'), bulkWriter);
bulkWriter._maxBatchSize = maxBatchSize;
await firestore._recursiveDelete(
firestore.collection('root'),
maxPendingOps,
minPendingOps,
bulkWriter
);
expect(called).to.deep.equal([1, 2]);
});
});
Expand Down