From 479bc9c4847cc2a5632a266da706b349a1b74a41 Mon Sep 17 00:00:00 2001 From: Gil Date: Fri, 10 May 2019 10:10:08 -0700 Subject: [PATCH] fix: Fix client pooling for long-lived listens (#614) Fixes firebase/firebase-admin-node#499 Previously _initializeStream returned a Promise that indicated that the stream was "released", i.e. that it was was ready for attaching listeners. #256 Added pooled clients and changed the callers of _initializeStream to reuse this promise such that when it was resolved, the stream could be returned to the pool. This works when listeners are short-lived, but fails when listeners run indefinitely. This change arranges to release the clients back to the pool only after the stream has completed, which allows an arbitrary number of indefinite listens to run without problems. This turns out to be fiendishly difficult to test given the current structure of the code. A second pass at this that reformulates this as just another stream that composes with the others would make this easier to understand and test. For now, this fix unblocks the customers waiting on the referenced issue. --- dev/src/index.ts | 111 +++++++++++++++++++++++++---------------------- dev/src/pool.ts | 24 +++++++++- 2 files changed, 81 insertions(+), 54 deletions(-) diff --git a/dev/src/index.ts b/dev/src/index.ts index 1790aaa13..3c66269fe 100644 --- a/dev/src/index.ts +++ b/dev/src/index.ts @@ -1011,12 +1011,13 @@ export class Firestore { * @returns The given Stream once it is considered healthy. */ private _initializeStream( - resultStream: NodeJS.ReadableStream, + releaser: () => void, resultStream: NodeJS.ReadableStream, requestTag: string): Promise; private _initializeStream( - resultStream: NodeJS.ReadWriteStream, requestTag: string, - request: {}): Promise; + releaser: () => void, resultStream: NodeJS.ReadWriteStream, + requestTag: string, request: {}): Promise; private _initializeStream( + releaser: () => void, resultStream: NodeJS.ReadableStream|NodeJS.ReadWriteStream, requestTag: string, request?: {}): Promise { @@ -1027,7 +1028,7 @@ export class Firestore { * Whether we have resolved the Promise and returned the stream to the * caller. */ - let streamReleased = false; + let streamInitialized = false; /** * Whether the stream end has been reached. This has to be forwarded to the @@ -1036,16 +1037,17 @@ export class Firestore { let endCalled = false; return new Promise((resolve, reject) => { - const releaseStream = () => { + const streamReady = () => { if (errorReceived) { logger( 'Firestore._initializeStream', requestTag, 'Emit error:', errorReceived); resultStream.emit('error', errorReceived); + releaser(); errorReceived = null; - } else if (!streamReleased) { + } else if (!streamInitialized) { logger('Firestore._initializeStream', requestTag, 'Releasing stream'); - streamReleased = true; + streamInitialized = true; resultStream.pause(); // Calling 'stream.pause()' only holds up 'data' events and not the @@ -1063,6 +1065,7 @@ export class Firestore { 'Firestore._initializeStream', requestTag, 'Forwarding stream close'); resultStream.emit('end'); + releaser(); } }, 0); } @@ -1073,14 +1076,14 @@ export class Firestore { // possible to avoid the default stream behavior (which is just to log and // continue). resultStream.on('readable', () => { - releaseStream(); + streamReady(); }); resultStream.on('end', () => { logger( 'Firestore._initializeStream', requestTag, 'Received stream end'); endCalled = true; - releaseStream(); + streamReady(); }); resultStream.on('error', err => { @@ -1089,12 +1092,13 @@ export class Firestore { 'Received stream error:', err); // If we receive an error before we were able to receive any data, // reject this stream. - if (!streamReleased) { + if (!streamInitialized) { logger( 'Firestore._initializeStream', requestTag, 'Received initial error:', err); - streamReleased = true; + streamInitialized = true; reject(err); + releaser(); } else { errorReceived = err; } @@ -1112,7 +1116,7 @@ export class Firestore { logger( 'Firestore._initializeStream', requestTag, 'Marking stream as healthy'); - releaseStream(); + streamReady(); }); } }); @@ -1182,32 +1186,33 @@ export class Firestore { const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1; const callOptions = this.createCallOptions(); - return this._clientPool.run(gapicClient => { - return this._retry(attempts, requestTag, () => { - return new Promise((resolve, reject) => { - try { - logger( - 'Firestore.readStream', requestTag, - 'Sending request: %j', request); - const stream = gapicClient[methodName](request, callOptions); - const logStream = - through2.obj(function(this, chunk, enc, callback) { - logger( - 'Firestore.readStream', requestTag, - 'Received response: %j', chunk); - this.push(chunk); - callback(); - }); - resolve(bun([stream, logStream])); - } catch (err) { - logger( - 'Firestore.readStream', requestTag, - 'Received error:', err); - reject(err); - } - }) - .then(stream => this._initializeStream(stream, requestTag)); - }); + const gapicClient = this._clientPool.acquire(); + const releaser = this._clientPool.createReleaser(gapicClient); + + return this._retry(attempts, requestTag, () => { + return new Promise((resolve, reject) => { + try { + logger( + 'Firestore.readStream', requestTag, 'Sending request: %j', + request); + const stream = gapicClient[methodName](request, callOptions); + const logStream = + through2.obj(function(this, chunk, enc, callback) { + logger( + 'Firestore.readStream', requestTag, + 'Received response: %j', chunk); + this.push(chunk); + callback(); + }); + resolve(bun([stream, logStream])); + } catch (err) { + logger( + 'Firestore.readStream', requestTag, + 'Received error:', err); + reject(err); + } + }) + .then(stream => this._initializeStream(releaser, stream, requestTag)); }); } @@ -1233,23 +1238,25 @@ export class Firestore { const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1; const callOptions = this.createCallOptions(); - return this._clientPool.run(gapicClient => { - return this._retry(attempts, requestTag, () => { - return Promise.resolve().then(() => { - logger('Firestore.readWriteStream', requestTag, 'Opening stream'); - const requestStream = gapicClient[methodName](callOptions); + const gapicClient = this._clientPool.acquire(); + const releaser = this._clientPool.createReleaser(gapicClient); - const logStream = through2.obj(function(this, chunk, enc, callback) { - logger( - 'Firestore.readWriteStream', requestTag, - 'Received response: %j', chunk); - this.push(chunk); - callback(); - }); + return this._retry(attempts, requestTag, () => { + return Promise.resolve().then(() => { + logger('Firestore.readWriteStream', requestTag, 'Opening stream'); + const requestStream = gapicClient[methodName](callOptions); - const resultStream = bun([requestStream, logStream]); - return this._initializeStream(resultStream, requestTag, request); + const logStream = through2.obj(function(this, chunk, enc, callback) { + logger( + 'Firestore.readWriteStream', requestTag, 'Received response: %j', + chunk); + this.push(chunk); + callback(); }); + + const resultStream = bun([requestStream, logStream]); + return this._initializeStream( + releaser, resultStream, requestTag, request); }); }); } diff --git a/dev/src/pool.ts b/dev/src/pool.ts index 4d789f2f4..e531069fe 100644 --- a/dev/src/pool.ts +++ b/dev/src/pool.ts @@ -49,7 +49,7 @@ export class ClientPool { * * @private */ - private acquire(): T { + acquire(): T { let selectedClient: T|null = null; let selectedRequestCount = 0; @@ -77,7 +77,7 @@ export class ClientPool { * removing it from the pool of active clients. * @private */ - private release(client: T): void { + release(client: T): void { let requestCount = this.activeClients.get(client) || 0; assert(requestCount > 0, 'No active request'); @@ -89,6 +89,26 @@ export class ClientPool { } } + /** + * Creates a new function that will release the given client, when called. + * + * This guarantees that the given client can only be released once. + * + * @private + */ + createReleaser(client: T): () => void { + // Unfortunately, once the release() call is disconnected from the Promise + // returned from _initializeStream, there's no single callback in which the + // releaser can be guaranteed to be called once. + let released = false; + return () => { + if (!released) { + released = true; + this.release(client); + } + }; + } + /** * The number of currently registered clients. *