Skip to content

Commit

Permalink
fix: support Query.stream() as first client operation (#971)
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Mar 16, 2020
1 parent 469df02 commit a48017c
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 28 deletions.
47 changes: 21 additions & 26 deletions dev/src/reference.ts
Expand Up @@ -1787,14 +1787,12 @@ export class Query<T = DocumentData> {
* @param {bytes=} transactionId A transaction ID.
*/
_get(transactionId?: Uint8Array): Promise<QuerySnapshot<T>> {
const request = this.toProto(transactionId);

const docs: Array<QueryDocumentSnapshot<T>> = [];

return new Promise((resolve, reject) => {
let readTime: Timestamp;

this._stream(request)
this._stream(transactionId)
.on('error', err => {
reject(err);
})
Expand Down Expand Up @@ -1858,8 +1856,7 @@ export class Query<T = DocumentData> {
);
}

const request = this.toProto();
const responseStream = this._stream(request);
const responseStream = this._stream();

const transform = through2.obj(function(this, chunk, encoding, callback) {
// Only send chunks with documents.
Expand Down Expand Up @@ -1987,13 +1984,13 @@ export class Query<T = DocumentData> {
}

/**
* Internal streaming method that accepts the request proto.
* Internal streaming method that accepts an optional transaction ID.
*
* @param request The request proto.
* @param transactionId A transaction ID.
* @private
* @returns A stream of document results.
*/
_stream(request: api.IRunQueryRequest): NodeJS.ReadableStream {
_stream(transactionId?: Uint8Array): NodeJS.ReadableStream {
const tag = requestTag();
const self = this;

Expand All @@ -2020,26 +2017,24 @@ export class Query<T = DocumentData> {
callback();
});

this.firestore.initializeIfNeeded(tag).then(() => {
this._firestore
.requestStream('runQuery', request, tag)
.then(backendStream => {
backendStream.on('error', err => {
logger(
'Query._stream',
tag,
'Query failed with stream error:',
err
);
stream.destroy(err);
});
backendStream.resume();
backendStream.pipe(stream);
})
.catch(err => {
this.firestore
.initializeIfNeeded(tag)
.then(async () => {
// `toProto()` might throw an exception. We rely on the behavior of an
// async function to convert this exception into the rejected Promise we
// catch below.
const request = this.toProto(transactionId);
return this._firestore.requestStream('runQuery', request, tag);
})
.then(backendStream => {
backendStream.on('error', err => {
logger('Query._stream', tag, 'Query failed with stream error:', err);
stream.destroy(err);
});
});
backendStream.resume();
backendStream.pipe(stream);
})
.catch(e => stream.destroy(e));

return stream;
}
Expand Down
81 changes: 81 additions & 0 deletions dev/system-test/firestore.ts
Expand Up @@ -2232,3 +2232,84 @@ describe('QuerySnapshot class', () => {
});
});
});

describe('Client initialization', () => {
const ops: Array<[
string,
(coll: CollectionReference) => Promise<unknown>
]> = [
['CollectionReference.get()', randomColl => randomColl.get()],
['CollectionReference.add()', randomColl => randomColl.add({})],
[
'CollectionReference.stream()',
randomColl => {
const deferred = new Deferred<void>();
randomColl.stream().on('finish', () => {
deferred.resolve();
});
return deferred.promise;
},
],
[
'CollectionReference.listDocuments()',
randomColl => randomColl.listDocuments(),
],
[
'CollectionReference.onSnapshot()',
randomColl => {
const deferred = new Deferred<void>();
const unsubscribe = randomColl.onSnapshot(() => {
unsubscribe();
deferred.resolve();
});
return deferred.promise;
},
],
['DocumentReference.get()', randomColl => randomColl.doc().get()],
['DocumentReference.create()', randomColl => randomColl.doc().create({})],
['DocumentReference.set()', randomColl => randomColl.doc().set({})],
[
'DocumentReference.update()',
async randomColl => {
const update = randomColl.doc().update('foo', 'bar');
await expect(update).to.eventually.be.rejectedWith(
'No document to update'
);
},
],
['DocumentReference.delete()', randomColl => randomColl.doc().delete()],
[
'DocumentReference.listCollections()',
randomColl => randomColl.doc().listCollections(),
],
[
'DocumentReference.onSnapshot()',
randomColl => {
const deferred = new Deferred<void>();
const unsubscribe = randomColl.doc().onSnapshot(() => {
unsubscribe();
deferred.resolve();
});
return deferred.promise;
},
],
[
'Firestore.runTransaction()',
randomColl => randomColl.firestore.runTransaction(t => t.get(randomColl)),
],
[
'Firestore.getAll()',
randomColl => randomColl.firestore.getAll(randomColl.doc()),
],
['Firestore.batch()', randomColl => randomColl.firestore.batch().commit()],
['Firestore.terminate()', randomColl => randomColl.firestore.terminate()],
];

for (const [description, op] of ops) {
it(`succeeds for ${description}`, () => {
const firestore = new Firestore();
const randomCol = getTestRoot(firestore);
return op(randomCol);
});
}
});
8 changes: 6 additions & 2 deletions dev/test/query.ts
Expand Up @@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import {expect} from 'chai';
import {expect, use} from 'chai';
import * as chaiAsPromised from 'chai-as-promised';
import * as extend from 'extend';

import {google} from '../protos/firestore_v1_proto_api';
Expand Down Expand Up @@ -43,6 +44,8 @@ const DATABASE_ROOT = `projects/${PROJECT_ID}/databases/(default)`;
// Change the argument to 'console.log' to enable debug output.
setLogFunction(() => {});

use(chaiAsPromised);

function snapshot(
relativePath: string,
data: DocumentData
Expand Down Expand Up @@ -1277,7 +1280,8 @@ describe('limitToLast() interface', () => {

it('requires at least one ordering constraints', () => {
const query = firestore.collection('collectionId');
expect(() => query.limitToLast(1).get()).to.throw(
const result = query.limitToLast(1).get();
return expect(result).to.eventually.be.rejectedWith(
'limitToLast() queries require specifying at least one orderBy() clause.'
);
});
Expand Down

0 comments on commit a48017c

Please sign in to comment.