diff --git a/dev/src/index.ts b/dev/src/index.ts index 2d5f92fe8..131014cd9 100644 --- a/dev/src/index.ts +++ b/dev/src/index.ts @@ -1173,6 +1173,8 @@ export class Firestore { * * @private * @param backendStream The Node stream to monitor. + * @param lifetime A Promise that resolves when the stream receives an 'end', + * 'close' or 'finish' message. * @param requestTag A unique client-assigned identifier for this request. * @param request If specified, the request that should be written to the * stream after opening. @@ -1181,6 +1183,7 @@ export class Firestore { */ private _initializeStream( backendStream: Duplex, + lifetime: Deferred, requestTag: string, request?: {} ): Promise { @@ -1202,19 +1205,22 @@ export class Firestore { } } + backendStream.on('data', () => streamReady()); + function streamEnded() { logger( 'Firestore._initializeStream', requestTag, 'Received stream end' ); - streamReady(); resultStream.unpipe(backendStream); + resolve(resultStream); + lifetime.resolve(); } - backendStream.on('data', () => streamReady()); backendStream.on('end', () => streamEnded()); backendStream.on('close', () => streamEnded()); + backendStream.on('finish', () => streamEnded()); backendStream.on('error', err => { if (!streamInitialized) { @@ -1240,7 +1246,7 @@ export class Firestore { // allows the caller to attach an error handler. setImmediate(() => { resultStream.emit('error', err); - }, 0); + }); } }); @@ -1326,58 +1332,54 @@ export class Firestore { ): Promise { const callOptions = this.createCallOptions(); - const bidrectional = methodName === 'listen'; - const result = new Deferred(); + const bidirectional = methodName === 'listen'; - this._clientPool.run(requestTag, gapicClient => { - // While we return the stream to the callee early, we don't want to - // release the GAPIC client until the callee has finished processing the - // stream. - const lifetime = new Deferred(); + return this._retry(methodName, requestTag, () => { + const result = new Deferred(); - this._retry(methodName, requestTag, async () => { + this._clientPool.run(requestTag, async gapicClient => { logger( 'Firestore.requestStream', requestTag, 'Sending request: %j', request ); - const stream = bidrectional - ? gapicClient[methodName](callOptions) - : gapicClient[methodName](request, callOptions); - const logStream = through2.obj(function(this, chunk, enc, callback) { - logger( - 'Firestore.requestStream', + try { + const stream = bidirectional + ? gapicClient[methodName](callOptions) + : gapicClient[methodName](request, callOptions); + const logStream = through2.obj(function(this, chunk, enc, callback) { + logger( + 'Firestore.requestStream', + requestTag, + 'Received response: %j', + chunk + ); + callback(); + }); + stream.pipe(logStream); + + const lifetime = new Deferred(); + const resultStream = await this._initializeStream( + stream, + lifetime, requestTag, - 'Received response: %j', - chunk + bidirectional ? request : undefined ); - callback(); - }); - - stream.pipe(logStream); - stream.on('close', lifetime.resolve); - stream.on('end', lifetime.resolve); - stream.on('finish', lifetime.resolve); - stream.on('error', lifetime.resolve); - - const resultStream = await this._initializeStream( - stream, - requestTag, - bidrectional ? request : undefined - ); - - resultStream.on('end', () => stream.end()); - result.resolve(resultStream); - }).catch(err => { - lifetime.resolve(); - result.reject(err); + resultStream.on('end', () => stream.end()); + result.resolve(resultStream); + + // While we return the stream to the callee early, we don't want to + // release the GAPIC client until the callee has finished processing the + // stream. + return lifetime.promise; + } catch (e) { + result.reject(e); + } }); - return lifetime.promise; + return result.promise; }); - - return result.promise; } } diff --git a/dev/test/index.ts b/dev/test/index.ts index 73e1bb2af..2e7343f17 100644 --- a/dev/test/index.ts +++ b/dev/test/index.ts @@ -960,8 +960,11 @@ describe('getAll() method', () => { }); it('handles stream exception during initialization', () => { + let attempts = 0; + const overrides: ApiOverride = { batchGetDocuments: () => { + ++attempts; return stream(new Error('Expected exception')); }, }; @@ -973,14 +976,18 @@ describe('getAll() method', () => { throw new Error('Unexpected success in Promise'); }) .catch(err => { + expect(attempts).to.equal(5); expect(err.message).to.equal('Expected exception'); }); }); }); it('handles stream exception after initialization', () => { + let attempts = 0; + const overrides: ApiOverride = { batchGetDocuments: () => { + ++attempts; return stream(found('documentId'), new Error('Expected exception')); }, }; @@ -992,13 +999,16 @@ describe('getAll() method', () => { throw new Error('Unexpected success in Promise'); }) .catch(err => { + // We don't retry since the stream might have already been released + // to the end user. + expect(attempts).to.equal(1); expect(err.message).to.equal('Expected exception'); }); }); }); it('handles intermittent stream exception', () => { - let attempts = 1; + let attempts = 0; const overrides: ApiOverride = { batchGetDocuments: () => { diff --git a/dev/test/pool.ts b/dev/test/pool.ts index dfa658e6f..623af579d 100644 --- a/dev/test/pool.ts +++ b/dev/test/pool.ts @@ -319,12 +319,12 @@ describe('Client pool', () => { return clientPool .terminate() .then(() => { - clientPool.run(REQUEST_TAG, () => + return clientPool.run(REQUEST_TAG, () => Promise.reject('Call to run() should have failed') ); }) .catch((err: Error) => { - expect(err.message).to.equal('The client has already been terminated'); + expect(err).to.equal('The client has already been terminated'); }); }); });