From 37e93da689f985b6b0f30645435b12179513eb64 Mon Sep 17 00:00:00 2001
From: Sebastian Schmidt
Date: Fri, 27 Dec 2019 10:16:50 +0800
Subject: [PATCH] feat: allow specifying how many idle GRPC channels to keep (#837)

---
 dev/src/index.ts         |  21 +++++++
 dev/src/pool.ts          |  91 ++++++++++++++++---------------
 dev/src/types.ts         |   8 +++
 dev/test/index.ts        |  13 +++++
 dev/test/pool.ts         | 115 +++++++++++++++++++++++++++++++++++----
 dev/test/typescript.ts   |   1 +
 dev/test/util/helpers.ts |  32 ++++++-----
 dev/test/watch.ts        |   2 +-
 types/firestore.d.ts     |   8 +++
 9 files changed, 221 insertions(+), 70 deletions(-)

diff --git a/dev/src/index.ts b/dev/src/index.ts
index f5c064137..2f0d7e430 100644
--- a/dev/src/index.ts
+++ b/dev/src/index.ts
@@ -130,6 +130,11 @@ const CLOUD_RESOURCE_HEADER = 'google-cloud-resource-prefix';
  */
 const MAX_REQUEST_RETRIES = 5;
 
+/*!
+ * The default number of idle GRPC channels to keep.
+ */
+const DEFAULT_MAX_IDLE_CHANNELS = 1;
+
 /*!
  * The maximum number of concurrent requests supported by a single GRPC channel,
  * as enforced by Google's Frontend. If the SDK issues more than 100 concurrent
@@ -317,6 +322,11 @@
    * can specify a `keyFilename` instead.
    * @param {string=} settings.host The host to connect to.
    * @param {boolean=} settings.ssl Whether to use SSL when connecting.
+   * @param {number=} settings.maxIdleChannels The maximum number of idle GRPC
+   * channels to keep. A smaller number of idle channels reduces memory usage
+   * but increases request latency for clients with fluctuating request rates.
+   * If set to 0, shuts down all GRPC channels when the client becomes idle.
+   * Defaults to 1.
    */
   constructor(settings?: Settings) {
     const libraryHeader = {
@@ -372,8 +382,13 @@
       logger('Firestore', null, 'Detected GCF environment');
     }
 
+    const maxIdleChannels =
+      this._settings.maxIdleChannels === undefined
+        ? DEFAULT_MAX_IDLE_CHANNELS
+        : this._settings.maxIdleChannels;
     this._clientPool = new ClientPool(
       MAX_CONCURRENT_REQUESTS_PER_CLIENT,
+      maxIdleChannels,
       /* clientFactory= */ () => {
         let client: GapicClient;
 
@@ -455,6 +470,12 @@
       validateBoolean('settings.ssl', settings.ssl);
     }
 
+    if (settings.maxIdleChannels !== undefined) {
+      validateInteger('settings.maxIdleChannels', settings.maxIdleChannels, {
+        minValue: 0,
+      });
+    }
+
     this._settings = settings;
     this._serializer = new Serializer(this);
   }
diff --git a/dev/src/pool.ts b/dev/src/pool.ts
index 4010817fa..0c82a435e 100644
--- a/dev/src/pool.ts
+++ b/dev/src/pool.ts
@@ -44,6 +44,8 @@ export class ClientPool<T> {
   /**
    * @param concurrentOperationLimit The number of operations that each client
    * can handle.
+   * @param maxIdleClients The maximum number of idle clients to keep before
+   * garbage collecting.
    * @param clientFactory A factory function called as needed when new clients
    * are required.
   * @param clientDestructor A cleanup function that is called when a client is
@@ -51,6 +53,7 @@
    */
   constructor(
     private readonly concurrentOperationLimit: number,
+    private readonly maxIdleClients: number,
     private readonly clientFactory: () => T,
     private readonly clientDestructor: (client: T) => Promise<void> = () =>
       Promise.resolve()
   ) {}
@@ -64,10 +67,16 @@
    */
   private acquire(requestTag: string): T {
     let selectedClient: T | null = null;
-    let selectedRequestCount = 0;
+    let selectedClientRequestCount = 0;
 
-    this.activeClients.forEach((requestCount, client) => {
-      if (!selectedClient && requestCount < this.concurrentOperationLimit) {
+    for (const [client, requestCount] of this.activeClients) {
+      // Use the "most-full" client that can still accommodate the request
+      // in order to maximize the number of idle clients as operations start to
+      // complete.
+      if (
+        requestCount > selectedClientRequestCount &&
+        requestCount < this.concurrentOperationLimit
+      ) {
         logger(
           'ClientPool.acquire',
           requestTag,
@@ -75,11 +84,18 @@
           this.concurrentOperationLimit - requestCount
         );
         selectedClient = client;
-        selectedRequestCount = requestCount;
+        selectedClientRequestCount = requestCount;
       }
-    });
+    }
 
-    if (!selectedClient) {
+    if (selectedClient) {
+      logger(
+        'ClientPool.acquire',
+        requestTag,
+        'Re-using existing client with %s remaining operations',
+        this.concurrentOperationLimit - selectedClientRequestCount
+      );
+    } else {
       logger('ClientPool.acquire', requestTag, 'Creating a new client');
       selectedClient = this.clientFactory();
       assert(
@@ -88,7 +104,7 @@
       );
     }
 
-    this.activeClients.set(selectedClient, selectedRequestCount + 1);
+    this.activeClients.set(selectedClient, selectedClientRequestCount + 1);
 
     return selectedClient!;
   }
@@ -99,23 +115,34 @@
    * @private
    */
   private async release(requestTag: string, client: T): Promise<void> {
-    let requestCount = this.activeClients.get(client) || 0;
+    const requestCount = this.activeClients.get(client) || 0;
     assert(requestCount > 0, 'No active request');
+    this.activeClients.set(client, requestCount - 1);
 
-    requestCount = requestCount! - 1;
-    this.activeClients.set(client, requestCount);
+    if (this.shouldGarbageCollectClient(client)) {
+      this.activeClients.delete(client);
+      await this.clientDestructor(client);
+      logger('ClientPool.release', requestTag, 'Garbage collected 1 client');
+    }
+  }
 
-    if (requestCount === 0) {
-      const deletedCount = await this.garbageCollect();
-      if (deletedCount) {
-        logger(
-          'ClientPool.release',
-          requestTag,
-          'Garbage collected %s clients',
-          deletedCount
-        );
-      }
+  /**
+   * Given the current operation counts, determines if the given client should
+   * be garbage collected.
+   * @private
+   */
+  private shouldGarbageCollectClient(client: T): boolean {
+    if (this.activeClients.get(client) !== 0) {
+      return false;
+    }
+
+    let idleCapacityCount = 0;
+    for (const [_, count] of this.activeClients) {
+      idleCapacityCount += this.concurrentOperationLimit - count;
     }
+    return (
+      idleCapacityCount > this.maxIdleClients * this.concurrentOperationLimit
+    );
   }
 
   /**
@@ -177,28 +204,4 @@
       await this.clientDestructor(client);
     }
   }
-
-  /**
-   * Deletes clients that are no longer executing operations. Keeps up to one
-   * idle client to reduce future initialization costs.
-   *
-   * @return Number of clients deleted.
-   * @private
-   */
-  private async garbageCollect(): Promise<number> {
-    let idleClients = 0;
-    const cleanUpTasks: Array<Promise<void>> = [];
-    for (const [client, requestCount] of this.activeClients) {
-      if (requestCount === 0) {
-        ++idleClients;
-
-        if (idleClients > 1) {
-          this.activeClients.delete(client);
-          cleanUpTasks.push(this.clientDestructor(client));
-        }
-      }
-    }
-    await Promise.all(cleanUpTasks);
-    return idleClients - 1;
-  }
 }
diff --git a/dev/src/types.ts b/dev/src/types.ts
index 7085284c4..a70517a2d 100644
--- a/dev/src/types.ts
+++ b/dev/src/types.ts
@@ -72,6 +72,14 @@ export interface Settings {
   /** Whether to use SSL when connecting. */
   ssl?: boolean;
 
+  /**
+   * The maximum number of idle GRPC channels to keep. A smaller number of idle
+   * channels reduces memory usage but increases request latency for clients
+   * with fluctuating request rates. If set to 0, shuts down all GRPC channels
+   * when the client becomes idle. Defaults to 1.
+   */
+  maxIdleChannels?: number;
+
   // tslint:disable-next-line:no-any
   [key: string]: any; // Accept other properties, such as GRPC settings.
 }
diff --git a/dev/test/index.ts b/dev/test/index.ts
index e43332129..bffc04e7a 100644
--- a/dev/test/index.ts
+++ b/dev/test/index.ts
@@ -491,6 +491,19 @@ describe('instantiation', () => {
     }
   });
 
+  it('validates maxIdleChannels', () => {
+    const invalidValues = [-1, 'foo', 1.3];
+
+    for (const value of invalidValues) {
+      expect(() => {
+        const settings = {...DEFAULT_SETTINGS, maxIdleChannels: value};
+        new Firestore.Firestore(settings as InvalidApiUsage);
+      }).to.throw();
+    }
+
+    new Firestore.Firestore({maxIdleChannels: 1});
+  });
+
   it('uses project id from constructor', () => {
     const firestore = new Firestore.Firestore({projectId: 'foo'});
 
diff --git a/dev/test/pool.ts b/dev/test/pool.ts
index cd50ff73b..0ff986a4f 100644
--- a/dev/test/pool.ts
+++ b/dev/test/pool.ts
@@ -32,7 +32,7 @@ function deferredPromises(count: number): Array<Deferred<void>> {
 
 describe('Client pool', () => {
   it('creates new instances as needed', () => {
-    const clientPool = new ClientPool<{}>(3, () => {
+    const clientPool = new ClientPool<{}>(3, 0, () => {
       return {};
     });
 
@@ -52,7 +52,7 @@
   });
 
   it('re-uses idle instances', () => {
-    const clientPool = new ClientPool<{}>(2, () => {
+    const clientPool = new ClientPool<{}>(2, 0, () => {
       return {};
     });
 
@@ -80,8 +80,51 @@
     });
   });
 
+  it('bin packs operations', async () => {
+    let clientCount = 0;
+    const clientPool = new ClientPool(2, 0, () => {
+      return ++clientCount;
+    });
+
+    expect(clientPool.size).to.equal(0);
+
+    // Create 5 operations, which should schedule 2 operations on the first
+    // client, 2 on the second and 1 on the third.
+    const operationPromises = deferredPromises(7);
+    clientPool.run(REQUEST_TAG, client => {
+      expect(client).to.be.equal(1);
+      return operationPromises[0].promise;
+    });
+    clientPool.run(REQUEST_TAG, client => {
+      expect(client).to.be.equal(1);
+      return operationPromises[1].promise;
+    });
+    const thirdOperation = clientPool.run(REQUEST_TAG, client => {
+      expect(client).to.be.equal(2);
+      return operationPromises[2].promise;
+    });
+    clientPool.run(REQUEST_TAG, client => {
+      expect(client).to.be.equal(2);
+      return operationPromises[3].promise;
+    });
+    clientPool.run(REQUEST_TAG, client => {
+      expect(client).to.be.equal(3);
+      return operationPromises[4].promise;
+    });
+
+    // Free one slot on the second client.
+    operationPromises[2].resolve();
+    await thirdOperation;
+
+    // A newly scheduled operation should use the first client that has a free
+    // slot.
+    clientPool.run(REQUEST_TAG, async client => {
+      expect(client).to.be.equal(2);
+    });
+  });
+
   it('garbage collects after success', () => {
-    const clientPool = new ClientPool<{}>(2, () => {
+    const clientPool = new ClientPool<{}>(2, 0, () => {
       return {};
     });
 
@@ -110,12 +153,12 @@
     operationPromises.forEach(deferred => deferred.resolve());
 
     return Promise.all(completionPromises).then(() => {
-      expect(clientPool.size).to.equal(1);
+      expect(clientPool.size).to.equal(0);
     });
   });
 
   it('garbage collects after error', () => {
-    const clientPool = new ClientPool<{}>(2, () => {
+    const clientPool = new ClientPool<{}>(2, 0, () => {
       return {};
     });
 
@@ -145,7 +188,7 @@
 
     return Promise.all(completionPromises.map(p => p.catch(() => {}))).then(
       () => {
-        expect(clientPool.size).to.equal(1);
+        expect(clientPool.size).to.equal(0);
       }
     );
   });
@@ -155,9 +198,8 @@
 
     const clientPool = new ClientPool<{}>(
       1,
-      () => {
-        return {};
-      },
+      0,
+      () => ({}),
       () => Promise.resolve(garbageCollect.resolve())
     );
 
@@ -173,7 +215,7 @@
   });
 
   it('forwards success', () => {
-    const clientPool = new ClientPool<{}>(1, () => {
+    const clientPool = new ClientPool<{}>(1, 0, () => {
       return {};
     });
 
@@ -182,7 +224,7 @@
   });
 
   it('forwards failure', () => {
-    const clientPool = new ClientPool<{}>(1, () => {
+    const clientPool = new ClientPool<{}>(1, 0, () => {
      return {};
     });
 
@@ -192,8 +234,57 @@
     return expect(op).to.eventually.be.rejectedWith('Generated error');
   });
 
+  it('keeps pool of idle clients', async () => {
+    const clientPool = new ClientPool<{}>(
+      /* concurrentOperationLimit= */ 1,
+      /* maxIdleClients= */ 3,
+      () => {
+        return {};
+      }
+    );
+
+    const operationPromises = deferredPromises(4);
+    clientPool.run(REQUEST_TAG, () => operationPromises[0].promise);
+    clientPool.run(REQUEST_TAG, () => operationPromises[1].promise);
+    clientPool.run(REQUEST_TAG, () => operationPromises[2].promise);
+    const lastOp = clientPool.run(
+      REQUEST_TAG,
+      () => operationPromises[3].promise
+    );
+    expect(clientPool.size).to.equal(4);
+
+    // Resolve all pending operations. Note that one client is removed, while
+    // 3 are kept for further usage.
+    operationPromises.forEach(deferred => deferred.resolve());
+    await lastOp;
+    expect(clientPool.size).to.equal(3);
+  });
+
+  it('default setting keeps at least one idle client', async () => {
+    const clientPool = new ClientPool<{}>(
+      1,
+      /* maxIdleClients= */ 1,
+      () => {
+        return {};
+      }
+    );
+
+    const operationPromises = deferredPromises(2);
+    clientPool.run(REQUEST_TAG, () => operationPromises[0].promise);
+    const completionPromise = clientPool.run(
+      REQUEST_TAG,
+      () => operationPromises[1].promise
+    );
+    expect(clientPool.size).to.equal(2);
+
+    operationPromises[0].resolve();
+    operationPromises[1].resolve();
+    await completionPromise;
+    expect(clientPool.size).to.equal(1);
+  });
+
   it('rejects subsequent operations after being terminated', () => {
-    const clientPool = new ClientPool<{}>(1, () => {
+    const clientPool = new ClientPool<{}>(1, 0, () => {
       return {};
     });
diff --git a/dev/test/typescript.ts b/dev/test/typescript.ts
index 7cdd3452b..80993ddf2 100644
--- a/dev/test/typescript.ts
+++ b/dev/test/typescript.ts
@@ -58,6 +58,7 @@ xdescribe('firestore.d.ts', () => {
     firestore.settings({
       keyFilename: 'foo',
       projectId: 'foo',
+      maxIdleChannels: 42,
       otherOption: 'foo',
     });
     const collRef: CollectionReference = firestore.collection('coll');
diff --git a/dev/test/util/helpers.ts b/dev/test/util/helpers.ts
index d47a8ff67..9877f6faa 100644
--- a/dev/test/util/helpers.ts
+++ b/dev/test/util/helpers.ts
@@ -105,20 +105,26 @@ export function createInstance(
   const firestore = new Firestore();
   firestore.settings(initializationOptions);
 
-  const clientPool = new ClientPool(/* concurrentRequestLimit= */ 1, () => {
-    const gapicClient: GapicClient = new v1(initializationOptions);
-    if (apiOverrides) {
-      Object.keys(apiOverrides).forEach(override => {
-        const apiOverride = (apiOverrides as {[k: string]: unknown})[override];
-        if (override !== 'getProjectId') {
-          gapicClient._innerApiCalls[override] = apiOverride;
-        } else {
-          gapicClient[override] = apiOverride;
-        }
-      });
+  const clientPool = new ClientPool(
+    /* concurrentRequestLimit= */ 1,
+    /* maxIdleClients= */ 0,
+    () => {
+      const gapicClient: GapicClient = new v1(initializationOptions);
+      if (apiOverrides) {
+        Object.keys(apiOverrides).forEach(override => {
+          const apiOverride = (apiOverrides as {[k: string]: unknown})[
+            override
+          ];
+          if (override !== 'getProjectId') {
+            gapicClient._innerApiCalls[override] = apiOverride;
+          } else {
+            gapicClient[override] = apiOverride;
+          }
+        });
+      }
+      return gapicClient;
     }
-    return gapicClient;
-  });
+  );
 
   firestore['_clientPool'] = clientPool;
 
diff --git a/dev/test/watch.ts b/dev/test/watch.ts
index 99b9d37e1..bc2c0792e 100644
--- a/dev/test/watch.ts
+++ b/dev/test/watch.ts
@@ -926,7 +926,7 @@ describe('Query watch', () => {
     }
 
     return result;
-  });
+  }).timeout(5000);
 
   it('retries with unknown code', () => {
     return watchHelper.runTest(collQueryJSON(), () => {
diff --git a/types/firestore.d.ts b/types/firestore.d.ts
index 3a7122922..c6f5f1533 100644
--- a/types/firestore.d.ts
+++ b/types/firestore.d.ts
@@ -80,6 +80,14 @@ declare namespace FirebaseFirestore {
     /** Whether to use SSL when connecting. */
     ssl?: boolean;
 
+    /**
+     * The maximum number of idle GRPC channels to keep. A smaller number of idle
+     * channels reduces memory usage but increases request latency for clients
+     * with fluctuating request rates. If set to 0, shuts down all GRPC channels
+     * when the client becomes idle. Defaults to 1.
+     */
+    maxIdleChannels?: number;
+
     [key: string]: any; // Accept other properties, such as GRPC settings.
   }
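
A minimal usage sketch of the new setting from application code, based only on
the `Settings.maxIdleChannels` field added by this patch; the project ID and
collection name below are placeholders, not part of the change.

import {Firestore} from '@google-cloud/firestore';

// Keep no idle GRPC channels: every channel is shut down as soon as the client
// becomes idle, trading lower memory usage for higher latency on the next
// burst of requests. Omitting the option keeps the default of one idle channel.
const firestore = new Firestore({
  projectId: 'my-project', // placeholder
  maxIdleChannels: 0,
});

async function main(): Promise<void> {
  // Bursts of more than 100 concurrent requests spin up additional channels;
  // once they settle, the pool garbage collects back down to `maxIdleChannels`.
  const snapshot = await firestore.collection('users').limit(1).get();
  console.log(`Read ${snapshot.size} document(s)`);
}

main().catch(console.error);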