Skip to content

Commit

Permalink
feat: add option for user-provided EventEmitter to control buckets (#809
Browse files Browse the repository at this point in the history
)

The motivation for this PR. I added an option for a user-provided EventEmitter that would handle the rotating of the buckets. The idea here is that you could do something like this in your app
```
const bucketController = new EventEmitter();
setInterval(() => {
   bucketController.emit('rotate')
}, 1000)
```
And then pass `bucketController` into opossum. Opossum would listen for that event and then rotate the buckets.


---------

Co-authored-by: Gautam Jethwani <gjethwani@atlassian.com>
  • Loading branch information
gjethwani and Gautam Jethwani committed Sep 13, 2023
1 parent 2d915be commit 38013d4
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 9 deletions.
12 changes: 12 additions & 0 deletions lib/circuit.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ Please use options.errorThresholdPercentage`;
* @param {AbortController} options.abortController this allows Opossum to
* signal upon timeout and properly abort your on going requests instead of
* leaving it in the background
* @param {boolean} options.enableSnapshots whether to enable the rolling
* stats snapshots that opossum emits at the bucketInterval. Disable this
* as an optimization if you don't listen to the 'snapshot' event to reduce
* the number of timers opossum initiates.
* @param {EventEmitter} options.rotateBucketController if you have multiple
* breakers in your app, the number of timers across breakers can get costly.
* This option allows you to provide an EventEmitter that rotates the buckets
* so you can have one global timer in your app. Make sure that you are
* emitting a 'rotate' event from this EventEmitter
*
*
* @fires CircuitBreaker#halfOpen
Expand Down Expand Up @@ -159,6 +168,7 @@ class CircuitBreaker extends EventEmitter {
this.options.cacheGetKey = options.cacheGetKey ??
((...args) => JSON.stringify(args));
this.options.enableSnapshots = options.enableSnapshots !== false;
this.options.rotateBucketController = options.rotateBucketController;

// Set default cache transport if not provided
if (this.options.cache) {
Expand Down Expand Up @@ -789,6 +799,7 @@ class CircuitBreaker extends EventEmitter {
*/
enable () {
this[ENABLED] = true;
this.status.startListeneningForRotateEvent();
}

/**
Expand All @@ -798,6 +809,7 @@ class CircuitBreaker extends EventEmitter {
*/
disable () {
this[ENABLED] = false;
this.status.removeRotateBucketControllerListener();
}
}

Expand Down
47 changes: 40 additions & 7 deletions lib/status.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const TIMEOUT = Symbol('timeout');
const PERCENTILES = Symbol('percentiles');
const BUCKET_INTERVAL = Symbol('bucket-interval');
const SNAPSHOT_INTERVAL = Symbol('snapshot-interval');
const ROTATE_EVENT_NAME = Symbol('rotate-event-name');

const EventEmitter = require('events').EventEmitter;

Expand Down Expand Up @@ -56,6 +57,7 @@ class Status extends EventEmitter {
this[TIMEOUT] = options.rollingCountTimeout || 10000;
this[WINDOW] = new Array(this[BUCKETS]);
this[PERCENTILES] = [0.0, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.995, 1];
this[ROTATE_EVENT_NAME] = 'rotate';

// Default this value to true
this.rollingPercentilesEnabled =
Expand All @@ -64,17 +66,25 @@ class Status extends EventEmitter {
// Default this value to true
this.enableSnapshots = options.enableSnapshots !== false;

// can be undefined
this.rotateBucketController = options.rotateBucketController;
this.rotateBucket = nextBucket(this[WINDOW]);

// prime the window with buckets
for (let i = 0; i < this[BUCKETS]; i++) this[WINDOW][i] = bucket();

// rotate the buckets periodically
const bucketInterval = Math.floor(this[TIMEOUT] / this[BUCKETS]);
this[BUCKET_INTERVAL] = setInterval(nextBucket(this[WINDOW]),
bucketInterval);

// No unref() in the browser
if (typeof this[BUCKET_INTERVAL].unref === 'function') {
this[BUCKET_INTERVAL].unref();
if (this.rotateBucketController) {
// rotate the buckets based on an optional EventEmitter
this.startListeneningForRotateEvent();
} else {
// or rotate the buckets periodically
this[BUCKET_INTERVAL] = setInterval(this.rotateBucket, bucketInterval);
// No unref() in the browser
if (typeof this[BUCKET_INTERVAL].unref === 'function') {
this[BUCKET_INTERVAL].unref();
}
}

/**
Expand Down Expand Up @@ -173,11 +183,34 @@ class Status extends EventEmitter {

shutdown () {
this.removeAllListeners();
clearInterval(this[BUCKET_INTERVAL]);
// interval is not set if rotateBucketController is provided
if (this.rotateBucketController === undefined) {
clearInterval(this[BUCKET_INTERVAL]);
} else {
this.removeRotateBucketControllerListener();
}
if (this.enableSnapshots) {
clearInterval(this[SNAPSHOT_INTERVAL]);
}
}

removeRotateBucketControllerListener () {
if (this.rotateBucketController) {
this.rotateBucketController.removeListener(this[ROTATE_EVENT_NAME],
this.rotateBucket);
}
}

startListeneningForRotateEvent () {
if (
this.rotateBucketController &&
this.rotateBucketController.listenerCount(this[ROTATE_EVENT_NAME],
this.rotateBucket) === 0
) {
this.rotateBucketController.on(this[ROTATE_EVENT_NAME],
this.rotateBucket);
}
}
}

const nextBucket = window => _ => {
Expand Down
2 changes: 1 addition & 1 deletion test/browser/generate-index.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
echo $PWD

cd test
file_list=$(ls -1 | grep .js)
file_list=$(ls -1 | grep .js | grep -v rolling-event-emitter)
cd ..
requires=""

Expand Down
2 changes: 1 addition & 1 deletion test/enable-disable-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ test('When disabled the circuit should always be closed', t => {
breaker.fire(-1)
.catch(e => t.equals(e, 'Error: -1 is < 0'))
.then(() => {
t.ok(breaker.opened, 'should be closed');
t.ok(breaker.opened, 'should be open');
})
.then(_ => breaker.shutdown())
.then(t.end);
Expand Down
81 changes: 81 additions & 0 deletions test/rolling-event-emitter-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
'use strict';

const test = require('tape');
const CircuitBreaker = require('../');
const { passFail } = require('./common');
const EventEmitter = require('events').EventEmitter;

test('When disabled, the event emitter listener should be removed', t => {
t.plan(2);
const emitter = new EventEmitter();
const breaker = new CircuitBreaker(passFail, {
rotateBucketController: emitter
});
t.equals(emitter.listeners('rotate').length, 1, 'listener attached automatically');
breaker.disable();
t.equals(emitter.listeners('rotate').length, 0, 'listener removed when breaker disabled');
breaker.shutdown();
t.end();
});

test('Event listener should be removed only for the breaker that is disabled', t => {
t.plan(2);
const emitter = new EventEmitter();
const breakerToBeDisabled = new CircuitBreaker(passFail, {
rotateBucketController: emitter
});
const breakerNotToBeDisabled = new CircuitBreaker(passFail, {
rotateBucketController: emitter
});
t.equals(emitter.listeners('rotate').length, 2, '1 listener attached for each breaker');
breakerToBeDisabled.disable();
t.equals(emitter.listeners('rotate').length, 1, '1 listener should be disabled and 1 should remain');
breakerToBeDisabled.shutdown();
breakerNotToBeDisabled.shutdown();
t.end();
});

test('Event listener should be re-added when circuit is re-enabled', t => {
t.plan(3);
const emitter = new EventEmitter();
const breaker = new CircuitBreaker(passFail, {
rotateBucketController: emitter
});
t.equals(emitter.listeners('rotate').length, 1, 'listener attached automatically');
breaker.disable();
t.equals(emitter.listeners('rotate').length, 0, 'listener removed when breaker disabled');
breaker.enable();
t.equals(emitter.listeners('rotate').length, 1, 'listener re-attached when breaker re-enabled');
breaker.shutdown();
t.end();
});

test('Listener should not be attached with a call to enable if there is already a listener', t => {
t.plan(2);
const emitter = new EventEmitter();
const breaker = new CircuitBreaker(passFail, {
rotateBucketController: emitter
});
t.equals(emitter.listeners('rotate').length, 1, 'listener attached automatically');
breaker.enable();
t.equals(emitter.listeners('rotate').length, 1, 'listener should not be added again');
breaker.shutdown();
t.end();
});

test('Listener should not be attached with a call to enable if there is already a listener and there is another breaker in the mix', t => {
t.plan(2);
const emitter = new EventEmitter();
const breaker = new CircuitBreaker(passFail, {
rotateBucketController: emitter
});
const anotherBreaker = new CircuitBreaker(passFail, {
rotateBucketController: emitter
});
t.equals(emitter.listeners('rotate').length, 2, 'listener attached automatically');
breaker.enable();
t.equals(emitter.listeners('rotate').length, 2, 'listener should not be added again');
breaker.shutdown();
anotherBreaker.shutdown();
t.end();
});
56 changes: 56 additions & 0 deletions test/status-test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';

const { EventEmitter } = require('events');
const test = require('tape');
const CircuitBreaker = require('../');
const Status = require('../lib/status.js');
Expand Down Expand Up @@ -194,3 +195,58 @@ test('CircuitBreaker status - enableSnapshots is false in Status when set to fal
breaker.shutdown();
t.end();
});

test('CircuitBreaker status - breaker stats should not reset when rotateBucketController provided and no event emitted', t => {
t.plan(1);

const emitter = new EventEmitter();
const breaker = new CircuitBreaker(passFail, {
rotateBucketController: emitter,
rollingCountTimeout: 10
});

breaker.fire(-1)
.catch(() => {
setTimeout(() => {
t.equal(breaker.status.stats.failures, 1, 'failures do not reset because no rotate event is emitted');
breaker.shutdown();
t.end();
}, 100);
});
});

test('CircuitBreaker status - breaker stats should rotate when rotateBucketController provided and "rotate" event emitted', t => {
t.plan(1);

const emitter = new EventEmitter();
const breaker = new CircuitBreaker(passFail, {
rotateBucketController: emitter,
rollingCountBuckets: 1,
rollingCountTimeout: 10000000
});

breaker.fire(-1)
.catch(() => {
emitter.emit('rotate');
t.equal(breaker.status.stats.failures, 0, 'failures reset buckets are rotated by EventEmitter');
breaker.shutdown();
t.end();
});
});

test('CircuitBreaker status - breaker stats should reset when rotateBucketController not provided', t => {
t.plan(1);

const breaker = new CircuitBreaker(passFail, {
rollingCountTimeout: 10
});

breaker.fire(-1)
.catch(() => {
setTimeout(() => {
t.equal(breaker.status.stats.failures, 0, 'failures reset because no event emitter is provided and rollingCountTimeout set to 10ms');
breaker.shutdown();
t.end();
}, 100);
});
});

0 comments on commit 38013d4

Please sign in to comment.