From 68795c43ae9ef6b286650228746c7c16f59347f7 Mon Sep 17 00:00:00 2001 From: Sebastian Schmidt Date: Mon, 30 Dec 2019 19:58:17 -0800 Subject: [PATCH] feat: add Symbol.asyncInterator to Query.stream() (#843) --- dev/src/external-modules.d.ts | 20 ++- dev/src/index.ts | 260 ++++++++++------------------------ dev/src/reference.ts | 7 +- dev/src/watch.ts | 9 +- dev/system-test/firestore.ts | 16 ++- dev/test/watch.ts | 2 + package.json | 2 +- 7 files changed, 122 insertions(+), 194 deletions(-) diff --git a/dev/src/external-modules.d.ts b/dev/src/external-modules.d.ts index 40761eba0..2747e5615 100644 --- a/dev/src/external-modules.d.ts +++ b/dev/src/external-modules.d.ts @@ -1,4 +1,18 @@ -// TODO(mrschmidt): Come up with actual definitions for these modules. +/*! + * Copyright 2019 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -declare module 'bun'; -declare module 'functional-red-black-tree' \ No newline at end of file +// TODO(mrschmidt): Come up with actual definitions for these modules. +declare module 'functional-red-black-tree' diff --git a/dev/src/index.ts b/dev/src/index.ts index 2f0d7e430..5211ae25f 100644 --- a/dev/src/index.ts +++ b/dev/src/index.ts @@ -14,8 +14,8 @@ * limitations under the License. */ -import * as bun from 'bun'; import {CallOptions} from 'google-gax'; +import {Duplex, PassThrough} from 'stream'; import * as through2 from 'through2'; import {URL} from 'url'; @@ -950,7 +950,7 @@ export class Firestore { const self = this; return self - .readStream('batchGetDocuments', request, requestTag, true) + .requestStream('batchGetDocuments', 'unidirectional', request, requestTag) .then(stream => { return new Promise((resolve, reject) => { stream @@ -1166,33 +1166,25 @@ export class Firestore { } /** - * Opens the provided stream and waits for it to become healthy. If an error - * occurs before the first byte is read, the method rejects the returned - * Promise. + * Waits for the provided stream to become active and returns a paused but + * healthy stream. If an error occurs before the first byte is read, the + * method rejects the returned Promise. * * @private - * @param resultStream The Node stream to monitor. + * @param backendStream The Node stream to monitor. * @param requestTag A unique client-assigned identifier for this request. * @param request If specified, the request that should be written to the - * stream after it opened. - * @returns The given Stream once it is considered healthy. + * stream after opening. + * @returns A guaranteed healthy stream that should be used instead of + * `backendStream`. */ private _initializeStream( - resultStream: NodeJS.ReadableStream, - requestTag: string - ): Promise; - private _initializeStream( - resultStream: NodeJS.ReadWriteStream, - requestTag: string, - request: {} - ): Promise; - private _initializeStream( - resultStream: NodeJS.ReadableStream | NodeJS.ReadWriteStream, + backendStream: Duplex, requestTag: string, request?: {} - ): Promise { - /** The last error we received and have not forwarded yet. */ - let errorReceived: Error | null = null; + ): Promise { + const resultStream = new PassThrough({objectMode: true}); + resultStream.pause(); /** * Whether we have resolved the Promise and returned the stream to the @@ -1200,91 +1192,59 @@ export class Firestore { */ let streamInitialized = false; - /** - * Whether the stream end has been reached. This has to be forwarded to the - * caller.. - */ - let endCalled = false; - - return new Promise((resolve, reject) => { - const streamReady = () => { - if (errorReceived) { - logger( - 'Firestore._initializeStream', - requestTag, - 'Emit error:', - errorReceived - ); - resultStream.emit('error', errorReceived); - errorReceived = null; - } else if (!streamInitialized) { - logger('Firestore._initializeStream', requestTag, 'Releasing stream'); + return new Promise((resolve, reject) => { + function streamReady() { + if (!streamInitialized) { streamInitialized = true; - resultStream.pause(); - - // Calling 'stream.pause()' only holds up 'data' events and not the - // 'end' event we intend to forward here. We therefore need to wait - // until the API consumer registers their listeners (in the .then() - // call) before emitting any further events. - resolve(); - - // We execute the forwarding of the 'end' event via setTimeout() as - // V8 guarantees that the above the Promise chain is resolved before - // any calls invoked via setTimeout(). - setTimeout(() => { - if (endCalled) { - logger( - 'Firestore._initializeStream', - requestTag, - 'Forwarding stream close' - ); - resultStream.emit('end'); - } - }, 0); + logger('Firestore._initializeStream', requestTag, 'Releasing stream'); + resolve(resultStream); } - }; - - // We capture any errors received and buffer them until the caller has - // registered a listener. We register our event handler as early as - // possible to avoid the default stream behavior (which is just to log and - // continue). - resultStream.on('readable', () => { - streamReady(); - }); + } - resultStream.on('end', () => { + function streamEnded() { logger( 'Firestore._initializeStream', requestTag, 'Received stream end' ); - endCalled = true; streamReady(); - }); + resultStream.unpipe(backendStream); + } - resultStream.on('error', err => { - logger( - 'Firestore._initializeStream', - requestTag, - 'Received stream error:', - err - ); - // If we receive an error before we were able to receive any data, - // reject this stream. + backendStream.on('data', () => streamReady()); + backendStream.on('end', () => streamEnded()); + backendStream.on('close', () => streamEnded()); + + backendStream.on('error', err => { if (!streamInitialized) { + // If we receive an error before we were able to receive any data, + // reject this stream. logger( 'Firestore._initializeStream', requestTag, 'Received initial error:', err ); - streamInitialized = true; reject(err); } else { - errorReceived = err; + logger( + 'Firestore._initializeStream', + requestTag, + 'Received stream error:', + err + ); + // We execute the forwarding of the 'error' event via setImmediate() as + // V8 guarantees that the Promise chain returned from this method + // is resolved before any code executed via setImmediate(). This + // allows the caller to attach an error handler. + setImmediate(() => { + resultStream.emit('error', err); + }, 0); } }); + backendStream.pipe(resultStream); + if (request) { logger( 'Firestore._initializeStream', @@ -1292,18 +1252,14 @@ export class Firestore { 'Sending request: %j', request ); - (resultStream as NodeJS.WritableStream) - // The stream returned by the Gapic library accepts Protobuf - // messages, but the type information does not expose this. - // tslint:disable-next-line no-any - .write(request as any, 'utf-8', () => { - logger( - 'Firestore._initializeStream', - requestTag, - 'Marking stream as healthy' - ); - streamReady(); - }); + backendStream.write(request, 'utf-8', () => { + logger( + 'Firestore._initializeStream', + requestTag, + 'Marking stream as healthy' + ); + streamReady(); + }); } }); } @@ -1363,8 +1319,8 @@ export class Firestore { } /** - * A funnel for read-only streaming API requests, assigning a project ID where - * necessary within the request options. + * A funnel for streaming API requests, assigning a project ID where necessary + * within the request options. * * The stream is returned in paused state and needs to be resumed once all * listeners are attached. @@ -1372,22 +1328,20 @@ export class Firestore { * @private * @param methodName Name of the streaming Veneer API endpoint that * takes a request and GAX options. + * @param mode Whether this a unidirectional or bidirectional call. * @param request The Protobuf request to send. * @param requestTag A unique client-assigned identifier for this request. - * @param allowRetries Whether this is an idempotent request that can be - * retried. * @returns A Promise with the resulting read-only stream. */ - readStream( + requestStream( methodName: string, + mode: 'unidirectional' | 'bidirectional', request: {}, - requestTag: string, - allowRetries: boolean - ): Promise { - const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1; + requestTag: string + ): Promise { const callOptions = this.createCallOptions(); - const result = new Deferred(); + const result = new Deferred(); this._clientPool.run(requestTag, gapicClient => { // While we return the stream to the callee early, we don't want to @@ -1395,98 +1349,40 @@ export class Firestore { // stream. const lifetime = new Deferred(); - this._retry(attempts, requestTag, async () => { + this._retry(MAX_REQUEST_RETRIES, requestTag, async () => { logger( - 'Firestore.readStream', + 'Firestore.requestStream', requestTag, 'Sending request: %j', request ); - const stream = gapicClient[methodName](request, callOptions); + const stream: Duplex = + mode === 'unidirectional' + ? gapicClient[methodName](request, callOptions) + : gapicClient[methodName](callOptions); const logStream = through2.obj(function(this, chunk, enc, callback) { logger( - 'Firestore.readStream', + 'Firestore.requestStream', requestTag, 'Received response: %j', chunk ); - this.push(chunk); callback(); }); - const resultStream = bun([stream, logStream]); - resultStream.on('close', lifetime.resolve); - resultStream.on('end', lifetime.resolve); - resultStream.on('error', lifetime.resolve); - - await this._initializeStream(resultStream, requestTag); - result.resolve(resultStream); - }).catch(err => { - lifetime.resolve(); - result.reject(err); - }); - - return lifetime.promise; - }); - - return result.promise; - } + stream.pipe(logStream); + stream.on('close', lifetime.resolve); + stream.on('end', lifetime.resolve); + stream.on('finish', lifetime.resolve); + stream.on('error', lifetime.resolve); - /** - * A funnel for read-write streaming API requests, assigning a project ID - * where necessary for all writes. - * - * The stream is returned in paused state and needs to be resumed once all - * listeners are attached. - * - * @private - * @param methodName Name of the streaming Veneer API endpoint that takes - * GAX options. - * @param request The Protobuf request to send as the first stream message. - * @param requestTag A unique client-assigned identifier for this request. - * @param allowRetries Whether this is an idempotent request that can be - * retried. - * @returns A Promise with the resulting read/write stream. - */ - readWriteStream( - methodName: string, - request: {}, - requestTag: string, - allowRetries: boolean - ): Promise { - const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1; - const callOptions = this.createCallOptions(); - - const result = new Deferred(); - - this._clientPool.run(requestTag, gapicClient => { - // While we return the stream to the callee early, we don't want to - // release the GAPIC client until the callee has finished processing the - // stream. - const lifetime = new Deferred(); - - this._retry(attempts, requestTag, async () => { - logger('Firestore.readWriteStream', requestTag, 'Opening stream'); - const requestStream = gapicClient[methodName](callOptions); - - const logStream = through2.obj(function(this, chunk, enc, callback) { - logger( - 'Firestore.readWriteStream', - requestTag, - 'Received response: %j', - chunk - ); - this.push(chunk); - callback(); - }); - - const resultStream = bun([requestStream, logStream]); - resultStream.on('close', lifetime.resolve); - resultStream.on('finish', lifetime.resolve); - resultStream.on('end', lifetime.resolve); - resultStream.on('error', lifetime.resolve); + const resultStream = await this._initializeStream( + stream, + requestTag, + mode === 'bidirectional' ? request : undefined + ); - await this._initializeStream(resultStream, requestTag, request); + resultStream.on('end', () => stream.end()); result.resolve(resultStream); }).catch(err => { lifetime.resolve(); diff --git a/dev/src/reference.ts b/dev/src/reference.ts index 913b9505f..ae98085e3 100644 --- a/dev/src/reference.ts +++ b/dev/src/reference.ts @@ -16,7 +16,6 @@ const deepEqual = require('deep-equal'); -import * as bun from 'bun'; import * as through2 from 'through2'; import * as proto from '../protos/firestore_v1_proto_api'; @@ -1719,7 +1718,9 @@ export class Query { callback(); }); - return bun([responseStream, transform]); + responseStream.pipe(transform); + responseStream.on('error', transform.destroy); + return transform; } /** @@ -1833,7 +1834,7 @@ export class Query { this.firestore.initializeIfNeeded(tag).then(() => { const request = this.toProto(transactionId); this._firestore - .readStream('runQuery', request, tag, true) + .requestStream('runQuery', 'unidirectional', request, tag) .then(backendStream => { backendStream.on('error', err => { logger( diff --git a/dev/src/watch.ts b/dev/src/watch.ts index d6434a23e..9f4692852 100644 --- a/dev/src/watch.ts +++ b/dev/src/watch.ts @@ -17,6 +17,7 @@ import * as assert from 'assert'; import * as rbtree from 'functional-red-black-tree'; import {describe, it} from 'mocha'; +import {Duplex} from 'stream'; import {google} from '../protos/firestore_v1_proto_api'; import {ExponentialBackoff} from './backoff'; @@ -245,7 +246,7 @@ abstract class Watch { * The current stream to the backend. * @private */ - private currentStream: NodeJS.ReadWriteStream | null = null; + private currentStream: Duplex | null = null; /** * The server assigns and updates the resume token. @@ -359,7 +360,7 @@ abstract class Watch { this.initStream(); return () => { - logger('Watch.onSnapshot', this.requestTag, 'Ending stream'); + logger('Watch.onSnapshot', this.requestTag, 'Unsubscribe called'); // Prevent further callbacks. this.isActive = false; this.onNext = () => {}; @@ -505,7 +506,7 @@ abstract class Watch { // Note that we need to call the internal _listen API to pass additional // header values in readWriteStream. return this.firestore - .readWriteStream('listen', request, this.requestTag, true) + .requestStream('listen', 'bidirectional', request, this.requestTag) .then(backendStream => { if (!this.isActive) { logger( @@ -513,7 +514,7 @@ abstract class Watch { this.requestTag, 'Closing inactive stream' ); - backendStream.end(); + backendStream.emit('end'); return; } logger('Watch.initStream', this.requestTag, 'Opened new stream'); diff --git a/dev/system-test/firestore.ts b/dev/system-test/firestore.ts index e5b45af62..9d2fe9d2d 100644 --- a/dev/system-test/firestore.ts +++ b/dev/system-test/firestore.ts @@ -1345,7 +1345,8 @@ describe('Query class', () => { Promise.all([ref1.set({foo: 'a'}), ref2.set({foo: 'b'})]).then(() => { return randomCol .stream() - .on('data', () => { + .on('data', d => { + expect(d).to.be.an.instanceOf(DocumentSnapshot); ++received; }) .on('end', () => { @@ -1355,6 +1356,19 @@ describe('Query class', () => { }); }); + it('stream() supports readable[Symbol.asyncIterator]()', async () => { + let received = 0; + await randomCol.doc().set({foo: 'bar'}); + await randomCol.doc().set({foo: 'bar'}); + + const stream = randomCol.stream(); + for await (const chunk of stream) { + ++received; + } + + expect(received).to.equal(2); + }); + it('can query collection groups', async () => { // Use `randomCol` to get a random collection group name to use but ensure // it starts with 'b' for predictable ordering. diff --git a/dev/test/watch.ts b/dev/test/watch.ts index bc2c0792e..5a5dbbc6e 100644 --- a/dev/test/watch.ts +++ b/dev/test/watch.ts @@ -807,9 +807,11 @@ describe('Query watch', () => { watchHelper.sendSnapshot(1, Buffer.from([0xabcd])); return watchHelper.await('snapshot').then(async () => { streamHelper.close(); + await streamHelper.await('end'); await streamHelper.awaitOpen(); streamHelper.close(); + await streamHelper.await('end'); await streamHelper.awaitOpen(); expect(streamHelper.streamCount).to.equal(3); diff --git a/package.json b/package.json index 7f49dc447..05e7fef74 100644 --- a/package.json +++ b/package.json @@ -47,10 +47,10 @@ "predocs-test": "npm run docs" }, "dependencies": { - "bun": "^0.0.12", "deep-equal": "^2.0.0", "functional-red-black-tree": "^1.0.1", "google-gax": "^1.12.0", + "readable-stream": "^3.4.0", "through2": "^3.0.0" }, "devDependencies": {