Skip to content

Commit

Permalink
fix(mojaloop/#3750): add timer for party lookup in cache (#471)
Browse files Browse the repository at this point in the history
* fix: add timer for party lookup in cache

* lock

* audit
  • Loading branch information
kleyow committed Mar 13, 2024
1 parent b494bb6 commit bfd076f
Show file tree
Hide file tree
Showing 55 changed files with 822 additions and 701 deletions.
4 changes: 3 additions & 1 deletion audit-ci.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
// Issue with PostCSS library (https://github.com/advisories/GHSA-7fh5-64p2-3v2j)
"GHSA-7fh5-64p2-3v2j",
// SSRF attacks against npm IP (https://github.com/advisories/GHSA-78xj-cgh5-2h22)
"GHSA-78xj-cgh5-2h22"
"GHSA-78xj-cgh5-2h22",
// https://github.com/advisories/GHSA-rm97-x556-q36h
"GHSA-rm97-x556-q36h",
]
}
3 changes: 3 additions & 0 deletions modules/api-svc/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,6 @@ PM4ML_ENABLED=false
# Maximum payload limits
FSPIOP_API_SERVER_MAX_REQUEST_BYTES=209715200
BACKEND_API_SERVER_MAX_REQUEST_BYTES=209715200

# How much time to wait for cache to unsubscribe from a channel
UNSUBSCRIBE_TIMEOUT_MS=5000
16 changes: 8 additions & 8 deletions modules/api-svc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,16 @@
"@mojaloop/central-services-error-handling": "^12.0.7",
"@mojaloop/central-services-logger": "^11.2.2",
"@mojaloop/central-services-metrics": "^12.0.8",
"@mojaloop/central-services-shared": "18.2.0",
"@mojaloop/central-services-shared": "18.3.0",
"@mojaloop/event-sdk": "^14.0.0",
"@mojaloop/sdk-scheme-adapter-private-shared-lib": "workspace:^",
"@mojaloop/sdk-standard-components": "v17.4.0",
"@mojaloop/sdk-standard-components": "v18.0.0",
"ajv": "8.12.0",
"axios": "^1.6.7",
"co-body": "^6.1.0",
"dotenv": "^16.4.5",
"env-var": "^7.4.1",
"express": "^4.18.2",
"express": "^4.18.3",
"fast-json-patch": "^3.1.1",
"fast-safe-stringify": "^2.1.1",
"javascript-state-machine": "^3.1.0",
Expand All @@ -96,21 +96,21 @@
"ws": "^8.16.0"
},
"devDependencies": {
"@babel/core": "^7.23.9",
"@babel/preset-env": "^7.23.9",
"@babel/core": "^7.24.0",
"@babel/preset-env": "^7.24.0",
"@redocly/openapi-cli": "^1.0.0-beta.94",
"@types/jest": "^29.5.12",
"babel-jest": "^29.7.0",
"eslint": "^8.56.0",
"eslint": "^8.57.0",
"eslint-config-airbnb-base": "^15.0.0",
"eslint-plugin-import": "^2.29.1",
"eslint-plugin-jest": "^27.9.0",
"jest": "^29.7.0",
"jest-junit": "^16.0.0",
"nock": "^13.5.3",
"nock": "^13.5.4",
"npm-check-updates": "^16.7.10",
"openapi-response-validator": "^12.1.3",
"openapi-typescript": "^6.7.4",
"openapi-typescript": "^6.7.5",
"redis-mock": "^0.56.3",
"replace": "^1.2.2",
"standard-version": "^9.5.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ module.exports.handleSDKOutboundBulkAcceptPartyInfoRequestedDmEvt = async (
currentState: BulkTransactionState.WAITING_FOR_PARTY_ACCEPTANCE,
});
} catch (err) {
logger.isErrorEnabled() && logger.push({ err }).error('Error in handleSDKOutboundBulkAcceptPartyInfoRequested');
logger.isErrorEnabled && logger.push({ err }).error('Error in handleSDKOutboundBulkAcceptPartyInfoRequested');
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ module.exports.handleSDKOutboundBulkAcceptQuoteRequestedDmEvt = async (
currentState: BulkTransactionState.WAITING_FOR_QUOTE_ACCEPTANCE,
});
} catch (err) {
logger.isErrorEnabled() && logger.push({ err }).error('Error in handleSDKOutboundBulkAcceptQuoteRequested');
logger.isErrorEnabled && logger.push({ err }).error('Error in handleSDKOutboundBulkAcceptQuoteRequested');
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ module.exports.handleSDKOutboundBulkResponsePreparedDmEvt = async (
});
await options.producer.sendDomainEvent(sdkOutboundBulkResponseSentDmEvt);
} catch (err) {
logger.isErrorEnabled() && logger.push({ err }).error('Error in handleSDKOutboundBulkResponsePreparedDmEvt');
logger.isErrorEnabled && logger.push({ err }).error('Error in handleSDKOutboundBulkResponsePreparedDmEvt');
}
};
12 changes: 6 additions & 6 deletions modules/api-svc/src/BackendEventHandler/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ class BackendEventHandler {

async start() {
const config = this._conf;
this._logger.isInfoEnabled() && this._logger.info('start');
this._logger.isInfoEnabled && this._logger.info('start');

this._consumer = new KafkaDomainEventConsumer(this._messageHandler.bind(this), config.backendEventHandler.domainEventConsumer, this._loggerFromLoggingBC);
this._logger.isInfoEnabled() && this._logger.info(`Created Message Consumer of type ${this._consumer.constructor.name}`);
this._logger.isInfoEnabled && this._logger.info(`Created Message Consumer of type ${this._consumer.constructor.name}`);

this._producer = new KafkaDomainEventProducer(config.backendEventHandler.domainEventProducer, this._loggerFromLoggingBC);
this._logger.isInfoEnabled() && this._logger.info(`Created Message Producer of type ${this._producer.constructor.name}`);
this._logger.isInfoEnabled && this._logger.info(`Created Message Producer of type ${this._producer.constructor.name}`);
await this._producer.init();

// Create options for handlers
Expand All @@ -79,7 +79,7 @@ class BackendEventHandler {
}

async stop() {
this._logger.isInfoEnabled() && this._logger.info('stop');
this._logger.isInfoEnabled && this._logger.info('stop');
await Promise.all([
this._consumer?.destroy(),
this._producer?.destroy(),
Expand All @@ -88,7 +88,7 @@ class BackendEventHandler {
}

async _messageHandler(message) {
this._logger.isInfoEnabled() && this._logger.info(`Got domain event message: ${message.getName()}`);
this._logger.isInfoEnabled && this._logger.info(`Got domain event message: ${message.getName()}`);
// TODO: Handle errors validation here
switch (message.getName()) {
case SDKOutboundBulkAcceptPartyInfoRequestedDmEvt.name: {
Expand All @@ -104,7 +104,7 @@ class BackendEventHandler {
break;
}
default: {
this._logger.isDebugEnabled() && this._logger.debug(`${message?.getName()}:${message?.getKey()} - Skipping unknown domain event`);
this._logger.isDebugEnabled && this._logger.debug(`${message?.getName()}:${message?.getKey()} - Skipping unknown domain event`);
return;
}
}
Expand Down
14 changes: 7 additions & 7 deletions modules/api-svc/src/ControlAgent/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,22 +144,22 @@ class Client extends ws {

async send(msg) {
const data = typeof msg === 'string' ? msg : serialise(msg);
this._logger.isDebugEnabled() && this._logger.push({ data }).debug('Sending message');
this._logger.isDebugEnabled && this._logger.push({ data }).debug('Sending message');
return new Promise((resolve) => super.send.call(this, data, resolve));
}

// Receive a single message
async receive() {
return new Promise((resolve) => this.once('message', (data) => {
const msg = deserialise(data);
this._logger.isDebugEnabled() && this._logger.push({ msg }).debug('Received');
this._logger.isDebugEnabled && this._logger.push({ msg }).debug('Received');
resolve(msg);
}));
}

// Close connection
async stop() {
this._logger.isInfoEnabled() && this._logger.info('Control client shutting down...');
this._logger.isInfoEnabled && this._logger.info('Control client shutting down...');
this.close();
}

Expand All @@ -171,24 +171,24 @@ class Client extends ws {
try {
msg = deserialise(data);
} catch (err) {
this._logger.isErrorEnabled() && this._logger.push({ data }).console.error();('Couldn\'t parse received message');
this._logger.isErrorEnabled && this._logger.push({ data }).console.error();('Couldn\'t parse received message');
this.send(build.ERROR.NOTIFY.JSON_PARSE_ERROR());
}
this._logger.isDebugEnabled() && this._logger.push({ msg }).debug('Handling received message');
this._logger.isDebugEnabled && this._logger.push({ msg }).debug('Handling received message');
switch (msg.msg) {
case MESSAGE.CONFIGURATION:
switch (msg.verb) {
case VERB.NOTIFY: {
const dup = JSON.parse(JSON.stringify(this._appConfig)); // fast-json-patch explicitly mutates
_.merge(dup, msg.data);
this._logger.isDebugEnabled() && this._logger.push({ oldConf: this._appConfig, newConf: dup }).debug('Emitting new configuration');
this._logger.isDebugEnabled && this._logger.push({ oldConf: this._appConfig, newConf: dup }).debug('Emitting new configuration');
this.emit(EVENT.RECONFIGURE, dup);
break;
}
case VERB.PATCH: {
const dup = JSON.parse(JSON.stringify(this._appConfig)); // fast-json-patch explicitly mutates
jsonPatch.applyPatch(dup, msg.data);
this._logger.isDebugEnabled() && this._logger.push({ oldConf: this._appConfig, newConf: dup }).debug('Emitting new configuration');
this._logger.isDebugEnabled && this._logger.push({ oldConf: this._appConfig, newConf: dup }).debug('Emitting new configuration');
this.emit(EVENT.RECONFIGURE, dup);
break;
}
Expand Down
24 changes: 12 additions & 12 deletions modules/api-svc/src/ControlServer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,15 @@ class Client extends ws {

async send(msg) {
const data = typeof msg === 'string' ? msg : serialise(msg);
this._logger.isDebugEnabled() && this._logger.push({ data }).debug('Sending message');
this._logger.isDebugEnabled && this._logger.push({ data }).debug('Sending message');
return new Promise((resolve) => super.send.call(this, data, resolve));
}

// Receive a single message
async receive() {
return new Promise((resolve) => this.once('message', (data) => {
const msg = deserialise(data);
this._logger.isDebugEnabled() && this._logger.push({ msg }).debug('Received');
this._logger.isDebugEnabled && this._logger.push({ msg }).debug('Received');
resolve(msg);
}));
}
Expand Down Expand Up @@ -186,7 +186,7 @@ class Server extends ws.Server {
this._clientData = new Map();

this.on('error', err => {
this._logger.isErrorEnabled() && this._logger.push({ err })
this._logger.isErrorEnabled && this._logger.push({ err })
.error('Unhandled websocket error occurred. Shutting down.');
process.exit(1);
});
Expand All @@ -197,18 +197,18 @@ class Server extends ws.Server {
ip: getWsIp(req),
remoteAddress: req.socket.remoteAddress,
});
logger.isInfoEnabled() && logger.info('Websocket connection received');
logger.isInfoEnabled && logger.info('Websocket connection received');
this._clientData.set(socket, { ip: req.connection.remoteAddress, logger });

socket.on('close', (code, reason) => {
logger.isInfoEnabled() && logger.push({ code, reason }).info('Websocket connection closed');
logger.isInfoEnabled && logger.push({ code, reason }).info('Websocket connection closed');
this._clientData.delete(socket);
});

socket.on('message', this._handle(socket, logger));
});

this._logger.isInfoEnabled() && this._logger.push(this.address()).info('running on');
this._logger.isInfoEnabled && this._logger.push(this.address()).info('running on');
}

// Close the server then wait for all the client sockets to close
Expand All @@ -218,14 +218,14 @@ class Server extends ws.Server {
client.terminate();
}
await closing;
this._logger.isInfoEnabled() && this._logger.info('Control server shutdown complete');
this._logger.isInfoEnabled && this._logger.info('Control server shutdown complete');
}


async notifyClientsOfCurrentConfig() {
const updateConfMsg = build.CONFIGURATION.NOTIFY(this._appConfig);
const logError = (socket, message) => (err) =>
this._logger.isErrorEnabled() && this._logger
this._logger.isErrorEnabled && this._logger
.push({ message, ip: this._clientData.get(socket).ip, err })
.error('Error sending reconfigure notification to client');
const sendToAllClients = (msg) => Promise.all(
Expand All @@ -244,10 +244,10 @@ class Server extends ws.Server {
try {
msg = deserialise(data);
} catch (err) {
logger.isErrorEnabled() && logger.push({ data }).error('Couldn\'t parse received message');
logger.isErrorEnabled && logger.push({ data }).error('Couldn\'t parse received message');
client.send(build.ERROR.NOTIFY.JSON_PARSE_ERROR());
}
logger.isDebugEnabled() && logger.push({ msg }).debug('Handling received message');
logger.isDebugEnabled && logger.push({ msg }).debug('Handling received message');
switch (msg.msg) {
case MESSAGE.CONFIGURATION:
switch (msg.verb) {
Expand All @@ -257,7 +257,7 @@ class Server extends ws.Server {
case VERB.NOTIFY: {
const dup = structuredClone(this._appConfig); // fast-json-patch explicitly mutates
_.merge(dup, msg.data);
this._logger.isDebugEnabled() && this._logger.push({ oldConf: this._appConfig, newConf: dup }).debug('Emitting new configuration');
this._logger.isDebugEnabled && this._logger.push({ oldConf: this._appConfig, newConf: dup }).debug('Emitting new configuration');
this.emit(EVENT.RECONFIGURE, dup);
break;
}
Expand All @@ -266,7 +266,7 @@ class Server extends ws.Server {
// client library?
const dup = structuredClone(this._appConfig); // fast-json-patch explicitly mutates
jsonPatch.applyPatch(dup, msg.data);
logger.isDebugEnabled() && logger.push({ oldConf: this._appConfig, newConf: dup }).debug('Emitting new configuration');
logger.isDebugEnabled && logger.push({ oldConf: this._appConfig, newConf: dup }).debug('Emitting new configuration');
this.emit(EVENT.RECONFIGURE, dup);
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ module.exports.handleBulkQuotesRequestedDmEvt = async (
await options.producer.sendDomainEvent(bulkQuotesCallbackReceivedDmEvt);
}
catch (err) {
logger.isErrorEnabled() && logger.push({ err }).error('Error in handleBulkQuotesRequestedDmEvt');
logger.isErrorEnabled && logger.push({ err }).error('Error in handleBulkQuotesRequestedDmEvt');
const bulkQuotesCallbackReceivedDmEvt = new BulkQuotesCallbackReceivedDmEvt({
bulkId: event.getKey(),
content: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ module.exports.handleBulkTransfersRequestedDmEvt = async (

await options.producer.sendDomainEvent(bulkTransfersCallbackReceivedDmEvt);
} catch (err) {
logger.isErrorEnabled() && logger.push({ err }).error('Error in handleBulkTransfersRequestedDmEvt');
logger.isErrorEnabled && logger.push({ err }).error('Error in handleBulkTransfersRequestedDmEvt');
const bulkTransfersCallbackReceivedDmEvt = new BulkTransfersCallbackReceivedDmEvt({
bulkId: event.getKey(),
content: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ module.exports.handlePartyInfoRequestedDmEvt = async (
});
await options.producer.sendDomainEvent(partyInfoCallbackReceivedDmEvt);
} catch (err) {
logger.isErrorEnabled() && logger.push({ err }).error('Error in handlePartyInfoRequestedDmEvt');
logger.isErrorEnabled && logger.push({ err }).error('Error in handlePartyInfoRequestedDmEvt');
const { code, message } = Errors.MojaloopApiErrorCodes.SERVER_TIMED_OUT;
const partyInfoCallbackReceivedDmEvt = new PartyInfoCallbackReceivedDmEvt({
bulkId: event.getKey(),
Expand Down
12 changes: 6 additions & 6 deletions modules/api-svc/src/FSPIOPEventHandler/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ class FSPIOPEventHandler {

async start() {
const config = this._conf;
this._logger.isInfoEnabled() && this._logger.info('start');
this._logger.isInfoEnabled && this._logger.info('start');

this._consumer = new KafkaDomainEventConsumer(this._messageHandler.bind(this), config.fspiopEventHandler.domainEventConsumer, this._loggerFromLoggingBC);
this._logger.isInfoEnabled() && this._logger.info(`Created Message Consumer of type ${this._consumer.constructor.name}`);
this._logger.isInfoEnabled && this._logger.info(`Created Message Consumer of type ${this._consumer.constructor.name}`);

this._producer = new KafkaDomainEventProducer(config.fspiopEventHandler.domainEventProducer, this._loggerFromLoggingBC);
this._logger.isInfoEnabled() && this._logger.info(`Created Message Producer of type ${this._producer.constructor.name}`);
this._logger.isInfoEnabled && this._logger.info(`Created Message Producer of type ${this._producer.constructor.name}`);
await this._producer.init();

// Create options for handlers
Expand All @@ -75,15 +75,15 @@ class FSPIOPEventHandler {
}

async stop() {
this._logger.isInfoEnabled() && this._logger.info('stop');
this._logger.isInfoEnabled && this._logger.info('stop');
await Promise.all([
this._consumer?.destroy(),
this._producer?.destroy(),
]);
}

async _messageHandler(message) {
this._logger.isInfoEnabled() && this._logger.info(`Got domain event message: ${message.getName()}`);
this._logger.isInfoEnabled && this._logger.info(`Got domain event message: ${message.getName()}`);
// TODO: Handle errors validation here
switch (message.getName()) {
case PartyInfoRequestedDmEvt.name: {
Expand All @@ -99,7 +99,7 @@ class FSPIOPEventHandler {
break;
}
default: {
this._logger.isDebugEnabled() && this._logger.debug(`${message?.getName()}:${message?.getKey()} - Skipping unknown domain event`);
this._logger.isDebugEnabled && this._logger.debug(`${message?.getName()}:${message?.getKey()} - Skipping unknown domain event`);
return;
}
}
Expand Down

0 comments on commit bfd076f

Please sign in to comment.