Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Support more than 100 long-lived streams #623

Merged
merged 10 commits into from May 16, 2019
103 changes: 59 additions & 44 deletions dev/src/index.ts
Expand Up @@ -58,6 +58,7 @@ import {
import {WriteBatch} from './write-batch';

import api = google.firestore.v1;
import {Deferred} from '../test/util/helpers';

export {
CollectionReference,
Expand Down Expand Up @@ -1144,22 +1145,19 @@ export class Firestore {
* @returns The given Stream once it is considered healthy.
*/
private _initializeStream(
releaser: () => void,
resultStream: NodeJS.ReadableStream,
requestTag: string
): Promise<NodeJS.ReadableStream>;
): Promise<void>;
private _initializeStream(
releaser: () => void,
resultStream: NodeJS.ReadWriteStream,
requestTag: string,
request: {}
): Promise<NodeJS.ReadWriteStream>;
): Promise<void>;
private _initializeStream(
releaser: () => void,
resultStream: NodeJS.ReadableStream | NodeJS.ReadWriteStream,
requestTag: string,
request?: {}
): Promise<NodeJS.ReadableStream | NodeJS.ReadWriteStream> {
): Promise<void> {
/** The last error we received and have not forwarded yet. */
let errorReceived: Error | null = null;

Expand All @@ -1185,7 +1183,6 @@ export class Firestore {
errorReceived
);
resultStream.emit('error', errorReceived);
releaser();
errorReceived = null;
} else if (!streamInitialized) {
logger('Firestore._initializeStream', requestTag, 'Releasing stream');
Expand All @@ -1196,7 +1193,7 @@ export class Firestore {
// 'end' event we intend to forward here. We therefore need to wait
// until the API consumer registers their listeners (in the .then()
// call) before emitting any further events.
resolve(resultStream);
resolve();

// We execute the forwarding of the 'end' event via setTimeout() as
// V8 guarantees that the above the Promise chain is resolved before
Expand All @@ -1209,7 +1206,6 @@ export class Firestore {
'Forwarding stream close'
);
resultStream.emit('end');
releaser();
}
}, 0);
}
Expand Down Expand Up @@ -1251,7 +1247,6 @@ export class Firestore {
);
streamInitialized = true;
reject(err);
releaser();
} else {
errorReceived = err;
}
Expand Down Expand Up @@ -1359,36 +1354,47 @@ export class Firestore {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

const gapicClient = this._clientPool.acquire();
const releaser = this._clientPool.createReleaser(gapicClient);
const result = new Deferred<NodeJS.ReadableStream>();

return this._retry(attempts, requestTag, () => {
return new Promise<NodeJS.ReadableStream>((resolve, reject) => {
try {
this._clientPool.run(gapicClient => {
// While we return the stream to the callee early, we don't want to
// release the GAPIC client until the callee has finished processing the
// stream.
const lifetime = new Deferred<void>();

this._retry(attempts, requestTag, async () => {
logger(
'Firestore.readStream',
requestTag,
'Sending request: %j',
request
);
const stream = gapicClient[methodName](request, callOptions);
const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readStream',
requestTag,
'Sending request: %j',
request
'Received response: %j',
chunk
);
const stream = gapicClient[methodName](request, callOptions);
const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readStream',
requestTag,
'Received response: %j',
chunk
);
this.push(chunk);
callback();
});
resolve(bun([stream, logStream]));
} catch (err) {
logger('Firestore.readStream', requestTag, 'Received error:', err);
reject(err);
}
}).then(stream => this._initializeStream(releaser, stream, requestTag));
this.push(chunk);
callback();
});

const resultStream = bun([stream, logStream]);
resultStream.on('close', lifetime.resolve);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are streams guaranteed to emit the close event? What happens in the case of an error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to https://nodejs.org/api/stream.html, yes:

"A Writable stream will always emit the 'close' event if it is created with the emitClose option."
"A Readable stream will always emit the 'close' event if it is created with the emitClose option."

emitClose defaults to true.

I originally trusted this, but I spent more time and added test asserts. It turns out that the close event is not always emitted. To make the unit and system tests pass, I also have to wait for error/end and finish on writeable streams.


await this._initializeStream(resultStream, requestTag);
result.resolve(resultStream);
}).catch(err => {
lifetime.resolve();
result.reject(err);
});

return lifetime.promise;
});

return result.promise;
}

/**
Expand Down Expand Up @@ -1416,11 +1422,15 @@ export class Firestore {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

const gapicClient = this._clientPool.acquire();
const releaser = this._clientPool.createReleaser(gapicClient);
const result = new Deferred<NodeJS.ReadWriteStream>();

this._clientPool.run(gapicClient => {
// While we return the stream to the callee early, we don't want to
// release the GAPIC client until the callee has finished processing the
// stream.
const lifetime = new Deferred<void>();

return this._retry(attempts, requestTag, () => {
return Promise.resolve().then(() => {
this._retry(attempts, requestTag, async () => {
wilhuff marked this conversation as resolved.
Show resolved Hide resolved
logger('Firestore.readWriteStream', requestTag, 'Opening stream');
const requestStream = gapicClient[methodName](callOptions);

Expand All @@ -1436,14 +1446,19 @@ export class Firestore {
});

const resultStream = bun([requestStream, logStream]);
return this._initializeStream(
releaser,
resultStream,
requestTag,
request
);
resultStream.on('close', lifetime.resolve);
await this._initializeStream(resultStream, requestTag, request);

result.resolve(resultStream);
}).catch(err => {
lifetime.resolve();
result.reject(err);
});

return lifetime.promise;
});

return result.promise;
}
}

Expand Down
24 changes: 2 additions & 22 deletions dev/src/pool.ts
Expand Up @@ -50,7 +50,7 @@ export class ClientPool<T> {
*
* @private
*/
acquire(): T {
private acquire(): T {
let selectedClient: T | null = null;
let selectedRequestCount = 0;

Expand Down Expand Up @@ -79,7 +79,7 @@ export class ClientPool<T> {
* removing it from the pool of active clients.
* @private
*/
release(client: T): void {
private release(client: T): void {
let requestCount = this.activeClients.get(client) || 0;
assert(requestCount > 0, 'No active request');

Expand All @@ -91,26 +91,6 @@ export class ClientPool<T> {
}
}

/**
* Creates a new function that will release the given client, when called.
*
* This guarantees that the given client can only be released once.
*
* @private
*/
createReleaser(client: T): () => void {
// Unfortunately, once the release() call is disconnected from the Promise
// returned from _initializeStream, there's no single callback in which the
// releaser can be guaranteed to be called once.
let released = false;
return () => {
if (!released) {
released = true;
this.release(client);
}
};
}

/**
* The number of currently registered clients.
*
Expand Down
32 changes: 32 additions & 0 deletions dev/system-test/firestore.ts
Expand Up @@ -31,6 +31,7 @@ import {
Timestamp,
} from '../src';
import {autoId} from '../src/util';
import {Deferred} from '../test/util/helpers';

const version = require('../../package.json').version;

Expand Down Expand Up @@ -913,6 +914,37 @@ describe('DocumentReference class', () => {
maybeRun();
});
});

it('handles more than 100 concurrent listeners', async () => {
const ref = randomCol.doc('doc');

const emptyResults: Array<Deferred<void>> = [];
const documentResults: Array<Deferred<void>> = [];
const unsubscribeCallbacks: Array<() => void> = [];

// A single GAPIC client can only handle 100 concurrent streams. We set
// up 100+ long-lived listeners to verify that Firestore pools requests
// across multiple clients.
for (let i = 0; i < 150; ++i) {
emptyResults[i] = new Deferred<void>();
documentResults[i] = new Deferred<void>();

unsubscribeCallbacks[i] = randomCol
.where('i', '>', i)
.onSnapshot(snapshot => {
if (snapshot.size === 0) {
emptyResults[i].resolve();
} else if (snapshot.size === 1) {
documentResults[i].resolve();
}
});
}

await Promise.all(emptyResults.map(d => d.promise));
ref.set({i: 1337});
await Promise.all(documentResults.map(d => d.promise));
unsubscribeCallbacks.forEach(c => c());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test verifies that all 150 listeners succeed but doesn't verify that everything has been properly released to the pool.

Is it possible to check that pool.size is 150 once the listeners are started and then get back to zero after?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I worry that this test can succeed even if you remove the line that resolves the lifetime promise.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a "shutdown" block to each tests that verifies that the operation count goes back to zero. I had to change some of the unit tests to make this work.

});
});
});

Expand Down