From 88ff2d0d8e0fc25a4219ef5625b8de353ed4aa29 Mon Sep 17 00:00:00 2001 From: Andrew Zammit Date: Tue, 12 May 2020 18:35:09 -0700 Subject: [PATCH] feat: warn on too many concurrent requests (#165) * feat: warn on too many concurrent requests * fix: decrease concurrent request count on error too * fix: do not use scientific notation, link to multipart GH issue for skipped tests * feat: expose teeny stats, allow resetting stats, allow reading copy of options * chore: remove unused proxyquire dev dep --- src/TeenyStatistics.ts | 199 ++++++++++++++++++++++++ src/index.ts | 21 +++ test/TeenyStatistics.ts | 327 ++++++++++++++++++++++++++++++++++++++++ test/index.ts | 142 ++++++++++++++++- 4 files changed, 688 insertions(+), 1 deletion(-) create mode 100644 src/TeenyStatistics.ts create mode 100644 test/TeenyStatistics.ts diff --git a/src/TeenyStatistics.ts b/src/TeenyStatistics.ts new file mode 100644 index 0000000..8d89712 --- /dev/null +++ b/src/TeenyStatistics.ts @@ -0,0 +1,199 @@ +/*! + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export interface TeenyStatisticsOptions { + /** + * A positive number representing when to issue a warning about the number + * of concurrent requests using teeny-request. + * Set to 0 to disable this warning. + * Corresponds to the TEENY_REQUEST_WARN_CONCURRENT_REQUESTS environment + * variable. + */ + concurrentRequests?: number; +} + +type TeenyStatisticsConfig = Required; + +/** + * TeenyStatisticsCounters is distinct from TeenyStatisticsOptions: + * Used when dumping current counters and other internal metrics. + */ +export interface TeenyStatisticsCounters { + concurrentRequests: number; +} + +/** + * @class TeenyStatisticsWarning + * @extends Error + * @description While an error, is used for emitting warnings when + * meeting certain configured thresholds. + * @see process.emitWarning + */ +export class TeenyStatisticsWarning extends Error { + static readonly CONCURRENT_REQUESTS = 'ConcurrentRequestsExceededWarning'; + + public threshold = 0; + public type = ''; + public value = 0; + + /** + * @param {string} message + */ + constructor(message: string) { + super(message); + this.name = this.constructor.name; + Error.captureStackTrace(this, this.constructor); + } +} + +/** + * @class TeenyStatistics + * @description Maintain various statistics internal to teeny-request. Tracking + * is not automatic and must be instrumented within teeny-request. + */ +export class TeenyStatistics { + /** + * @description A default threshold representing when to warn about excessive + * in-flight/concurrent requests. + * @type {number} + * @static + * @readonly + * @default 5000 + */ + static readonly DEFAULT_WARN_CONCURRENT_REQUESTS = 5000; + + /** + * @type {TeenyStatisticsConfig} + * @private + */ + private _options: TeenyStatisticsConfig; + + /** + * @type {number} + * @private + * @default 0 + */ + private _concurrentRequests = 0; + + /** + * @type {boolean} + * @private + * @default false + */ + private _didConcurrentRequestWarn = false; + + /** + * @param {TeenyStatisticsOptions} [opts] + */ + constructor(opts?: TeenyStatisticsOptions) { + this._options = TeenyStatistics._prepareOptions(opts); + } + + /** + * Returns a copy of the current options. + * @return {TeenyStatisticsOptions} + */ + getOptions(): TeenyStatisticsOptions { + return Object.assign({}, this._options); + } + + /** + * Change configured statistics options. This will not preserve unspecified + * options that were previously specified, i.e. this is a reset of options. + * @param {TeenyStatisticsOptions} [opts] + * @returns {TeenyStatisticsConfig} The previous options. + * @see _prepareOptions + */ + setOptions(opts?: TeenyStatisticsOptions): TeenyStatisticsConfig { + const oldOpts = this._options; + this._options = TeenyStatistics._prepareOptions(opts); + return oldOpts; + } + + /** + * @readonly + * @return {TeenyStatisticsCounters} + */ + get counters(): TeenyStatisticsCounters { + return { + concurrentRequests: this._concurrentRequests, + }; + } + + /** + * @description Should call this right before making a request. + */ + requestStarting(): void { + this._concurrentRequests++; + + if ( + this._options.concurrentRequests > 0 && + this._concurrentRequests >= this._options.concurrentRequests && + !this._didConcurrentRequestWarn + ) { + this._didConcurrentRequestWarn = true; + const warning = new TeenyStatisticsWarning( + 'Possible excessive concurrent requests detected. ' + + this._concurrentRequests + + ' requests in-flight, which exceeds the configured threshold of ' + + this._options.concurrentRequests + + '. Use the TEENY_REQUEST_WARN_CONCURRENT_REQUESTS environment ' + + 'variable or the concurrentRequests option of teeny-request to ' + + 'increase or disable (0) this warning.' + ); + warning.type = TeenyStatisticsWarning.CONCURRENT_REQUESTS; + warning.value = this._concurrentRequests; + warning.threshold = this._options.concurrentRequests; + process.emitWarning(warning); + } + } + + /** + * @description When using `requestStarting`, call this after the request + * has finished. + */ + requestFinished() { + // TODO negative? + this._concurrentRequests--; + } + + /** + * Configuration Precedence: + * 1. Dependency inversion via defined option. + * 2. Global numeric environment variable. + * 3. Built-in default. + * This will not preserve unspecified options previously specified. + * @param {TeenyStatisticsOptions} [opts] + * @returns {TeenyStatisticsOptions} + * @private + */ + private static _prepareOptions({ + concurrentRequests: diConcurrentRequests, + }: TeenyStatisticsOptions = {}): TeenyStatisticsConfig { + let concurrentRequests = this.DEFAULT_WARN_CONCURRENT_REQUESTS; + + const envConcurrentRequests = Number( + process.env.TEENY_REQUEST_WARN_CONCURRENT_REQUESTS + ); + if (diConcurrentRequests !== undefined) { + concurrentRequests = diConcurrentRequests; + } else if (!Number.isNaN(envConcurrentRequests)) { + concurrentRequests = envConcurrentRequests; + } + + return {concurrentRequests}; + } +} diff --git a/src/index.ts b/src/index.ts index 62a41cb..f981952 100644 --- a/src/index.ts +++ b/src/index.ts @@ -20,6 +20,7 @@ import fetch, * as f from 'node-fetch'; import {PassThrough, Readable} from 'stream'; import * as uuid from 'uuid'; import {getAgent} from './agents'; +import {TeenyStatistics} from './TeenyStatistics'; // eslint-disable-next-line @typescript-eslint/no-var-requires const streamEvents = require('stream-events'); @@ -206,8 +207,10 @@ function teenyRequest( options.body = createMultipartStream(boundary, multipart); // Multipart upload + teenyRequest.stats.requestStarting(); fetch(uri, options).then( res => { + teenyRequest.stats.requestFinished(); const header = res.headers.get('content-type'); const response = fetchToRequestResponse(options, res); const body = response.body; @@ -238,6 +241,7 @@ function teenyRequest( ); }, err => { + teenyRequest.stats.requestFinished(); callback(err, null!, null); } ); @@ -259,8 +263,11 @@ function teenyRequest( } }); options.compress = false; + + teenyRequest.stats.requestStarting(); fetch(uri, options).then( res => { + teenyRequest.stats.requestFinished(); responseStream = res.body; responseStream.on('error', (err: Error) => { @@ -271,6 +278,7 @@ function teenyRequest( requestStream.emit('response', response); }, err => { + teenyRequest.stats.requestFinished(); requestStream.emit('error', err); } ); @@ -280,9 +288,12 @@ function teenyRequest( // stream. return requestStream as Request; } + // GET or POST with callback + teenyRequest.stats.requestStarting(); fetch(uri, options).then( res => { + teenyRequest.stats.requestFinished(); const header = res.headers.get('content-type'); const response = fetchToRequestResponse(options, res); const body = response.body; @@ -319,6 +330,7 @@ function teenyRequest( ); }, err => { + teenyRequest.stats.requestFinished(); callback(err, null!, null); } ); @@ -335,4 +347,13 @@ teenyRequest.defaults = (defaults: CoreOptions) => { }; }; +/** + * Single instance of an interface for keeping track of things. + */ +teenyRequest.stats = new TeenyStatistics(); + +teenyRequest.resetStats = (): void => { + teenyRequest.stats = new TeenyStatistics(teenyRequest.stats.getOptions()); +}; + export {teenyRequest}; diff --git a/test/TeenyStatistics.ts b/test/TeenyStatistics.ts new file mode 100644 index 0000000..d6c4c45 --- /dev/null +++ b/test/TeenyStatistics.ts @@ -0,0 +1,327 @@ +/*! + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import {afterEach, before, beforeEach, describe, it} from 'mocha'; +import * as sinon from 'sinon'; +import { + TeenyStatistics, + TeenyStatisticsOptions, + TeenyStatisticsWarning, +} from '../src/TeenyStatistics'; + +function hooksForEnvCleanupInThisContext() { + const prevEnvMap: Map = new Map([ + ['TEENY_REQUEST_WARN_CONCURRENT_REQUESTS', undefined], + ]); + + before(() => { + prevEnvMap.forEach((v, k, map) => { + map.set(k, process.env[k]); + }); + }); + + afterEach(() => { + prevEnvMap.forEach((v, k) => { + if (v === undefined) { + delete process.env[k]; + return; + } + + process.env[k] = v; + }); + }); +} + +describe('TeenyStatistics', () => { + const sandbox = sinon.createSandbox(); + let emitWarnStub: sinon.SinonStub; + + beforeEach(() => { + emitWarnStub = sandbox.stub(process, 'emitWarning'); + }); + + afterEach(() => { + sandbox.restore(); + }); + + describe('constructor', () => { + hooksForEnvCleanupInThisContext(); + + it('should have default concurrent requests', () => { + assert.strictEqual( + TeenyStatistics.DEFAULT_WARN_CONCURRENT_REQUESTS, + 5000 + ); + }); + + it('should use predefined options by default', () => { + const t = new TeenyStatistics(); + assert.deepStrictEqual(t['_options'], {concurrentRequests: 5e3}); + }); + + it('should allow constructor override', () => { + const opts: TeenyStatisticsOptions = {concurrentRequests: 99}; + const t = new TeenyStatistics(Object.assign({}, opts)); + assert.deepStrictEqual(t['_options'], opts); + }); + + it('should allow env var override', () => { + process.env.TEENY_REQUEST_WARN_CONCURRENT_REQUESTS = '42'; + const t = new TeenyStatistics(); + assert.deepStrictEqual(t['_options'], {concurrentRequests: 42}); + }); + + it('should prefer constructor over env var override', () => { + process.env.TEENY_REQUEST_WARN_CONCURRENT_REQUESTS = '123'; + const opts: TeenyStatisticsOptions = {concurrentRequests: 321}; + const t = new TeenyStatistics(Object.assign({}, opts)); + assert.deepStrictEqual(t['_options'], opts); + }); + }); + + describe('getOptions', () => { + it('should return the options, including defaults', () => { + const t = new TeenyStatistics(); + assert.deepStrictEqual(t.getOptions(), { + concurrentRequests: TeenyStatistics.DEFAULT_WARN_CONCURRENT_REQUESTS, + }); + }); + + it('should return the non-default options', () => { + const opts1: TeenyStatisticsOptions = {concurrentRequests: 123}; + const t = new TeenyStatistics(Object.assign({}, opts1)); + assert.deepStrictEqual(t.getOptions(), opts1); + }); + + it('should return a copy of the options', () => { + const t = new TeenyStatistics(); + assert.notStrictEqual(t.getOptions(), t['_options']); + }); + }); + + describe('setOptions', () => { + hooksForEnvCleanupInThisContext(); + + it('should be optional and set to defaults', () => { + const opts1: TeenyStatisticsOptions = {concurrentRequests: 123}; + const t = new TeenyStatistics(Object.assign({}, opts1)); + t.setOptions(); + assert.deepStrictEqual(t['_options'], {concurrentRequests: 5e3}); + }); + + it('should override previously set using options', () => { + const opts1: TeenyStatisticsOptions = {concurrentRequests: 123}; + const opts2: TeenyStatisticsOptions = {concurrentRequests: 321}; + const t = new TeenyStatistics(Object.assign({}, opts1)); + t.setOptions(Object.assign({}, opts2)); + assert.deepStrictEqual(t['_options'], opts2); + }); + + it('should override previously set using env var', () => { + const opts1: TeenyStatisticsOptions = {concurrentRequests: 123}; + const t = new TeenyStatistics(Object.assign({}, opts1)); + assert.deepStrictEqual(t['_options'], {concurrentRequests: 123}); + process.env.TEENY_REQUEST_WARN_CONCURRENT_REQUESTS = '999'; + t.setOptions(); + assert.deepStrictEqual(t['_options'], {concurrentRequests: 999}); + }); + + it('should return old options', () => { + const opts1: TeenyStatisticsOptions = {concurrentRequests: 123}; + const opts2: TeenyStatisticsOptions = {concurrentRequests: 321}; + const t = new TeenyStatistics(Object.assign({}, opts1)); + const oldOpts = t.setOptions(Object.assign({}, opts2)); + assert.deepStrictEqual(oldOpts, opts1); + }); + }); + + describe('counters', () => { + it('should return counters', () => { + const t = new TeenyStatistics(); + assert.deepStrictEqual(t.counters, {concurrentRequests: 0}); + }); + + it('should be read-only', () => { + const t = new TeenyStatistics(); + assert.throws(() => { + // eslint-disable-next-line @typescript-eslint/ban-ts-ignore + // @ts-ignore + t.counters = {concurrentRequests: 99}; + }); + }); + }); + + describe('request concurrency', () => { + let t: TeenyStatistics; + beforeEach(() => { + t = new TeenyStatistics(); + }); + + it('should increment concurrency count', () => { + let numExpected = 0; + assert.strictEqual(t.counters.concurrentRequests, numExpected); + + t.requestStarting(); + numExpected++; + assert.strictEqual(t.counters.concurrentRequests, numExpected); + + t.requestStarting(); + numExpected++; + assert.strictEqual(t.counters.concurrentRequests, numExpected); + + t.requestStarting(); + numExpected++; + assert.strictEqual(t.counters.concurrentRequests, numExpected); + + for (let i = 0; i < 100; i++) { + t.requestStarting(); + numExpected++; + } + assert.strictEqual(t.counters.concurrentRequests, numExpected); + }); + + it('should decrement concurrency count', () => { + let numExpected = 0; + assert.strictEqual(t.counters.concurrentRequests, 0); + + for (let i = 0; i < 100; i++) { + t.requestStarting(); + numExpected++; + } + + t.requestFinished(); + numExpected--; + assert.strictEqual(t.counters.concurrentRequests, numExpected); + + t.requestFinished(); + numExpected--; + assert.strictEqual(t.counters.concurrentRequests, numExpected); + + t.requestFinished(); + numExpected--; + assert.strictEqual(t.counters.concurrentRequests, numExpected); + + for (let i = numExpected; i > 0; i--) { + t.requestFinished(); + numExpected--; + } + assert.strictEqual(t.counters.concurrentRequests, 0); + }); + + it('should emit a warning upon reaching threshold', () => { + for (let i = 0; i < 5e3 - 1; i++) { + t.requestStarting(); + } + assert(emitWarnStub.notCalled); + + t.requestStarting(); + assert( + emitWarnStub.calledOnceWith( + sinon.match.instanceOf(TeenyStatisticsWarning) + ) + ); + }); + + it('should not re-emit once emitted', () => { + for (let i = 0; i < 5e3 - 1; i++) { + t.requestStarting(); + } + assert(emitWarnStub.notCalled); + + // first time emitting + t.requestStarting(); + assert( + emitWarnStub.calledOnceWith( + sinon.match.instanceOf(TeenyStatisticsWarning) + ) + ); + + // shouldn't emit on the next call (i.e. still greater than threshold) + t.requestStarting(); + assert(emitWarnStub.calledOnce); + + // shouldn't emit after twice the threshold (possible bad math/logic) + for (let i = 0; i < 5e3; i++) { + t.requestStarting(); + } + assert(emitWarnStub.calledOnce); + }); + + it('should not re-emit when yoyoing threshold', () => { + for (let i = 0; i < 5e3 - 1; i++) { + t.requestStarting(); + } + assert(emitWarnStub.notCalled); + + // first time emitting + t.requestStarting(); + assert( + emitWarnStub.calledOnceWith( + sinon.match.instanceOf(TeenyStatisticsWarning) + ) + ); + + // let's bring the counter back down + for (let i = 5e3; i >= 0; i--) { + t.requestFinished(); + } + + // and bring it back again surpassing the threshold + for (let i = 0; i < 5e3 * 2; i++) { + t.requestStarting(); + } + assert(emitWarnStub.calledOnce); + }); + + it('should emit a TeenyStatisticsWarning', () => { + for (let i = 0; i < 5e3; i++) { + t.requestStarting(); + } + assert(emitWarnStub.calledOnce); + + const warning = emitWarnStub.firstCall.args[0] as TeenyStatisticsWarning; + assert.strictEqual(warning.threshold, 5e3); + assert.strictEqual(warning.value, 5e3); + assert.strictEqual( + warning.type, + TeenyStatisticsWarning.CONCURRENT_REQUESTS + ); + }); + + it('should emit a helpful message', () => { + for (let i = 0; i < 5e3; i++) { + t.requestStarting(); + } + assert(emitWarnStub.calledOnce); + + const errStr: string = emitWarnStub.firstCall.args[0].toString(); + assert( + errStr.includes('Possible excessive concurrent requests detected.'), + 'describes the nature of the warning' + ); + assert( + errStr.includes('TEENY_REQUEST_WARN_CONCURRENT_REQUESTS'), + 'mentions env var' + ); + assert( + errStr.includes('concurrentRequests'), + 'mentions concurrentRequests option' + ); + assert(errStr.search(/\b0\b/) !== -1, 'mentions 0'); + }); + }); +}); diff --git a/test/index.ts b/test/index.ts index bad2f46..2819ae9 100644 --- a/test/index.ts +++ b/test/index.ts @@ -15,11 +15,12 @@ */ import * as assert from 'assert'; -import {describe, it, afterEach} from 'mocha'; +import {describe, it, afterEach, beforeEach} from 'mocha'; import * as nock from 'nock'; import {Readable, PassThrough} from 'stream'; import * as sinon from 'sinon'; import {teenyRequest} from '../src'; +import {TeenyStatistics, TeenyStatisticsWarning} from '../src/TeenyStatistics'; import {pool} from '../src/agents'; // eslint-disable-next-line @typescript-eslint/no-var-requires @@ -34,11 +35,32 @@ function mockJson() { return nock(uri).get('/').reply(200, {hello: '🌍'}); } +function mockError() { + return nock(uri).get('/').replyWithError('mock err'); +} + describe('teeny', () => { const sandbox = sinon.createSandbox(); + let emitWarnStub: sinon.SinonStub; + let statsStub: sinon.SinonStubbedInstance; + + beforeEach(() => { + emitWarnStub = sandbox.stub(process, 'emitWarning'); + + // don't mask other process warns + emitWarnStub + .callThrough() + .withArgs(sinon.match.instanceOf(TeenyStatisticsWarning)) + .callsFake(() => {}); + + // note: this stubs the already instantiated TeenyStatistics + statsStub = sandbox.stub(teenyRequest.stats); + }); + afterEach(() => { pool.clear(); sandbox.restore(); + teenyRequest.resetStats(); nock.cleanAll(); }); @@ -249,4 +271,122 @@ describe('teeny', () => { }); }); }); + + it('should expose TeenyStatistics instance', () => { + assert.ok(teenyRequest.stats instanceof TeenyStatistics); + }); + + it('should allow resetting statistics', () => { + const oldStats = teenyRequest.stats; + teenyRequest.resetStats(); + assert.notStrictEqual(teenyRequest.stats, oldStats); + assert.ok(teenyRequest.stats instanceof TeenyStatistics); + }); + + it('should keep the original stats options when resetting', () => { + statsStub.getOptions.restore(); + statsStub.setOptions.restore(); + teenyRequest.stats.setOptions({concurrentRequests: 42}); + teenyRequest.resetStats(); + const newOptions = teenyRequest.stats.getOptions(); + assert.deepStrictEqual(newOptions, {concurrentRequests: 42}); + }); + + it('should emit warning on too many concurrent requests', done => { + statsStub.setOptions.restore(); + statsStub.requestStarting.restore(); + teenyRequest.stats.setOptions({concurrentRequests: 1}); + + const scope = mockJson(); + teenyRequest({uri}, () => { + assert.ok(emitWarnStub.calledOnce); + scope.done(); + done(); + }); + }); + + it('should track stats, callback mode, success', done => { + const scope = mockJson(); + teenyRequest({uri}, () => { + assert.ok(statsStub.requestStarting.calledOnceWithExactly()); + assert.ok(statsStub.requestFinished.calledOnceWithExactly()); + scope.done(); + done(); + }); + }); + + it('should track stats, callback mode, failure', done => { + const scope = mockError(); + teenyRequest({uri}, err => { + assert.ok(err); + assert.ok(statsStub.requestStarting.calledOnceWithExactly()); + assert.ok(statsStub.requestFinished.calledOnceWithExactly()); + scope.done(); + done(); + }); + }); + + it('should track stats, stream mode, success', done => { + const scope = mockJson(); + const readable = teenyRequest({uri}); + assert.ok(statsStub.requestStarting.calledOnceWithExactly()); + + readable.once('response', () => { + assert.ok(statsStub.requestFinished.calledOnceWithExactly()); + scope.done(); + done(); + }); + }); + + it('should track stats, stream mode, failure', done => { + const scope = mockError(); + const readable = teenyRequest({uri}); + assert.ok(statsStub.requestStarting.calledOnceWithExactly()); + + readable.once('error', err => { + assert.ok(err); + assert.ok(statsStub.requestFinished.calledOnceWithExactly()); + scope.done(); + done(); + }); + }); + + // TODO multipart is broken with 2 strings + // see: https://github.com/googleapis/teeny-request/issues/168 + it.skip('should track stats, multipart mode, success', done => { + const scope = mockJson(); + teenyRequest( + { + method: 'POST', + headers: {}, + multipart: [{body: 'foo'}, {body: 'bar'}], + uri, + }, + () => { + assert.ok(statsStub.requestStarting.calledOnceWithExactly()); + assert.ok(statsStub.requestFinished.calledOnceWithExactly()); + scope.done(); + done(); + } + ); + }); + + it.skip('should track stats, multipart mode, failure', done => { + const scope = mockError(); + teenyRequest( + { + method: 'POST', + headers: {}, + multipart: [{body: 'foo'}, {body: 'bar'}], + uri, + }, + err => { + assert.ok(err); + assert.ok(statsStub.requestStarting.calledOnceWithExactly()); + assert.ok(statsStub.requestFinished.calledOnceWithExactly()); + scope.done(); + done(); + } + ); + }); });