diff --git a/dev/src/index.ts b/dev/src/index.ts index 1790aaa13..3c66269fe 100644 --- a/dev/src/index.ts +++ b/dev/src/index.ts @@ -1011,12 +1011,13 @@ export class Firestore { * @returns The given Stream once it is considered healthy. */ private _initializeStream( - resultStream: NodeJS.ReadableStream, + releaser: () => void, resultStream: NodeJS.ReadableStream, requestTag: string): Promise; private _initializeStream( - resultStream: NodeJS.ReadWriteStream, requestTag: string, - request: {}): Promise; + releaser: () => void, resultStream: NodeJS.ReadWriteStream, + requestTag: string, request: {}): Promise; private _initializeStream( + releaser: () => void, resultStream: NodeJS.ReadableStream|NodeJS.ReadWriteStream, requestTag: string, request?: {}): Promise { @@ -1027,7 +1028,7 @@ export class Firestore { * Whether we have resolved the Promise and returned the stream to the * caller. */ - let streamReleased = false; + let streamInitialized = false; /** * Whether the stream end has been reached. This has to be forwarded to the @@ -1036,16 +1037,17 @@ export class Firestore { let endCalled = false; return new Promise((resolve, reject) => { - const releaseStream = () => { + const streamReady = () => { if (errorReceived) { logger( 'Firestore._initializeStream', requestTag, 'Emit error:', errorReceived); resultStream.emit('error', errorReceived); + releaser(); errorReceived = null; - } else if (!streamReleased) { + } else if (!streamInitialized) { logger('Firestore._initializeStream', requestTag, 'Releasing stream'); - streamReleased = true; + streamInitialized = true; resultStream.pause(); // Calling 'stream.pause()' only holds up 'data' events and not the @@ -1063,6 +1065,7 @@ export class Firestore { 'Firestore._initializeStream', requestTag, 'Forwarding stream close'); resultStream.emit('end'); + releaser(); } }, 0); } @@ -1073,14 +1076,14 @@ export class Firestore { // possible to avoid the default stream behavior (which is just to log and // continue). resultStream.on('readable', () => { - releaseStream(); + streamReady(); }); resultStream.on('end', () => { logger( 'Firestore._initializeStream', requestTag, 'Received stream end'); endCalled = true; - releaseStream(); + streamReady(); }); resultStream.on('error', err => { @@ -1089,12 +1092,13 @@ export class Firestore { 'Received stream error:', err); // If we receive an error before we were able to receive any data, // reject this stream. - if (!streamReleased) { + if (!streamInitialized) { logger( 'Firestore._initializeStream', requestTag, 'Received initial error:', err); - streamReleased = true; + streamInitialized = true; reject(err); + releaser(); } else { errorReceived = err; } @@ -1112,7 +1116,7 @@ export class Firestore { logger( 'Firestore._initializeStream', requestTag, 'Marking stream as healthy'); - releaseStream(); + streamReady(); }); } }); @@ -1182,32 +1186,33 @@ export class Firestore { const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1; const callOptions = this.createCallOptions(); - return this._clientPool.run(gapicClient => { - return this._retry(attempts, requestTag, () => { - return new Promise((resolve, reject) => { - try { - logger( - 'Firestore.readStream', requestTag, - 'Sending request: %j', request); - const stream = gapicClient[methodName](request, callOptions); - const logStream = - through2.obj(function(this, chunk, enc, callback) { - logger( - 'Firestore.readStream', requestTag, - 'Received response: %j', chunk); - this.push(chunk); - callback(); - }); - resolve(bun([stream, logStream])); - } catch (err) { - logger( - 'Firestore.readStream', requestTag, - 'Received error:', err); - reject(err); - } - }) - .then(stream => this._initializeStream(stream, requestTag)); - }); + const gapicClient = this._clientPool.acquire(); + const releaser = this._clientPool.createReleaser(gapicClient); + + return this._retry(attempts, requestTag, () => { + return new Promise((resolve, reject) => { + try { + logger( + 'Firestore.readStream', requestTag, 'Sending request: %j', + request); + const stream = gapicClient[methodName](request, callOptions); + const logStream = + through2.obj(function(this, chunk, enc, callback) { + logger( + 'Firestore.readStream', requestTag, + 'Received response: %j', chunk); + this.push(chunk); + callback(); + }); + resolve(bun([stream, logStream])); + } catch (err) { + logger( + 'Firestore.readStream', requestTag, + 'Received error:', err); + reject(err); + } + }) + .then(stream => this._initializeStream(releaser, stream, requestTag)); }); } @@ -1233,23 +1238,25 @@ export class Firestore { const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1; const callOptions = this.createCallOptions(); - return this._clientPool.run(gapicClient => { - return this._retry(attempts, requestTag, () => { - return Promise.resolve().then(() => { - logger('Firestore.readWriteStream', requestTag, 'Opening stream'); - const requestStream = gapicClient[methodName](callOptions); + const gapicClient = this._clientPool.acquire(); + const releaser = this._clientPool.createReleaser(gapicClient); - const logStream = through2.obj(function(this, chunk, enc, callback) { - logger( - 'Firestore.readWriteStream', requestTag, - 'Received response: %j', chunk); - this.push(chunk); - callback(); - }); + return this._retry(attempts, requestTag, () => { + return Promise.resolve().then(() => { + logger('Firestore.readWriteStream', requestTag, 'Opening stream'); + const requestStream = gapicClient[methodName](callOptions); - const resultStream = bun([requestStream, logStream]); - return this._initializeStream(resultStream, requestTag, request); + const logStream = through2.obj(function(this, chunk, enc, callback) { + logger( + 'Firestore.readWriteStream', requestTag, 'Received response: %j', + chunk); + this.push(chunk); + callback(); }); + + const resultStream = bun([requestStream, logStream]); + return this._initializeStream( + releaser, resultStream, requestTag, request); }); }); } diff --git a/dev/src/pool.ts b/dev/src/pool.ts index 4d789f2f4..e531069fe 100644 --- a/dev/src/pool.ts +++ b/dev/src/pool.ts @@ -49,7 +49,7 @@ export class ClientPool { * * @private */ - private acquire(): T { + acquire(): T { let selectedClient: T|null = null; let selectedRequestCount = 0; @@ -77,7 +77,7 @@ export class ClientPool { * removing it from the pool of active clients. * @private */ - private release(client: T): void { + release(client: T): void { let requestCount = this.activeClients.get(client) || 0; assert(requestCount > 0, 'No active request'); @@ -89,6 +89,26 @@ export class ClientPool { } } + /** + * Creates a new function that will release the given client, when called. + * + * This guarantees that the given client can only be released once. + * + * @private + */ + createReleaser(client: T): () => void { + // Unfortunately, once the release() call is disconnected from the Promise + // returned from _initializeStream, there's no single callback in which the + // releaser can be guaranteed to be called once. + let released = false; + return () => { + if (!released) { + released = true; + this.release(client); + } + }; + } + /** * The number of currently registered clients. *