Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow specifying how many idle GRPC channels to keep #837

Merged
merged 7 commits into from Dec 27, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 20 additions & 0 deletions dev/src/index.ts
Expand Up @@ -130,6 +130,11 @@ const CLOUD_RESOURCE_HEADER = 'google-cloud-resource-prefix';
*/
const MAX_REQUEST_RETRIES = 5;

/*!
* The default number of idle GRPC channel to keep.
*/
const DEFAULT_MAX_IDLE_CHANNELS = 1;

/*!
* The maximum number of concurrent requests supported by a single GRPC channel,
* as enforced by Google's Frontend. If the SDK issues more than 100 concurrent
Expand Down Expand Up @@ -317,6 +322,10 @@ export class Firestore {
* can specify a `keyFilename` instead.
* @param {string=} settings.host The host to connect to.
* @param {boolean=} settings.ssl Whether to use SSL when connecting.
* @param {number=} settings.maxIdleChannels The maximum number of idle GRPC
* channels to keep. A smaller number of idle channels reduces memory usage
* but increases request latency for clients with fluctuating request rates.
* Defaults to 1.
*/
constructor(settings?: Settings) {
const libraryHeader = {
Expand Down Expand Up @@ -372,8 +381,13 @@ export class Firestore {
logger('Firestore', null, 'Detected GCF environment');
}

const maxIdleChannels =
this._settings.maxIdleChannels === undefined
? DEFAULT_MAX_IDLE_CHANNELS
: this._settings.maxIdleChannels;
this._clientPool = new ClientPool(
MAX_CONCURRENT_REQUESTS_PER_CLIENT,
maxIdleChannels,
/* clientFactory= */ () => {
let client: GapicClient;

Expand Down Expand Up @@ -455,6 +469,12 @@ export class Firestore {
validateBoolean('settings.ssl', settings.ssl);
}

if (settings.maxIdleChannels !== undefined) {
validateInteger('settings.maxIdleChannels', settings.maxIdleChannels, {
minValue: 0,
});
}

this._settings = settings;
this._serializer = new Serializer(this);
}
Expand Down
84 changes: 44 additions & 40 deletions dev/src/pool.ts
Expand Up @@ -44,13 +44,16 @@ export class ClientPool<T> {
/**
* @param concurrentOperationLimit The number of operations that each client
* can handle.
* @param maxIdleClients The maximum number of idle clients to keep before
* garbage collecting.
* @param clientFactory A factory function called as needed when new clients
* are required.
* @param clientDestructor A cleanup function that is called when a client is
* disposed of.
*/
constructor(
private readonly concurrentOperationLimit: number,
private readonly maxIdleClients: number,
private readonly clientFactory: () => T,
private readonly clientDestructor: (client: T) => Promise<void> = () =>
Promise.resolve()
Expand All @@ -66,8 +69,13 @@ export class ClientPool<T> {
let selectedClient: T | null = null;
let selectedRequestCount = 0;

this.activeClients.forEach((requestCount, client) => {
if (!selectedClient && requestCount < this.concurrentOperationLimit) {
for (const [client, requestCount] of this.activeClients) {
// Bin pack requests to reduce the maximize the number of idle clients as
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"reduce the maximize the ..."? Also, I don't do bin packing enough for this comment to 100% make sense right away. And I had to think more than 3 seconds to figure out what "selectedRequestCount" was.

Perhaps rename selectedRequestCount to selectedClientRequestCount (long/verbose, I know, sorry!) and the comment to something like:

Use the "most-full" client that can still accommodate the required requestCount in order to maximize the number of idle clients as operations start to complete.

Don't feel very strongly though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your comment is not only grammatically correct, but also more helpful. Thanks!

// operations start to complete
if (
requestCount > selectedRequestCount &&
requestCount < this.concurrentOperationLimit
) {
logger(
'ClientPool.acquire',
requestTag,
Expand All @@ -77,7 +85,7 @@ export class ClientPool<T> {
selectedClient = client;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log line above is going to get spewed multiple times now. I think it should be extracted from the for-loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I moved it outside the loop.

selectedRequestCount = requestCount;
}
});
}

if (!selectedClient) {
logger('ClientPool.acquire', requestTag, 'Creating a new client');
Expand All @@ -99,23 +107,43 @@ export class ClientPool<T> {
* @private
*/
private async release(requestTag: string, client: T): Promise<void> {
let requestCount = this.activeClients.get(client) || 0;
const requestCount = this.activeClients.get(client) || 0;
assert(requestCount > 0, 'No active request');
this.activeClients.set(client, requestCount! - 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems odd you need the ! but I assume you have your reasons...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uhm. I hope I had my reasons when I added it first, but they are not immediately apparent now. Removed.


requestCount = requestCount! - 1;
this.activeClients.set(client, requestCount);
if (this.shouldGarbageCollectClient(client)) {
this.activeClients.delete(client);
await this.clientDestructor(client);
logger('ClientPool.release', requestTag, 'Garbage collected 1 client');
}
}

if (requestCount === 0) {
const deletedCount = await this.garbageCollect();
if (deletedCount) {
logger(
'ClientPool.release',
requestTag,
'Garbage collected %s clients',
deletedCount
);
}
/**
* Given the current operation counts, determines if the given client should
* be garbage collected.
* @private
*/
private shouldGarbageCollectClient(client: T): boolean {
if (this.activeClients.get(client) !== 0) {
return false;
}

// Compute the remaining capacity of the ClientPool. If the capacity exceeds
// the total capacity that `maxIdleClients` could hold, garbage collect. We
// look at the capacity rather than just at the current request count to
// allows us to:
// - Use `maxIdleClients:1` to preserve legacy behavior (there is always at
// least one active client as a single client can never exceed the
// concurrent operation limit by itself).
// - Use `maxIdleClients:0` to shut down the client pool completely when all
// clients are idle.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW I read this comment 2 or 3 times, and read the code once, and the code still makes more sense to me than the comment. So it might be worth rephrasing / paring down the comment. I'm not 100% sure what it's trying to get across. Is it justifying the existence of the maxIdleClients setting (in which maybe the bulk of this comment should go there?)? Or is there actually some nuance to this code that needs 10 lines of explanation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to justify that the code doesn't just count the number of idle clients, but instead counts the number of idle "request slots". If this comment doesn't help, then I am ok with removing it.

let idleCapacityCount = 0;
for (const [_, count] of this.activeClients) {
idleCapacityCount += this.concurrentOperationLimit - count;
}
return (
idleCapacityCount > this.maxIdleClients * this.concurrentOperationLimit
);
}

/**
Expand Down Expand Up @@ -177,28 +205,4 @@ export class ClientPool<T> {
await this.clientDestructor(client);
}
}

/**
* Deletes clients that are no longer executing operations. Keeps up to one
* idle client to reduce future initialization costs.
*
* @return Number of clients deleted.
* @private
*/
private async garbageCollect(): Promise<number> {
let idleClients = 0;
const cleanUpTasks: Array<Promise<void>> = [];
for (const [client, requestCount] of this.activeClients) {
if (requestCount === 0) {
++idleClients;

if (idleClients > 1) {
this.activeClients.delete(client);
cleanUpTasks.push(this.clientDestructor(client));
}
}
}
await Promise.all(cleanUpTasks);
return idleClients - 1;
}
}
7 changes: 7 additions & 0 deletions dev/src/types.ts
Expand Up @@ -72,6 +72,13 @@ export interface Settings {
/** Whether to use SSL when connecting. */
ssl?: boolean;

/**
* The maximum number of idle GRPC channels to keep. A smaller number of idle
* channels reduces memory usage but increases request latency for clients
* with fluctuating request rates. Defaults to 1.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Elsewhere you document that setting it to 0 allows the client pool to shut down completely. Is that a behavior that should be documented here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good suggestion. Added (here and in the other two places...)

*/
maxIdleChannels?: number;

// tslint:disable-next-line:no-any
[key: string]: any; // Accept other properties, such as GRPC settings.
}
Expand Down
13 changes: 13 additions & 0 deletions dev/test/index.ts
Expand Up @@ -491,6 +491,19 @@ describe('instantiation', () => {
}
});

it('validates maxIdleChannels', () => {
const invalidValues = [-1, 'foo', 1.3];

for (const value of invalidValues) {
expect(() => {
const settings = {...DEFAULT_SETTINGS, maxIdleChannels: value};
new Firestore.Firestore(settings as InvalidApiUsage);
}).to.throw();
}

new Firestore.Firestore({maxIdleChannels: 1});
});

it('uses project id from constructor', () => {
const firestore = new Firestore.Firestore({projectId: 'foo'});

Expand Down
111 changes: 99 additions & 12 deletions dev/test/pool.ts
Expand Up @@ -32,7 +32,7 @@ function deferredPromises(count: number): Array<Deferred<void>> {

describe('Client pool', () => {
it('creates new instances as needed', () => {
const clientPool = new ClientPool<{}>(3, () => {
const clientPool = new ClientPool<{}>(3, 0, () => {
return {};
});

Expand All @@ -52,7 +52,7 @@ describe('Client pool', () => {
});

it('re-uses idle instances', () => {
const clientPool = new ClientPool<{}>(2, () => {
const clientPool = new ClientPool<{}>(2, 0, () => {
return {};
});

Expand Down Expand Up @@ -80,8 +80,51 @@ describe('Client pool', () => {
});
});

it('bin packs operations', async () => {
let clientCount = 0;
const clientPool = new ClientPool<number>(2, 0, () => {
return ++clientCount;
});

expect(clientPool.size).to.equal(0);

// Create 5 operations, which should schedule 2 operations on the first
// client, 2 on the second and 1 on the third.
const operationPromises = deferredPromises(7);
clientPool.run(REQUEST_TAG, client => {
expect(client).to.be.equal(1);
return operationPromises[0].promise;
});
clientPool.run(REQUEST_TAG, client => {
expect(client).to.be.equal(1);
return operationPromises[1].promise;
});
const thirdOperation = clientPool.run(REQUEST_TAG, client => {
expect(client).to.be.equal(2);
return operationPromises[2].promise;
});
clientPool.run(REQUEST_TAG, client => {
expect(client).to.be.equal(2);
return operationPromises[3].promise;
});
clientPool.run(REQUEST_TAG, client => {
expect(client).to.be.equal(3);
return operationPromises[4].promise;
});

// Free one slow on the second client.
operationPromises[2].resolve();
await thirdOperation;

// A newly scheduled operation should use the first client that has a free
// slot.
clientPool.run(REQUEST_TAG, async client => {
expect(client).to.be.equal(2);
});
});

it('garbage collects after success', () => {
const clientPool = new ClientPool<{}>(2, () => {
const clientPool = new ClientPool<{}>(2, 0, () => {
return {};
});

Expand Down Expand Up @@ -110,12 +153,12 @@ describe('Client pool', () => {
operationPromises.forEach(deferred => deferred.resolve());

return Promise.all(completionPromises).then(() => {
expect(clientPool.size).to.equal(1);
expect(clientPool.size).to.equal(0);
});
});

it('garbage collects after error', () => {
const clientPool = new ClientPool<{}>(2, () => {
const clientPool = new ClientPool<{}>(2, 0, () => {
return {};
});

Expand Down Expand Up @@ -145,7 +188,7 @@ describe('Client pool', () => {

return Promise.all(completionPromises.map(p => p.catch(() => {}))).then(
() => {
expect(clientPool.size).to.equal(1);
expect(clientPool.size).to.equal(0);
}
);
});
Expand All @@ -155,9 +198,8 @@ describe('Client pool', () => {

const clientPool = new ClientPool<{}>(
1,
() => {
return {};
},
0,
() => ({}),
() => Promise.resolve(garbageCollect.resolve())
);

Expand All @@ -173,7 +215,7 @@ describe('Client pool', () => {
});

it('forwards success', () => {
const clientPool = new ClientPool<{}>(1, () => {
const clientPool = new ClientPool<{}>(1, 0, () => {
return {};
});

Expand All @@ -182,7 +224,7 @@ describe('Client pool', () => {
});

it('forwards failure', () => {
const clientPool = new ClientPool<{}>(1, () => {
const clientPool = new ClientPool<{}>(1, 0, () => {
return {};
});

Expand All @@ -192,8 +234,53 @@ describe('Client pool', () => {
return expect(op).to.eventually.be.rejectedWith('Generated error');
});

it('keeps pool of idle clients', async () => {
const clientPool = new ClientPool<{}>(
/* concurrentOperationLimit= */ 1,
/* maxIdleClients= */ 3,
() => {
return {};
}
);

const operationPromises = deferredPromises(4);
clientPool.run(REQUEST_TAG, () => operationPromises[0].promise);
clientPool.run(REQUEST_TAG, () => operationPromises[1].promise);
clientPool.run(REQUEST_TAG, () => operationPromises[2].promise);
const lastOp = clientPool.run(
REQUEST_TAG,
() => operationPromises[3].promise
);
expect(clientPool.size).to.equal(4);

// Resolve all pending operations. Note that one client is removed, while
// 3 are kept for further usage.
operationPromises.forEach(deferred => deferred.resolve());
await lastOp;
expect(clientPool.size).to.equal(3);
});

it('default setting keeps at least one idle client', async () => {
const clientPool = new ClientPool<{}>(1, /* maxIdleClients=*/ 1, () => {
return {};
});

const operationPromises = deferredPromises(2);
clientPool.run(REQUEST_TAG, () => operationPromises[0].promise);
const completionPromise = clientPool.run(
REQUEST_TAG,
() => operationPromises[1].promise
);
expect(clientPool.size).to.equal(2);

operationPromises[0].resolve();
operationPromises[1].resolve();
await completionPromise;
expect(clientPool.size).to.equal(1);
});

it('rejects subsequent operations after being terminated', () => {
const clientPool = new ClientPool<{}>(1, () => {
const clientPool = new ClientPool<{}>(1, 0, () => {
return {};
});

Expand Down