Skip to content

Commit

Permalink
feat: allow specifying how many idle GRPC channels to keep (#837)
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Dec 27, 2019
1 parent 5c870e6 commit 37e93da
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 70 deletions.
21 changes: 21 additions & 0 deletions dev/src/index.ts
Expand Up @@ -130,6 +130,11 @@ const CLOUD_RESOURCE_HEADER = 'google-cloud-resource-prefix';
*/
const MAX_REQUEST_RETRIES = 5;

/*!
* The default number of idle GRPC channel to keep.
*/
const DEFAULT_MAX_IDLE_CHANNELS = 1;

/*!
* The maximum number of concurrent requests supported by a single GRPC channel,
* as enforced by Google's Frontend. If the SDK issues more than 100 concurrent
Expand Down Expand Up @@ -317,6 +322,11 @@ export class Firestore {
* can specify a `keyFilename` instead.
* @param {string=} settings.host The host to connect to.
* @param {boolean=} settings.ssl Whether to use SSL when connecting.
* @param {number=} settings.maxIdleChannels The maximum number of idle GRPC
* channels to keep. A smaller number of idle channels reduces memory usage
* but increases request latency for clients with fluctuating request rates.
* If set to 0, shuts down all GRPC channels when the client becomes idle.
* Defaults to 1.
*/
constructor(settings?: Settings) {
const libraryHeader = {
Expand Down Expand Up @@ -372,8 +382,13 @@ export class Firestore {
logger('Firestore', null, 'Detected GCF environment');
}

const maxIdleChannels =
this._settings.maxIdleChannels === undefined
? DEFAULT_MAX_IDLE_CHANNELS
: this._settings.maxIdleChannels;
this._clientPool = new ClientPool(
MAX_CONCURRENT_REQUESTS_PER_CLIENT,
maxIdleChannels,
/* clientFactory= */ () => {
let client: GapicClient;

Expand Down Expand Up @@ -455,6 +470,12 @@ export class Firestore {
validateBoolean('settings.ssl', settings.ssl);
}

if (settings.maxIdleChannels !== undefined) {
validateInteger('settings.maxIdleChannels', settings.maxIdleChannels, {
minValue: 0,
});
}

this._settings = settings;
this._serializer = new Serializer(this);
}
Expand Down
91 changes: 47 additions & 44 deletions dev/src/pool.ts
Expand Up @@ -44,13 +44,16 @@ export class ClientPool<T> {
/**
* @param concurrentOperationLimit The number of operations that each client
* can handle.
* @param maxIdleClients The maximum number of idle clients to keep before
* garbage collecting.
* @param clientFactory A factory function called as needed when new clients
* are required.
* @param clientDestructor A cleanup function that is called when a client is
* disposed of.
*/
constructor(
private readonly concurrentOperationLimit: number,
private readonly maxIdleClients: number,
private readonly clientFactory: () => T,
private readonly clientDestructor: (client: T) => Promise<void> = () =>
Promise.resolve()
Expand All @@ -64,22 +67,35 @@ export class ClientPool<T> {
*/
private acquire(requestTag: string): T {
let selectedClient: T | null = null;
let selectedRequestCount = 0;
let selectedClientRequestCount = 0;

this.activeClients.forEach((requestCount, client) => {
if (!selectedClient && requestCount < this.concurrentOperationLimit) {
for (const [client, requestCount] of this.activeClients) {
// Use the "most-full" client that can still accommodate the request
// in order to maximize the number of idle clients as operations start to
// complete.
if (
requestCount > selectedClientRequestCount &&
requestCount < this.concurrentOperationLimit
) {
logger(
'ClientPool.acquire',
requestTag,
'Re-using existing client with %s remaining operations',
this.concurrentOperationLimit - requestCount
);
selectedClient = client;
selectedRequestCount = requestCount;
selectedClientRequestCount = requestCount;
}
});
}

if (!selectedClient) {
if (selectedClient) {
logger(
'ClientPool.acquire',
requestTag,
'Re-using existing client with %s remaining operations',
this.concurrentOperationLimit - selectedClientRequestCount
);
} else {
logger('ClientPool.acquire', requestTag, 'Creating a new client');
selectedClient = this.clientFactory();
assert(
Expand All @@ -88,7 +104,7 @@ export class ClientPool<T> {
);
}

this.activeClients.set(selectedClient, selectedRequestCount + 1);
this.activeClients.set(selectedClient, selectedClientRequestCount + 1);

return selectedClient!;
}
Expand All @@ -99,23 +115,34 @@ export class ClientPool<T> {
* @private
*/
private async release(requestTag: string, client: T): Promise<void> {
let requestCount = this.activeClients.get(client) || 0;
const requestCount = this.activeClients.get(client) || 0;
assert(requestCount > 0, 'No active request');
this.activeClients.set(client, requestCount - 1);

requestCount = requestCount! - 1;
this.activeClients.set(client, requestCount);
if (this.shouldGarbageCollectClient(client)) {
this.activeClients.delete(client);
await this.clientDestructor(client);
logger('ClientPool.release', requestTag, 'Garbage collected 1 client');
}
}

if (requestCount === 0) {
const deletedCount = await this.garbageCollect();
if (deletedCount) {
logger(
'ClientPool.release',
requestTag,
'Garbage collected %s clients',
deletedCount
);
}
/**
* Given the current operation counts, determines if the given client should
* be garbage collected.
* @private
*/
private shouldGarbageCollectClient(client: T): boolean {
if (this.activeClients.get(client) !== 0) {
return false;
}

let idleCapacityCount = 0;
for (const [_, count] of this.activeClients) {
idleCapacityCount += this.concurrentOperationLimit - count;
}
return (
idleCapacityCount > this.maxIdleClients * this.concurrentOperationLimit
);
}

/**
Expand Down Expand Up @@ -177,28 +204,4 @@ export class ClientPool<T> {
await this.clientDestructor(client);
}
}

/**
* Deletes clients that are no longer executing operations. Keeps up to one
* idle client to reduce future initialization costs.
*
* @return Number of clients deleted.
* @private
*/
private async garbageCollect(): Promise<number> {
let idleClients = 0;
const cleanUpTasks: Array<Promise<void>> = [];
for (const [client, requestCount] of this.activeClients) {
if (requestCount === 0) {
++idleClients;

if (idleClients > 1) {
this.activeClients.delete(client);
cleanUpTasks.push(this.clientDestructor(client));
}
}
}
await Promise.all(cleanUpTasks);
return idleClients - 1;
}
}
8 changes: 8 additions & 0 deletions dev/src/types.ts
Expand Up @@ -72,6 +72,14 @@ export interface Settings {
/** Whether to use SSL when connecting. */
ssl?: boolean;

/**
* The maximum number of idle GRPC channels to keep. A smaller number of idle
* channels reduces memory usage but increases request latency for clients
* with fluctuating request rates. If set to 0, shuts down all GRPC channels
* when the client becomes idle. Defaults to 1.
*/
maxIdleChannels?: number;

// tslint:disable-next-line:no-any
[key: string]: any; // Accept other properties, such as GRPC settings.
}
Expand Down
13 changes: 13 additions & 0 deletions dev/test/index.ts
Expand Up @@ -491,6 +491,19 @@ describe('instantiation', () => {
}
});

it('validates maxIdleChannels', () => {
const invalidValues = [-1, 'foo', 1.3];

for (const value of invalidValues) {
expect(() => {
const settings = {...DEFAULT_SETTINGS, maxIdleChannels: value};
new Firestore.Firestore(settings as InvalidApiUsage);
}).to.throw();
}

new Firestore.Firestore({maxIdleChannels: 1});
});

it('uses project id from constructor', () => {
const firestore = new Firestore.Firestore({projectId: 'foo'});

Expand Down

0 comments on commit 37e93da

Please sign in to comment.