Skip to content

Commit

Permalink
Delta support (#126)
Browse files Browse the repository at this point in the history
* initial delta support

* added tests for delta feature

* updated documentation and default

* fixed type in config

* fixed eslint errors

* switching travis to modern node

* Additional test for test coverage to pass.

* Additional validateConfig() test.

* fixed mislabeled variable

* fixed sync issue caused by validation of deleted instances
  • Loading branch information
awolden committed Jan 4, 2018
1 parent 4cd9d01 commit 99849b2
Show file tree
Hide file tree
Showing 8 changed files with 363 additions and 38 deletions.
3 changes: 1 addition & 2 deletions .travis.yml
Expand Up @@ -6,10 +6,9 @@ services:
- docker

node_js:
- "0.10"
- "0.12"
- "4"
- "6"
- "8"

script:
- npm run test && npm run integration
Expand Down
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -193,6 +193,7 @@ option | default value | description
---- | --- | ---
`requestMiddleware` | noop | Custom middleware function to modify the outgoing [request](https://www.npmjs.com/package/request) to eureka
`logger` | console logging | logger implementation for the client to use
`shouldUseDelta` | false | Experimental mode to fetch deltas from eureka instead of full registry on update
`eureka.maxRetries` | `3` | Number of times to retry all requests to eureka
`eureka.requestRetryDelay` | `500` | milliseconds to wait between retries. This will be multiplied by the # of failed retries.
`eureka.heartbeatInterval` | `30000` | milliseconds to wait between heartbeats
Expand Down
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "eureka-js-client",
"version": "4.3.0",
"version": "4.4.0",
"description": "A JavaScript implementation the Netflix OSS service registry, Eureka.",
"main": "lib/index.js",
"scripts": {
Expand Down
129 changes: 108 additions & 21 deletions src/EurekaClient.js
@@ -1,7 +1,8 @@
import request from 'request';
import fs from 'fs';
import yaml from 'js-yaml';
import merge from 'lodash/merge';
import { merge, findIndex } from 'lodash';
import { normalizeDelta, findInstance } from './deltaUtils';
import path from 'path';
import { series, waterfall } from 'async';
import { EventEmitter } from 'events';
Expand Down Expand Up @@ -69,6 +70,8 @@ export default class Eureka extends EventEmitter {

this.requestMiddleware = this.config.requestMiddleware;

this.hasFullRegistry = false;

if (this.amazonDataCenter) {
this.metadataClient = new AwsMetadata({
logger: this.logger,
Expand Down Expand Up @@ -328,32 +331,72 @@ export default class Eureka extends EventEmitter {
}

/*
Retrieves all applications registered with the Eureka server
Orchestrates fetching registry
*/
fetchRegistry(callback = noop) {
if (this.config.shouldUseDelta && this.hasFullRegistry) {
this.fetchDelta(callback);
} else {
this.fetchFullRegistry(callback);
}
}

/*
Retrieves all applications registered with the Eureka server
*/
fetchFullRegistry(callback = noop) {
this.eurekaRequest({
uri: '',
headers: {
Accept: 'application/json',
},
}, (error, response, body) => {
if (!error && response.statusCode === 200) {
this.logger.debug('retrieved registry successfully');
this.logger.debug('retrieved full registry successfully');
try {
this.transformRegistry(JSON.parse(body));
} catch (ex) {
return callback(ex);
}
this.emit('registryUpdated');
this.hasFullRegistry = true;
return callback(null);
} else if (error) {
this.logger.warn('Error fetching registry', error);
return callback(error);
}
callback(new Error('Unable to retrieve registry from Eureka server'));
callback(new Error('Unable to retrieve full registry from Eureka server'));
});
}

/*
Retrieves all applications registered with the Eureka server
*/
fetchDelta(callback = noop) {
this.eurekaRequest({
uri: 'delta',
headers: {
Accept: 'application/json',
},
}, (error, response, body) => {
if (!error && response.statusCode === 200) {
this.logger.debug('retrieved delta successfully');
let applications;
try {
const jsonBody = JSON.parse(body);
applications = jsonBody.applications.application;
this.handleDelta(this.cache, applications);
return callback(null);
} catch (ex) {
return callback(ex);
}
} else if (error) {
this.logger.warn('Error fetching delta registry', error);
return callback(error);
}
callback(new Error('Unable to retrieve delta registry from Eureka server'));
});
}
/*
Transforms the given registry and caches the registry locally
*/
Expand Down Expand Up @@ -382,24 +425,11 @@ export default class Eureka extends EventEmitter {
*/
transformApp(app, cache) {
if (app.instance.length) {
const instances = app.instance.filter((instance) => (this.validateInstance(instance)));
cache.app[app.name.toUpperCase()] = instances;
instances.forEach((inst) => {
const vipAddresses = this.splitVipAddress(inst.vipAddress);
vipAddresses.forEach((vipAddress) => {
if (!cache.vip[vipAddress]) {
cache.vip[vipAddress] = [];
}
cache.vip[vipAddress].push(inst);
});
});
app.instance
.filter(this.validateInstance.bind(this))
.forEach((inst) => this.addInstance(cache, inst));
} else if (this.validateInstance(app.instance)) {
const instances = [app.instance];
const vipAddresses = this.splitVipAddress(app.instance.vipAddress);
vipAddresses.forEach((vipAddress) => {
cache.vip[vipAddress] = instances;
});
cache.app[app.name.toUpperCase()] = instances;
this.addInstance(cache, app.instance);
}
}

Expand All @@ -421,6 +451,63 @@ export default class Eureka extends EventEmitter {
return vipAddress.split(',');
}

handleDelta(cache, appDelta) {
const delta = normalizeDelta(appDelta);
delta.forEach((app) => {
app.instance.forEach((instance) => {
switch (instance.actionType) {
case 'ADDED': this.addInstance(cache, instance); break;
case 'MODIFIED': this.modifyInstance(cache, instance); break;
case 'DELETED': this.deleteInstance(cache, instance); break;
default: this.logger.warn('Unknown delta actionType', instance.actionType); break;
}
});
});
}

addInstance(cache, instance) {
if (!this.validateInstance(instance)) return;
const vipAddresses = this.splitVipAddress(instance.vipAddress);
const appName = instance.app.toUpperCase();
vipAddresses.forEach((vipAddress) => {
const alreadyContains = findIndex(cache.vip[vipAddress], findInstance(instance)) > -1;
if (alreadyContains) return;
if (!cache.vip[vipAddress]) {
cache.vip[vipAddress] = [];
}
cache.vip[vipAddress].push(instance);
});
if (!cache.app[appName]) cache.app[appName] = [];
const alreadyContains = findIndex(cache.app[appName], findInstance(instance)) > -1;
if (alreadyContains) return;
cache.app[appName].push(instance);
}

modifyInstance(cache, instance) {
if (!this.validateInstance(instance)) return;
const vipAddresses = this.splitVipAddress(instance.vipAddress);
const appName = instance.app.toUpperCase();
vipAddresses.forEach((vipAddress) => {
const index = findIndex(cache.vip[vipAddress], findInstance(instance));
if (index > -1) cache.vip[vipAddress].splice(index, 1, instance);
else this.addInstance(cache, instance);
});
const index = findIndex(cache.app[appName], findInstance(instance));
if (index > -1) cache.app[appName].splice(cache.vip[instance.vipAddress], 1, instance);
else this.addInstance(cache, instance);
}

deleteInstance(cache, instance) {
const vipAddresses = this.splitVipAddress(instance.vipAddress);
const appName = instance.app.toUpperCase();
vipAddresses.forEach((vipAddress) => {
const index = findIndex(cache.vip[vipAddress], findInstance(instance));
if (index > -1) cache.vip[vipAddress].splice(index, 1);
});
const index = findIndex(cache.app[appName], findInstance(instance));
if (index > -1) cache.app[appName].splice(cache.vip[instance.vipAddress], 1);
}

/*
Fetches the metadata using the built-in client and updates the instance
configuration with the hostname and IP address. If the value of the config
Expand Down
1 change: 1 addition & 0 deletions src/defaultConfig.js
@@ -1,6 +1,7 @@
// Default configuration values:
export default {
requestMiddleware: (request, done) => done(request),
shouldUseDelta: false,
eureka: {
heartbeatInterval: 30000,
registryFetchInterval: 30000,
Expand Down
17 changes: 17 additions & 0 deletions src/deltaUtils.js
@@ -0,0 +1,17 @@
/*
General utilities for handling processing of delta changes from eureka.
*/
export function arrayOrObj(mysteryValue) {
return Array.isArray(mysteryValue) ? mysteryValue : [mysteryValue];
}

export function findInstance(a) {
return b => a.hostName === b.hostName && a.port.$ === b.port.$;
}

export function normalizeDelta(appDelta) {
return arrayOrObj(appDelta).map((app) => {
app.instance = arrayOrObj(app.instance);
return app;
});
}

0 comments on commit 99849b2

Please sign in to comment.