Skip to content

Commit

Permalink
feat: add Symbol.asyncInterator to Query.stream() (#843)
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Dec 31, 2019
1 parent b94c367 commit 68795c4
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 194 deletions.
20 changes: 17 additions & 3 deletions dev/src/external-modules.d.ts
@@ -1,4 +1,18 @@
// TODO(mrschmidt): Come up with actual definitions for these modules.
/*!
* Copyright 2019 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

declare module 'bun';
declare module 'functional-red-black-tree'
// TODO(mrschmidt): Come up with actual definitions for these modules.
declare module 'functional-red-black-tree'
260 changes: 78 additions & 182 deletions dev/src/index.ts
Expand Up @@ -14,8 +14,8 @@
* limitations under the License.
*/

import * as bun from 'bun';
import {CallOptions} from 'google-gax';
import {Duplex, PassThrough} from 'stream';
import * as through2 from 'through2';
import {URL} from 'url';

Expand Down Expand Up @@ -950,7 +950,7 @@ export class Firestore {
const self = this;

return self
.readStream('batchGetDocuments', request, requestTag, true)
.requestStream('batchGetDocuments', 'unidirectional', request, requestTag)
.then(stream => {
return new Promise<DocumentSnapshot[]>((resolve, reject) => {
stream
Expand Down Expand Up @@ -1166,144 +1166,100 @@ export class Firestore {
}

/**
* Opens the provided stream and waits for it to become healthy. If an error
* occurs before the first byte is read, the method rejects the returned
* Promise.
* Waits for the provided stream to become active and returns a paused but
* healthy stream. If an error occurs before the first byte is read, the
* method rejects the returned Promise.
*
* @private
* @param resultStream The Node stream to monitor.
* @param backendStream The Node stream to monitor.
* @param requestTag A unique client-assigned identifier for this request.
* @param request If specified, the request that should be written to the
* stream after it opened.
* @returns The given Stream once it is considered healthy.
* stream after opening.
* @returns A guaranteed healthy stream that should be used instead of
* `backendStream`.
*/
private _initializeStream(
resultStream: NodeJS.ReadableStream,
requestTag: string
): Promise<void>;
private _initializeStream(
resultStream: NodeJS.ReadWriteStream,
requestTag: string,
request: {}
): Promise<void>;
private _initializeStream(
resultStream: NodeJS.ReadableStream | NodeJS.ReadWriteStream,
backendStream: Duplex,
requestTag: string,
request?: {}
): Promise<void> {
/** The last error we received and have not forwarded yet. */
let errorReceived: Error | null = null;
): Promise<Duplex> {
const resultStream = new PassThrough({objectMode: true});
resultStream.pause();

/**
* Whether we have resolved the Promise and returned the stream to the
* caller.
*/
let streamInitialized = false;

/**
* Whether the stream end has been reached. This has to be forwarded to the
* caller..
*/
let endCalled = false;

return new Promise((resolve, reject) => {
const streamReady = () => {
if (errorReceived) {
logger(
'Firestore._initializeStream',
requestTag,
'Emit error:',
errorReceived
);
resultStream.emit('error', errorReceived);
errorReceived = null;
} else if (!streamInitialized) {
logger('Firestore._initializeStream', requestTag, 'Releasing stream');
return new Promise<Duplex>((resolve, reject) => {
function streamReady() {
if (!streamInitialized) {
streamInitialized = true;
resultStream.pause();

// Calling 'stream.pause()' only holds up 'data' events and not the
// 'end' event we intend to forward here. We therefore need to wait
// until the API consumer registers their listeners (in the .then()
// call) before emitting any further events.
resolve();

// We execute the forwarding of the 'end' event via setTimeout() as
// V8 guarantees that the above the Promise chain is resolved before
// any calls invoked via setTimeout().
setTimeout(() => {
if (endCalled) {
logger(
'Firestore._initializeStream',
requestTag,
'Forwarding stream close'
);
resultStream.emit('end');
}
}, 0);
logger('Firestore._initializeStream', requestTag, 'Releasing stream');
resolve(resultStream);
}
};

// We capture any errors received and buffer them until the caller has
// registered a listener. We register our event handler as early as
// possible to avoid the default stream behavior (which is just to log and
// continue).
resultStream.on('readable', () => {
streamReady();
});
}

resultStream.on('end', () => {
function streamEnded() {
logger(
'Firestore._initializeStream',
requestTag,
'Received stream end'
);
endCalled = true;
streamReady();
});
resultStream.unpipe(backendStream);
}

resultStream.on('error', err => {
logger(
'Firestore._initializeStream',
requestTag,
'Received stream error:',
err
);
// If we receive an error before we were able to receive any data,
// reject this stream.
backendStream.on('data', () => streamReady());
backendStream.on('end', () => streamEnded());
backendStream.on('close', () => streamEnded());

backendStream.on('error', err => {
if (!streamInitialized) {
// If we receive an error before we were able to receive any data,
// reject this stream.
logger(
'Firestore._initializeStream',
requestTag,
'Received initial error:',
err
);
streamInitialized = true;
reject(err);
} else {
errorReceived = err;
logger(
'Firestore._initializeStream',
requestTag,
'Received stream error:',
err
);
// We execute the forwarding of the 'error' event via setImmediate() as
// V8 guarantees that the Promise chain returned from this method
// is resolved before any code executed via setImmediate(). This
// allows the caller to attach an error handler.
setImmediate(() => {
resultStream.emit('error', err);
}, 0);
}
});

backendStream.pipe(resultStream);

if (request) {
logger(
'Firestore._initializeStream',
requestTag,
'Sending request: %j',
request
);
(resultStream as NodeJS.WritableStream)
// The stream returned by the Gapic library accepts Protobuf
// messages, but the type information does not expose this.
// tslint:disable-next-line no-any
.write(request as any, 'utf-8', () => {
logger(
'Firestore._initializeStream',
requestTag,
'Marking stream as healthy'
);
streamReady();
});
backendStream.write(request, 'utf-8', () => {
logger(
'Firestore._initializeStream',
requestTag,
'Marking stream as healthy'
);
streamReady();
});
}
});
}
Expand Down Expand Up @@ -1363,130 +1319,70 @@ export class Firestore {
}

/**
* A funnel for read-only streaming API requests, assigning a project ID where
* necessary within the request options.
* A funnel for streaming API requests, assigning a project ID where necessary
* within the request options.
*
* The stream is returned in paused state and needs to be resumed once all
* listeners are attached.
*
* @private
* @param methodName Name of the streaming Veneer API endpoint that
* takes a request and GAX options.
* @param mode Whether this a unidirectional or bidirectional call.
* @param request The Protobuf request to send.
* @param requestTag A unique client-assigned identifier for this request.
* @param allowRetries Whether this is an idempotent request that can be
* retried.
* @returns A Promise with the resulting read-only stream.
*/
readStream(
requestStream(
methodName: string,
mode: 'unidirectional' | 'bidirectional',
request: {},
requestTag: string,
allowRetries: boolean
): Promise<NodeJS.ReadableStream> {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
requestTag: string
): Promise<Duplex> {
const callOptions = this.createCallOptions();

const result = new Deferred<NodeJS.ReadableStream>();
const result = new Deferred<Duplex>();

this._clientPool.run(requestTag, gapicClient => {
// While we return the stream to the callee early, we don't want to
// release the GAPIC client until the callee has finished processing the
// stream.
const lifetime = new Deferred<void>();

this._retry(attempts, requestTag, async () => {
this._retry(MAX_REQUEST_RETRIES, requestTag, async () => {
logger(
'Firestore.readStream',
'Firestore.requestStream',
requestTag,
'Sending request: %j',
request
);
const stream = gapicClient[methodName](request, callOptions);
const stream: Duplex =
mode === 'unidirectional'
? gapicClient[methodName](request, callOptions)
: gapicClient[methodName](callOptions);
const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readStream',
'Firestore.requestStream',
requestTag,
'Received response: %j',
chunk
);
this.push(chunk);
callback();
});

const resultStream = bun([stream, logStream]);
resultStream.on('close', lifetime.resolve);
resultStream.on('end', lifetime.resolve);
resultStream.on('error', lifetime.resolve);

await this._initializeStream(resultStream, requestTag);
result.resolve(resultStream);
}).catch(err => {
lifetime.resolve();
result.reject(err);
});

return lifetime.promise;
});

return result.promise;
}
stream.pipe(logStream);
stream.on('close', lifetime.resolve);
stream.on('end', lifetime.resolve);
stream.on('finish', lifetime.resolve);
stream.on('error', lifetime.resolve);

/**
* A funnel for read-write streaming API requests, assigning a project ID
* where necessary for all writes.
*
* The stream is returned in paused state and needs to be resumed once all
* listeners are attached.
*
* @private
* @param methodName Name of the streaming Veneer API endpoint that takes
* GAX options.
* @param request The Protobuf request to send as the first stream message.
* @param requestTag A unique client-assigned identifier for this request.
* @param allowRetries Whether this is an idempotent request that can be
* retried.
* @returns A Promise with the resulting read/write stream.
*/
readWriteStream(
methodName: string,
request: {},
requestTag: string,
allowRetries: boolean
): Promise<NodeJS.ReadWriteStream> {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

const result = new Deferred<NodeJS.ReadWriteStream>();

this._clientPool.run(requestTag, gapicClient => {
// While we return the stream to the callee early, we don't want to
// release the GAPIC client until the callee has finished processing the
// stream.
const lifetime = new Deferred<void>();

this._retry(attempts, requestTag, async () => {
logger('Firestore.readWriteStream', requestTag, 'Opening stream');
const requestStream = gapicClient[methodName](callOptions);

const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readWriteStream',
requestTag,
'Received response: %j',
chunk
);
this.push(chunk);
callback();
});

const resultStream = bun([requestStream, logStream]);
resultStream.on('close', lifetime.resolve);
resultStream.on('finish', lifetime.resolve);
resultStream.on('end', lifetime.resolve);
resultStream.on('error', lifetime.resolve);
const resultStream = await this._initializeStream(
stream,
requestTag,
mode === 'bidirectional' ? request : undefined
);

await this._initializeStream(resultStream, requestTag, request);
resultStream.on('end', () => stream.end());
result.resolve(resultStream);
}).catch(err => {
lifetime.resolve();
Expand Down

0 comments on commit 68795c4

Please sign in to comment.