Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allowing 101 listeners #256

Merged
merged 9 commits into from Jul 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
131 changes: 89 additions & 42 deletions src/index.js
Expand Up @@ -16,6 +16,7 @@

'use strict';

const assert = require('assert');
const bun = require('bun');
const extend = require('extend');
const is = require('is');
Expand Down Expand Up @@ -64,6 +65,11 @@ const FieldValue = require('./field-value').FieldValue;
*/
const Timestamp = require('./timestamp');

/*!
* @see ClientPool
*/
const ClientPool = require('./pool').ClientPool;

/*!
* @see CollectionReference
*/
Expand Down Expand Up @@ -100,12 +106,12 @@ let Transaction;
/*!
* @see v1beta1
*/
let v1beta1; // Lazy-loaded in `_ensureClient()`
let v1beta1; // Lazy-loaded in `_runRequest()`

/*!
* @see @google-cloud/common
*/
let common; // Lazy-loaded in `_ensureClient()`
let common; // Lazy-loaded in `_runRequest()`

/*!
* HTTP header for the resource prefix to improve routing and project isolation
Expand All @@ -120,6 +126,16 @@ const CLOUD_RESOURCE_HEADER = 'google-cloud-resource-prefix';
*/
const MAX_REQUEST_RETRIES = 5;

/*!
* The maximum number of concurrent requests supported by a single GRPC channel,
* as enforced by Google's Frontend. If the SDK issues more than 100 concurrent
* operations, we need to use more than one GAPIC client since these clients
* multiplex all requests over a single channel.
*
* @type {number}
*/
const MAX_CONCURRENT_REQUESTS_PER_CLIENT = 100;

/*!
* GRPC Error code for 'UNAVAILABLE'.
* @type {number}
Expand Down Expand Up @@ -233,11 +249,12 @@ class Firestore {
});

/**
* A client pool to distribute requests over multiple GAPIC clients in order
* to work around a connection limit of 100 concurrent requests per client.
* @private
* @type {object|null}
* @property {FirestoreClient} Firestore The Firestore GAPIC client.
* @type {ClientPool|null}
*/
this._firestoreClient = null;
this._clientPool = null;

/**
* The configuration options for the GAPIC client.
Expand Down Expand Up @@ -725,47 +742,78 @@ follow these steps, YOUR APP MAY BREAK.`);
}

/**
* Initializes the client and detects the Firestore Project ID. Returns a
* Promise on completion. If the client is already initialized, the returned
* Promise resolves immediately.
* Executes a new request using the first available GAPIC client.
*
* @private
*/
_ensureClient() {
_runRequest(op) {
// Initialize the client pool if this is the first request.
if (!this._clientInitialized) {
common = require('@google-cloud/common');
this._clientInitialized = this._initClientPool().then(clientPool => {
this._clientPool = clientPool;
})
}

this._clientInitialized = new Promise((resolve, reject) => {
this._firestoreClient =
module.exports.v1beta1(this._initalizationOptions);
return this._clientInitialized.then(() => this._clientPool.run(op));
}

Firestore.log('Firestore', null, 'Initialized Firestore GAPIC Client');
/**
* Initializes the client pool and invokes Project ID detection. Returns a
* Promise on completion.
*
* @private
* @return {Promise<ClientPool>}
*/
_initClientPool() {
assert(!this._clientInitialized, 'Client pool already initialized');

// We schedule Project ID detection using `setImmediate` to allow the
// testing framework to provide its own implementation of
// `getProjectId`.
setImmediate(() => {
this._firestoreClient.getProjectId((err, projectId) => {
if (err) {
Firestore.log(
'Firestore._ensureClient', null,
'Failed to detect project ID: %s', err);
reject(err);
} else {
Firestore.log(
'Firestore._ensureClient', null, 'Detected project ID: %s',
projectId);
this._referencePath =
new ResourcePath(projectId, this._referencePath.databaseId);
resolve();
}
});
const clientPool =
new ClientPool(MAX_CONCURRENT_REQUESTS_PER_CLIENT, () => {
const gapicClient =
module.exports.v1beta1(this._initalizationOptions);
Firestore.log(
'Firestore', null, 'Initialized Firestore GAPIC Client');
return gapicClient;
});
});

const projectIdProvided = this._referencePath.projectId !== '{{projectId}}';

if (projectIdProvided) {
return Promise.resolve(clientPool);
} else {
return clientPool.run(client => this._detectProjectId(client))
.then(projectId => {
this._referencePath =
new ResourcePath(projectId, this._referencePath.databaseId);
return clientPool;
});
}
return this._clientInitialized;
}

/**
* Auto-detects the Firestore Project ID.
*
* @private
* @param {object} gapicClient - The Firestore GAPIC client.
* @return {Promise<string>} A Promise that resolves with the Project ID.
*/
_detectProjectId(gapicClient) {
return new Promise(
(resolve, reject) => {gapicClient.getProjectId((err, projectId) => {
if (err) {
Firestore.log(
'Firestore._detectProjectId', null,
'Failed to detect project ID: %s', err);
reject(err);
} else {
Firestore.log(
'Firestore._detectProjectId', null, 'Detected project ID: %s',
projectId);
resolve(projectId);
}
})});
}
/**
* Decorate the request options of an API request. This is used to replace
* any `{{projectId}}` placeholders with the value detected from the user's
Expand Down Expand Up @@ -801,7 +849,7 @@ follow these steps, YOUR APP MAY BREAK.`);
*
* @private
* @param {number} attemptsRemaining - The number of available attempts.
* @param {string} requestTag A unique client-assigned identifier for this
* @param {string} requestTag - A unique client-assigned identifier for this
* request.
* @param {retryFunction} func - Method returning a Promise than can be
* retried.
Expand Down Expand Up @@ -972,14 +1020,14 @@ follow these steps, YOUR APP MAY BREAK.`);
request(methodName, request, requestTag, allowRetries) {
let attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;

return this._ensureClient().then(() => {
return this._runRequest(gapicClient => {
const decorated = this._decorateRequest(request);
return this._retry(attempts, requestTag, () => {
return new Promise((resolve, reject) => {
Firestore.log(
'Firestore.request', requestTag, 'Sending request: %j',
decorated.request);
this._firestoreClient[methodName](
gapicClient[methodName](
decorated.request, decorated.gax, (err, result) => {
if (err) {
Firestore.log(
Expand Down Expand Up @@ -1017,15 +1065,15 @@ follow these steps, YOUR APP MAY BREAK.`);
readStream(methodName, request, requestTag, allowRetries) {
let attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;

return this._ensureClient().then(() => {
return this._runRequest(gapicClient => {
const decorated = this._decorateRequest(request);
return this._retry(attempts, requestTag, () => {
return new Promise((resolve, reject) => {
try {
Firestore.log(
'Firestore.readStream', requestTag,
'Sending request: %j', decorated.request);
let stream = this._firestoreClient[methodName](
let stream = gapicClient[methodName](
decorated.request, decorated.gax);
let logger = through.obj(function(chunk, enc, callback) {
Firestore.log(
Expand Down Expand Up @@ -1069,16 +1117,15 @@ follow these steps, YOUR APP MAY BREAK.`);
let self = this;
let attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;

return this._ensureClient().then(() => {
return this._runRequest(gapicClient => {
const decorated = this._decorateRequest(request);
return this._retry(attempts, requestTag, () => {
return Promise.resolve().then(() => {
Firestore.log(
'Firestore.readWriteStream', requestTag, 'Opening stream');
// The generated bi-directional streaming API takes the list of GAX
// headers as its second argument.
let requestStream =
this._firestoreClient[methodName]({}, decorated.gax);
let requestStream = gapicClient[methodName]({}, decorated.gax);

// The transform stream to assign the project ID.
let transform = through.obj(function(chunk, encoding, callback) {
Expand Down
135 changes: 135 additions & 0 deletions src/pool.ts
@@ -0,0 +1,135 @@
/*!
* Copyright 2018 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

import assert from 'assert';

/**
* An auto-resizing pool that distributes concurrent operations over multiple
* clients of type `T`.
*
* ClientPool is used within Firestore to manage a pool of GAPIC clients and
* automatically initializes multiple clients if we issue more than 100
* concurrent operations.
*/
export class ClientPool<T> {
/** Stores each active clients and how many operations it has outstanding. */
private activeClients: Map<T, number> = new Map();

/**
* @param {number} concurrentOperationLimit - The number of operations that
* each client can handle.
* @param {() => T} clientFactory - A factory function called as needed when
* new clients are required.
*/
constructor(
private readonly concurrentOperationLimit: number,
private readonly clientFactory: () => T) {}

/**
* Returns an already existing client if it has less than the maximum number
* of concurrent operations or initializes and returns a new client.
*/
private acquire(): T {
let selectedClient: T|null = null;
let selectedRequestCount = 0;

this.activeClients.forEach((requestCount, client) => {
if (!selectedClient && requestCount < this.concurrentOperationLimit) {
selectedClient = client;
selectedRequestCount = requestCount;
}
});

if (!selectedClient) {
selectedClient = this.clientFactory();
assert(
!this.activeClients.has(selectedClient),
'The provided client factory returned an existing instance');
}

this.activeClients.set(selectedClient, selectedRequestCount + 1);

return selectedClient!;
}

/**
* Reduces the number of operations for the provided client, potentially
* removing it from the pool of active clients.
*/
private release(client: T): void {
let requestCount = this.activeClients.get(client) || 0;
assert(requestCount > 0, 'No active request');

requestCount = requestCount! - 1;
this.activeClients.set(client, requestCount);

if (requestCount === 0) {
this.garbageCollect();
}
}

/**
* The number of currently registered clients.
*
* @return {number} Number of currently registered clients.
*/
// Visible for testing.
get size(): number {
return this.activeClients.size;
}

/**
* Runs the provided operation in this pool. This function may create an
* additional client if all existing clients already operate at the concurrent
* operation limit.
*
* @param {(client: T) => Promise<V>} op - A callback function that returns a
* Promise. The client T will be returned to the pool when callback finishes.
* @return {Promise<V>} A Promise that resolves with the result of `op`.
*/
run<V>(op: (client: T) => Promise<V>): Promise<V> {
const client = this.acquire();

return op(client)

This comment was marked as spam.

This comment was marked as spam.

.catch(err => {
this.release(client);
return Promise.reject(err);
})
.then(res => {
this.release(client);
return res;
});
}

/**
* Deletes clients that are no longer executing operations. Keeps up to one
* idle client to reduce future initialization costs.

This comment was marked as spam.

This comment was marked as spam.

*/
private garbageCollect(): void {
let idleClients = 0;
this.activeClients.forEach((requestCount, client) => {
if (requestCount === 0) {
++idleClients;

if (idleClients > 1) {
this.activeClients.delete(client);
}
}
});
}
}