diff --git a/src/resource-stream.ts b/src/resource-stream.ts index 77ee762..330948b 100644 --- a/src/resource-stream.ts +++ b/src/resource-stream.ts @@ -59,43 +59,50 @@ export class ResourceStream extends Transform implements ResourceEvents { this._reading = true; - this._requestFn( - this._nextQuery, - (err: Error | null, results: T[], nextQuery: {} | null) => { - if (err) { - this.destroy(err); - return; - } + // Wrap in a try/catch to catch input linting errors, e.g. + // an invalid BigQuery query. These errors are thrown in an + // async fashion, which makes them un-catchable by the user. + try { + this._requestFn( + this._nextQuery, + (err: Error | null, results: T[], nextQuery: {} | null) => { + if (err) { + this.destroy(err); + return; + } - this._nextQuery = nextQuery; + this._nextQuery = nextQuery; - if (this._resultsToSend !== Infinity) { - results = results.splice(0, this._resultsToSend); - this._resultsToSend -= results.length; - } + if (this._resultsToSend !== Infinity) { + results = results.splice(0, this._resultsToSend); + this._resultsToSend -= results.length; + } - let more = true; + let more = true; - for (const result of results) { - if (this._ended) { - break; + for (const result of results) { + if (this._ended) { + break; + } + more = this.push(result); } - more = this.push(result); - } - const isFinished = !this._nextQuery || this._resultsToSend < 1; - const madeMaxCalls = ++this._requestsMade >= this._maxApiCalls; + const isFinished = !this._nextQuery || this._resultsToSend < 1; + const madeMaxCalls = ++this._requestsMade >= this._maxApiCalls; - if (isFinished || madeMaxCalls) { - this.end(); - } + if (isFinished || madeMaxCalls) { + this.end(); + } - if (more && !this._ended) { - setImmediate(() => this._read()); - } + if (more && !this._ended) { + setImmediate(() => this._read()); + } - this._reading = false; - } - ); + this._reading = false; + } + ); + } catch (e) { + this.destroy(e); + } } } diff --git a/test/resource-stream.ts b/test/resource-stream.ts index 64d1790..27a47f9 100644 --- a/test/resource-stream.ts +++ b/test/resource-stream.ts @@ -280,5 +280,17 @@ describe('ResourceStream', () => { assert.strictEqual(stream._reading, false); }); + + it('should destroy the stream if the request method throws', done => { + const error = new Error('Error.'); + stream._requestFn = () => { + throw error; + }; + stream.on('error', err => { + assert.strictEqual(err, error); + done(); + }); + stream._read(); + }); }); });