Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Support more than 100 long-lived streams #623

Merged
merged 10 commits into from May 16, 2019
118 changes: 68 additions & 50 deletions dev/src/index.ts
Expand Up @@ -58,6 +58,7 @@ import {
import {WriteBatch} from './write-batch';

import api = google.firestore.v1;
import {Deferred} from '../test/util/helpers';

export {
CollectionReference,
Expand Down Expand Up @@ -1144,18 +1145,15 @@ export class Firestore {
* @returns The given Stream once it is considered healthy.
*/
private _initializeStream(
releaser: () => void,
resultStream: NodeJS.ReadableStream,
requestTag: string
): Promise<NodeJS.ReadableStream>;
private _initializeStream(
releaser: () => void,
resultStream: NodeJS.ReadWriteStream,
requestTag: string,
request: {}
): Promise<NodeJS.ReadWriteStream>;
private _initializeStream(
releaser: () => void,
resultStream: NodeJS.ReadableStream | NodeJS.ReadWriteStream,
requestTag: string,
request?: {}
Expand All @@ -1167,7 +1165,7 @@ export class Firestore {
* Whether we have resolved the Promise and returned the stream to the
* caller.
*/
let streamInitialized = false;
let streamReleased = false;

/**
* Whether the stream end has been reached. This has to be forwarded to the
Expand All @@ -1176,7 +1174,7 @@ export class Firestore {
let endCalled = false;

return new Promise((resolve, reject) => {
const streamReady = () => {
const releaseStream = () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this name. "released" can be confused with releasing back into the pool. I certainly was confused that way when reading this.

This promise represents that the stream is ready for use or initialized or something like that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the PR to keep the renames of your original commit.

if (errorReceived) {
logger(
'Firestore._initializeStream',
Expand All @@ -1185,11 +1183,10 @@ export class Firestore {
errorReceived
);
resultStream.emit('error', errorReceived);
releaser();
errorReceived = null;
} else if (!streamInitialized) {
} else if (!streamReleased) {
logger('Firestore._initializeStream', requestTag, 'Releasing stream');
streamInitialized = true;
streamReleased = true;
resultStream.pause();

// Calling 'stream.pause()' only holds up 'data' events and not the
Expand All @@ -1209,7 +1206,6 @@ export class Firestore {
'Forwarding stream close'
);
resultStream.emit('end');
releaser();
}
}, 0);
}
Expand All @@ -1220,7 +1216,7 @@ export class Firestore {
// possible to avoid the default stream behavior (which is just to log and
// continue).
resultStream.on('readable', () => {
streamReady();
releaseStream();
});

resultStream.on('end', () => {
Expand All @@ -1230,7 +1226,7 @@ export class Firestore {
'Received stream end'
);
endCalled = true;
streamReady();
releaseStream();
});

resultStream.on('error', err => {
Expand All @@ -1242,16 +1238,15 @@ export class Firestore {
);
// If we receive an error before we were able to receive any data,
// reject this stream.
if (!streamInitialized) {
if (!streamReleased) {
logger(
'Firestore._initializeStream',
requestTag,
'Received initial error:',
err
);
streamInitialized = true;
streamReleased = true;
reject(err);
releaser();
} else {
errorReceived = err;
}
Expand All @@ -1274,7 +1269,7 @@ export class Firestore {
requestTag,
'Marking stream as healthy'
);
streamReady();
releaseStream();
});
}
});
Expand Down Expand Up @@ -1359,36 +1354,48 @@ export class Firestore {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

const gapicClient = this._clientPool.acquire();
const releaser = this._clientPool.createReleaser(gapicClient);
const result = new Deferred<NodeJS.ReadableStream>();

return this._retry(attempts, requestTag, () => {
return new Promise<NodeJS.ReadableStream>((resolve, reject) => {
try {
this._clientPool.run(gapicClient => {
// While we return the stream to the callee early, we don't want to
// release the GAPIC client until the callee has finished processing the
// stream.
const lifetime = new Deferred<void>();

this._retry(attempts, requestTag, async () => {
logger(
'Firestore.readStream',
requestTag,
'Sending request: %j',
request
);
const stream = gapicClient[methodName](request, callOptions);
const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readStream',
requestTag,
'Sending request: %j',
request
'Received response: %j',
chunk
);
const stream = gapicClient[methodName](request, callOptions);
const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readStream',
requestTag,
'Received response: %j',
chunk
);
this.push(chunk);
callback();
});
resolve(bun([stream, logStream]));
} catch (err) {
logger('Firestore.readStream', requestTag, 'Received error:', err);
reject(err);
}
}).then(stream => this._initializeStream(releaser, stream, requestTag));
this.push(chunk);
callback();
});
const resultStream = bun([stream, logStream]);
return this._initializeStream(resultStream, requestTag);
})
.then(stream => {
stream.on('close', lifetime.resolve);
result.resolve(stream);
})
.catch(err => {
lifetime.resolve();
result.reject(err);
});

return lifetime.promise;
});

return result.promise;
}

/**
Expand Down Expand Up @@ -1416,11 +1423,15 @@ export class Firestore {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

const gapicClient = this._clientPool.acquire();
const releaser = this._clientPool.createReleaser(gapicClient);
const result = new Deferred<NodeJS.ReadWriteStream>();

this._clientPool.run(gapicClient => {
// While we return the stream to the callee early, we don't want to
// release the GAPIC client until the callee has finished processing the
// stream.
const lifetime = new Deferred<void>();

return this._retry(attempts, requestTag, () => {
return Promise.resolve().then(() => {
this._retry(attempts, requestTag, async () => {
wilhuff marked this conversation as resolved.
Show resolved Hide resolved
logger('Firestore.readWriteStream', requestTag, 'Opening stream');
const requestStream = gapicClient[methodName](callOptions);

Expand All @@ -1436,14 +1447,21 @@ export class Firestore {
});

const resultStream = bun([requestStream, logStream]);
return this._initializeStream(
releaser,
resultStream,
requestTag,
request
);
});
return this._initializeStream(resultStream, requestTag, request);
})
.then(stream => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

async/await?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cleaned this up. I also removed the return value from _initializeStream as part of this since we always return the unmodified resultStream.

stream.on('close', lifetime.resolve);
result.resolve(stream);
})
.catch(err => {
lifetime.resolve();
result.reject(err);
});

return lifetime.promise;
});

return result.promise;
}
}

Expand Down
24 changes: 2 additions & 22 deletions dev/src/pool.ts
Expand Up @@ -50,7 +50,7 @@ export class ClientPool<T> {
*
* @private
*/
acquire(): T {
private acquire(): T {
let selectedClient: T | null = null;
let selectedRequestCount = 0;

Expand Down Expand Up @@ -79,7 +79,7 @@ export class ClientPool<T> {
* removing it from the pool of active clients.
* @private
*/
release(client: T): void {
private release(client: T): void {
let requestCount = this.activeClients.get(client) || 0;
assert(requestCount > 0, 'No active request');

Expand All @@ -91,26 +91,6 @@ export class ClientPool<T> {
}
}

/**
* Creates a new function that will release the given client, when called.
*
* This guarantees that the given client can only be released once.
*
* @private
*/
createReleaser(client: T): () => void {
// Unfortunately, once the release() call is disconnected from the Promise
// returned from _initializeStream, there's no single callback in which the
// releaser can be guaranteed to be called once.
let released = false;
return () => {
if (!released) {
released = true;
this.release(client);
}
};
}

/**
* The number of currently registered clients.
*
Expand Down
32 changes: 32 additions & 0 deletions dev/system-test/firestore.ts
Expand Up @@ -31,6 +31,7 @@ import {
Timestamp,
} from '../src';
import {autoId} from '../src/util';
import {Deferred} from '../test/util/helpers';

const version = require('../../package.json').version;

Expand Down Expand Up @@ -913,6 +914,37 @@ describe('DocumentReference class', () => {
maybeRun();
});
});

it('handles more than 100 concurrent listeners', async () => {
const ref = randomCol.doc('doc');

const emptyResults: Array<Deferred<void>> = [];
const documentResults: Array<Deferred<void>> = [];
const unsubscribeCallbacks: Array<() => void> = [];

// A single GAPIC client can only handle 100 concurrent streams. We set
// up 100+ long-lived listeners to verify that Firestore pools requests
// across multiple clients.
for (let i = 0; i < 150; ++i) {
emptyResults[i] = new Deferred<void>();
documentResults[i] = new Deferred<void>();

unsubscribeCallbacks[i] = randomCol
.where('i', '>', i)
.onSnapshot(snapshot => {
if (snapshot.size === 0) {
emptyResults[i].resolve();
} else if (snapshot.size === 1) {
documentResults[i].resolve();
}
});
}

await Promise.all(emptyResults.map(d => d.promise));
ref.set({i: 1337});
await Promise.all(documentResults.map(d => d.promise));
unsubscribeCallbacks.forEach(c => c());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test verifies that all 150 listeners succeed but doesn't verify that everything has been properly released to the pool.

Is it possible to check that pool.size is 150 once the listeners are started and then get back to zero after?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I worry that this test can succeed even if you remove the line that resolves the lifetime promise.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a "shutdown" block to each tests that verifies that the operation count goes back to zero. I had to change some of the unit tests to make this work.

});
});
});

Expand Down