Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix client pooling for long-lived listens #614

Merged
merged 3 commits into from May 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
111 changes: 59 additions & 52 deletions dev/src/index.ts
Expand Up @@ -1011,12 +1011,13 @@ export class Firestore {
* @returns The given Stream once it is considered healthy.
*/
private _initializeStream(
resultStream: NodeJS.ReadableStream,
releaser: () => void, resultStream: NodeJS.ReadableStream,
requestTag: string): Promise<NodeJS.ReadableStream>;
private _initializeStream(
resultStream: NodeJS.ReadWriteStream, requestTag: string,
request: {}): Promise<NodeJS.ReadWriteStream>;
releaser: () => void, resultStream: NodeJS.ReadWriteStream,
requestTag: string, request: {}): Promise<NodeJS.ReadWriteStream>;
private _initializeStream(
releaser: () => void,
resultStream: NodeJS.ReadableStream|NodeJS.ReadWriteStream,
requestTag: string,
request?: {}): Promise<NodeJS.ReadableStream|NodeJS.ReadWriteStream> {
Expand All @@ -1027,7 +1028,7 @@ export class Firestore {
* Whether we have resolved the Promise and returned the stream to the
* caller.
*/
let streamReleased = false;
let streamInitialized = false;

/**
* Whether the stream end has been reached. This has to be forwarded to the
Expand All @@ -1036,16 +1037,17 @@ export class Firestore {
let endCalled = false;

return new Promise((resolve, reject) => {
const releaseStream = () => {
const streamReady = () => {
if (errorReceived) {
logger(
'Firestore._initializeStream', requestTag,
'Emit error:', errorReceived);
resultStream.emit('error', errorReceived);
releaser();
errorReceived = null;
} else if (!streamReleased) {
} else if (!streamInitialized) {
logger('Firestore._initializeStream', requestTag, 'Releasing stream');
streamReleased = true;
streamInitialized = true;
resultStream.pause();

// Calling 'stream.pause()' only holds up 'data' events and not the
Expand All @@ -1063,6 +1065,7 @@ export class Firestore {
'Firestore._initializeStream', requestTag,
'Forwarding stream close');
resultStream.emit('end');
releaser();
}
}, 0);
}
Expand All @@ -1073,14 +1076,14 @@ export class Firestore {
// possible to avoid the default stream behavior (which is just to log and
// continue).
resultStream.on('readable', () => {
releaseStream();
streamReady();
});

resultStream.on('end', () => {
logger(
'Firestore._initializeStream', requestTag, 'Received stream end');
endCalled = true;
releaseStream();
streamReady();
});

resultStream.on('error', err => {
Expand All @@ -1089,12 +1092,13 @@ export class Firestore {
'Received stream error:', err);
// If we receive an error before we were able to receive any data,
// reject this stream.
if (!streamReleased) {
if (!streamInitialized) {
logger(
'Firestore._initializeStream', requestTag,
'Received initial error:', err);
streamReleased = true;
streamInitialized = true;
reject(err);
releaser();
} else {
errorReceived = err;
}
Expand All @@ -1112,7 +1116,7 @@ export class Firestore {
logger(
'Firestore._initializeStream', requestTag,
'Marking stream as healthy');
releaseStream();
streamReady();
});
}
});
Expand Down Expand Up @@ -1182,32 +1186,33 @@ export class Firestore {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

return this._clientPool.run(gapicClient => {
return this._retry(attempts, requestTag, () => {
return new Promise<NodeJS.ReadableStream>((resolve, reject) => {
try {
logger(
'Firestore.readStream', requestTag,
'Sending request: %j', request);
const stream = gapicClient[methodName](request, callOptions);
const logStream =
through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readStream', requestTag,
'Received response: %j', chunk);
this.push(chunk);
callback();
});
resolve(bun([stream, logStream]));
} catch (err) {
logger(
'Firestore.readStream', requestTag,
'Received error:', err);
reject(err);
}
})
.then(stream => this._initializeStream(stream, requestTag));
});
const gapicClient = this._clientPool.acquire();
const releaser = this._clientPool.createReleaser(gapicClient);

return this._retry(attempts, requestTag, () => {
return new Promise<NodeJS.ReadableStream>((resolve, reject) => {
try {
logger(
'Firestore.readStream', requestTag, 'Sending request: %j',
request);
const stream = gapicClient[methodName](request, callOptions);
const logStream =
through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readStream', requestTag,
'Received response: %j', chunk);
this.push(chunk);
callback();
});
resolve(bun([stream, logStream]));
} catch (err) {
logger(
'Firestore.readStream', requestTag,
'Received error:', err);
reject(err);
}
})
.then(stream => this._initializeStream(releaser, stream, requestTag));
});
}

Expand All @@ -1233,23 +1238,25 @@ export class Firestore {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

return this._clientPool.run(gapicClient => {
return this._retry(attempts, requestTag, () => {
return Promise.resolve().then(() => {
logger('Firestore.readWriteStream', requestTag, 'Opening stream');
const requestStream = gapicClient[methodName](callOptions);
const gapicClient = this._clientPool.acquire();
const releaser = this._clientPool.createReleaser(gapicClient);

const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readWriteStream', requestTag,
'Received response: %j', chunk);
this.push(chunk);
callback();
});
return this._retry(attempts, requestTag, () => {
return Promise.resolve().then(() => {
logger('Firestore.readWriteStream', requestTag, 'Opening stream');
const requestStream = gapicClient[methodName](callOptions);

const resultStream = bun([requestStream, logStream]);
return this._initializeStream(resultStream, requestTag, request);
const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readWriteStream', requestTag, 'Received response: %j',
chunk);
this.push(chunk);
callback();
});

const resultStream = bun([requestStream, logStream]);
return this._initializeStream(
releaser, resultStream, requestTag, request);
});
});
}
Expand Down
24 changes: 22 additions & 2 deletions dev/src/pool.ts
Expand Up @@ -49,7 +49,7 @@ export class ClientPool<T> {
*
* @private
*/
private acquire(): T {
acquire(): T {
let selectedClient: T|null = null;
let selectedRequestCount = 0;

Expand Down Expand Up @@ -77,7 +77,7 @@ export class ClientPool<T> {
* removing it from the pool of active clients.
* @private
*/
private release(client: T): void {
release(client: T): void {
let requestCount = this.activeClients.get(client) || 0;
assert(requestCount > 0, 'No active request');

Expand All @@ -89,6 +89,26 @@ export class ClientPool<T> {
}
}

/**
* Creates a new function that will release the given client, when called.
*
* This guarantees that the given client can only be released once.
*
* @private
*/
createReleaser(client: T): () => void {
// Unfortunately, once the release() call is disconnected from the Promise
// returned from _initializeStream, there's no single callback in which the
// releaser can be guaranteed to be called once.
let released = false;
return () => {
if (!released) {
released = true;
this.release(client);
}
};
}

/**
* The number of currently registered clients.
*
Expand Down