Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: do not release client before retry (#870)
  • Loading branch information
schmidt-sebastian committed Jan 12, 2020
1 parent 69bd69a commit 47f7ab5
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 45 deletions.
86 changes: 44 additions & 42 deletions dev/src/index.ts
Expand Up @@ -1173,6 +1173,8 @@ export class Firestore {
*
* @private
* @param backendStream The Node stream to monitor.
* @param lifetime A Promise that resolves when the stream receives an 'end',
* 'close' or 'finish' message.
* @param requestTag A unique client-assigned identifier for this request.
* @param request If specified, the request that should be written to the
* stream after opening.
Expand All @@ -1181,6 +1183,7 @@ export class Firestore {
*/
private _initializeStream(
backendStream: Duplex,
lifetime: Deferred<void>,
requestTag: string,
request?: {}
): Promise<Duplex> {
Expand All @@ -1202,19 +1205,22 @@ export class Firestore {
}
}

backendStream.on('data', () => streamReady());

function streamEnded() {
logger(
'Firestore._initializeStream',
requestTag,
'Received stream end'
);
streamReady();
resultStream.unpipe(backendStream);
resolve(resultStream);
lifetime.resolve();
}

backendStream.on('data', () => streamReady());
backendStream.on('end', () => streamEnded());
backendStream.on('close', () => streamEnded());
backendStream.on('finish', () => streamEnded());

backendStream.on('error', err => {
if (!streamInitialized) {
Expand All @@ -1240,7 +1246,7 @@ export class Firestore {
// allows the caller to attach an error handler.
setImmediate(() => {
resultStream.emit('error', err);
}, 0);
});
}
});

Expand Down Expand Up @@ -1326,58 +1332,54 @@ export class Firestore {
): Promise<Duplex> {
const callOptions = this.createCallOptions();

const bidrectional = methodName === 'listen';
const result = new Deferred<Duplex>();
const bidirectional = methodName === 'listen';

this._clientPool.run(requestTag, gapicClient => {
// While we return the stream to the callee early, we don't want to
// release the GAPIC client until the callee has finished processing the
// stream.
const lifetime = new Deferred<void>();
return this._retry(methodName, requestTag, () => {
const result = new Deferred<Duplex>();

this._retry(methodName, requestTag, async () => {
this._clientPool.run(requestTag, async gapicClient => {
logger(
'Firestore.requestStream',
requestTag,
'Sending request: %j',
request
);
const stream = bidrectional
? gapicClient[methodName](callOptions)
: gapicClient[methodName](request, callOptions);
const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.requestStream',
try {
const stream = bidirectional
? gapicClient[methodName](callOptions)
: gapicClient[methodName](request, callOptions);
const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.requestStream',
requestTag,
'Received response: %j',
chunk
);
callback();
});
stream.pipe(logStream);

const lifetime = new Deferred<void>();
const resultStream = await this._initializeStream(
stream,
lifetime,
requestTag,
'Received response: %j',
chunk
bidirectional ? request : undefined
);
callback();
});

stream.pipe(logStream);
stream.on('close', lifetime.resolve);
stream.on('end', lifetime.resolve);
stream.on('finish', lifetime.resolve);
stream.on('error', lifetime.resolve);

const resultStream = await this._initializeStream(
stream,
requestTag,
bidrectional ? request : undefined
);

resultStream.on('end', () => stream.end());
result.resolve(resultStream);
}).catch(err => {
lifetime.resolve();
result.reject(err);
resultStream.on('end', () => stream.end());
result.resolve(resultStream);

// While we return the stream to the callee early, we don't want to
// release the GAPIC client until the callee has finished processing the
// stream.
return lifetime.promise;
} catch (e) {
result.reject(e);
}
});

return lifetime.promise;
return result.promise;
});

return result.promise;
}
}

Expand Down
12 changes: 11 additions & 1 deletion dev/test/index.ts
Expand Up @@ -960,8 +960,11 @@ describe('getAll() method', () => {
});

it('handles stream exception during initialization', () => {
let attempts = 0;

const overrides: ApiOverride = {
batchGetDocuments: () => {
++attempts;
return stream(new Error('Expected exception'));
},
};
Expand All @@ -973,14 +976,18 @@ describe('getAll() method', () => {
throw new Error('Unexpected success in Promise');
})
.catch(err => {
expect(attempts).to.equal(5);
expect(err.message).to.equal('Expected exception');
});
});
});

it('handles stream exception after initialization', () => {
let attempts = 0;

const overrides: ApiOverride = {
batchGetDocuments: () => {
++attempts;
return stream(found('documentId'), new Error('Expected exception'));
},
};
Expand All @@ -992,13 +999,16 @@ describe('getAll() method', () => {
throw new Error('Unexpected success in Promise');
})
.catch(err => {
// We don't retry since the stream might have already been released
// to the end user.
expect(attempts).to.equal(1);
expect(err.message).to.equal('Expected exception');
});
});
});

it('handles intermittent stream exception', () => {
let attempts = 1;
let attempts = 0;

const overrides: ApiOverride = {
batchGetDocuments: () => {
Expand Down
4 changes: 2 additions & 2 deletions dev/test/pool.ts
Expand Up @@ -319,12 +319,12 @@ describe('Client pool', () => {
return clientPool
.terminate()
.then(() => {
clientPool.run(REQUEST_TAG, () =>
return clientPool.run(REQUEST_TAG, () =>
Promise.reject('Call to run() should have failed')
);
})
.catch((err: Error) => {
expect(err.message).to.equal('The client has already been terminated');
expect(err).to.equal('The client has already been terminated');
});
});
});

0 comments on commit 47f7ab5

Please sign in to comment.