Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Previously _initializeStream returned a Promise that indicated that the
stream was "released", i.e. that it was was ready for attaching
listeners.

#256 Added pooled clients and changed the
callers of _initializeStream to reuse this promise such that when it was
resolved, the stream could be returned to the pool. This works when
listeners are short-lived, but fails when listeners run indefinitely.

This change arranges to release the clients back to the pool only after
the stream has completed, which allows an arbitrary number of indefinite
listens to run without problems.
  • Loading branch information
wilhuff committed May 10, 2019
1 parent 257ce2a commit be865dc
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 43 deletions.
93 changes: 50 additions & 43 deletions dev/src/index.ts
Expand Up @@ -1011,12 +1011,13 @@ export class Firestore {
* @returns The given Stream once it is considered healthy.
*/
private _initializeStream(
resultStream: NodeJS.ReadableStream,
releaser: () => void, resultStream: NodeJS.ReadableStream,
requestTag: string): Promise<NodeJS.ReadableStream>;
private _initializeStream(
resultStream: NodeJS.ReadWriteStream, requestTag: string,
request: {}): Promise<NodeJS.ReadWriteStream>;
releaser: () => void, resultStream: NodeJS.ReadWriteStream,
requestTag: string, request: {}): Promise<NodeJS.ReadWriteStream>;
private _initializeStream(
releaser: () => void,
resultStream: NodeJS.ReadableStream|NodeJS.ReadWriteStream,
requestTag: string,
request?: {}): Promise<NodeJS.ReadableStream|NodeJS.ReadWriteStream> {
Expand All @@ -1042,6 +1043,7 @@ export class Firestore {
'Firestore._initializeStream', requestTag,
'Emit error:', errorReceived);
resultStream.emit('error', errorReceived);
releaser();
errorReceived = null;
} else if (!streamInitialized) {
logger('Firestore._initializeStream', requestTag, 'Releasing stream');
Expand All @@ -1063,6 +1065,7 @@ export class Firestore {
'Firestore._initializeStream', requestTag,
'Forwarding stream close');
resultStream.emit('end');
releaser();
}
}, 0);
}
Expand Down Expand Up @@ -1095,6 +1098,7 @@ export class Firestore {
'Received initial error:', err);
streamInitialized = true;
reject(err);
releaser();
} else {
errorReceived = err;
}
Expand Down Expand Up @@ -1182,32 +1186,33 @@ export class Firestore {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

return this._clientPool.run(gapicClient => {
return this._retry(attempts, requestTag, () => {
return new Promise<NodeJS.ReadableStream>((resolve, reject) => {
try {
logger(
'Firestore.readStream', requestTag,
'Sending request: %j', request);
const stream = gapicClient[methodName](request, callOptions);
const logStream =
through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readStream', requestTag,
'Received response: %j', chunk);
this.push(chunk);
callback();
});
resolve(bun([stream, logStream]));
} catch (err) {
logger(
'Firestore.readStream', requestTag,
'Received error:', err);
reject(err);
}
})
.then(stream => this._initializeStream(stream, requestTag));
});
const gapicClient = this._clientPool.acquire();
const releaser = this._clientPool.createReleaser(gapicClient);

return this._retry(attempts, requestTag, () => {
return new Promise<NodeJS.ReadableStream>((resolve, reject) => {
try {
logger(
'Firestore.readStream', requestTag, 'Sending request: %j',
request);
const stream = gapicClient[methodName](request, callOptions);
const logStream =
through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readStream', requestTag,
'Received response: %j', chunk);
this.push(chunk);
callback();
});
resolve(bun([stream, logStream]));
} catch (err) {
logger(
'Firestore.readStream', requestTag,
'Received error:', err);
reject(err);
}
})
.then(stream => this._initializeStream(releaser, stream, requestTag));
});
}

Expand All @@ -1233,23 +1238,25 @@ export class Firestore {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

return this._clientPool.run(gapicClient => {
return this._retry(attempts, requestTag, () => {
return Promise.resolve().then(() => {
logger('Firestore.readWriteStream', requestTag, 'Opening stream');
const requestStream = gapicClient[methodName](callOptions);
const gapicClient = this._clientPool.acquire();
const releaser = this._clientPool.createReleaser(gapicClient);

const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readWriteStream', requestTag,
'Received response: %j', chunk);
this.push(chunk);
callback();
});
return this._retry(attempts, requestTag, () => {
return Promise.resolve().then(() => {
logger('Firestore.readWriteStream', requestTag, 'Opening stream');
const requestStream = gapicClient[methodName](callOptions);

const resultStream = bun([requestStream, logStream]);
return this._initializeStream(resultStream, requestTag, request);
const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readWriteStream', requestTag, 'Received response: %j',
chunk);
this.push(chunk);
callback();
});

const resultStream = bun([requestStream, logStream]);
return this._initializeStream(
releaser, resultStream, requestTag, request);
});
});
}
Expand Down
20 changes: 20 additions & 0 deletions dev/src/pool.ts
Expand Up @@ -89,6 +89,26 @@ export class ClientPool<T> {
}
}

/**
* Creates a new function that will release the given client, when called.
*
* This guarantees that the given client can only be released once.
*
* @private
*/
createReleaser(client: T): () => void {
// Unfortunately, once the release() call is disconnected from the Promise
// returned from _initializeStream, there's no single callback in which the
// releaser can be guaranteed to be called once.
let released = false;
return () => {
if (!released) {
released = true;
this.release(client);
}
};
}

/**
* The number of currently registered clients.
*
Expand Down

0 comments on commit be865dc

Please sign in to comment.