Skip to content

Commit

Permalink
Restart on channel close (#1480)
Browse files Browse the repository at this point in the history
* use log.criticalErrorAndExit to exist the process when channel closed

* exit ferryman on channel close or error
---------

Co-authored-by: James Weber <weberjm@gmail.com>
Co-authored-by: James <james.weber@cloudecosystem.org>
  • Loading branch information
3 people committed Sep 7, 2023
1 parent bbfabda commit e5aa3a7
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 123 deletions.
118 changes: 30 additions & 88 deletions lib/ferryman/lib/amqp.js
Expand Up @@ -22,108 +22,49 @@ class Amqp {

async connect(uri) {
this.consumerTag = null;
this.connectRetryCount = 0;
const delayInSeconds = (this.settings.AMQP_CONNECT_RETRY_DELAY / 1000);
// this.connectRetryCount = 0;
// const delayInSeconds = (this.settings.AMQP_CONNECT_RETRY_DELAY / 1000);

try {
this.amqp = await amqplib.connect(uri);
this.isConnected = true;

if (process.env.NODE_ENV !== 'test') {
this.amqp.on('error',(e)=>{
log.error('Error while creating connection');
log.error(e);
this.isConnected = false;
this.amqp.on('error',(e)=> {
log.criticalErrorAndExit('RabbitMQ connection error', e);
});
this.amqp.on('close',(e)=>{
log.error('Error while creating connection - got closed');
log.error(e);
this.isConnected = false;
this.amqp.on('close',()=> {
log.criticalErrorAndExit('RabbitMQ connection closed');
});
}

log.debug('Connected to AMQP');
} catch (e) {
log.error('Catched error while creating connection');
log.error(e);
this.isConnected = false;
log.criticalErrorAndExit('Failed to create RabbitMQ connection', e);
}

try {
this.subscribeChannel = await this.amqp.createChannel();
this.subscribeChannel.on('error',(e)=> {
log.criticalErrorAndExit('Subscribe channel closed', e);
}).once('close', () => {
log.criticalErrorAndExit('Subscribe channel closed');
});

if (this.isConnected) {
log.debug('Connected to AMQP');

if (!this.hasSubscribeChannel) {
try {
this.subscribeChannel = await this.amqp.createChannel();
this.hasSubscribeChannel = true;
this.subscribeChannel.on('error',(e)=>{
log.error('Error while opening subscribe channel');
log.error(e);
this.hasSubscribeChannel = false;
});

log.debug('Opened subscribe channel');
} catch (e) {
log.error('Catched error while opening subscribe channel');
log.error(e);
this.hasSubscribeChannel = false;
}

if (!this.hasSubscribeChannel) {
this.connectRetryCount += 1;
if (
this.connectRetryCount <= this.settings.AMQP_CONNECT_RETRY_LIMIT
) {
setTimeout((uri)=>{ this.connect(uri); }, this.settings.AMQP_CONNECT_RETRY_DELAY, uri);
log.info(`Not connected to subscribe channel retrying in ${delayInSeconds}s`);
} else {
log.error('Connect retry limit reached giving up');
}
}
}

if (!this.hasPublishChannel) {
try {
this.publishChannel = await this.amqp.createConfirmChannel();
this.hasPublishChannel = true;

this.publishChannel.on('error', (e)=>{
log.error('Error while opening publish channel');
log.error(e);
this.hasPublishChannel = false;
});

log.debug('Opened publish channel');
log.debug('Opened subscribe channel');
} catch (e) {
log.criticalErrorAndExit('Failed to create subscribe channel', e);
}

} catch (e) {
log.error('Catched error while opening publish channel');
log.error(e);
this.hasPublishChannel = false;
}
}
try {
this.publishChannel = await this.amqp.createConfirmChannel();
this.publishChannel.on('error', (e)=> {
log.criticalErrorAndExit('Publish channel error', e);
}).once('close', () => {
log.criticalErrorAndExit('Publish channel closed');
});

if (!this.hasPublishChannel) {
log.info(`Not connected to publish channel retrying in ${delayInSeconds}s`);
this.connectRetryCount += 1;
if (
this.connectRetryCount <= this.settings.AMQP_CONNECT_RETRY_LIMIT
) {
setTimeout((uri)=>{ this.connect(uri); }, this.settings.AMQP_CONNECT_RETRY_DELAY, uri);
log.info(`Not connected to publish channel retrying in ${delayInSeconds}s`);
} else {
log.error('Connect retry limit reached giving up');
}
}
} else {
this.connectRetryCount += 1;
if (
this.connectRetryCount <= this.settings.AMQP_CONNECT_RETRY_LIMIT
) {
setTimeout((uri)=>{ this.connect(uri); }, this.settings.AMQP_CONNECT_RETRY_DELAY, uri);
log.info(`Not connected retrying in ${delayInSeconds}s`);
} else {
log.error('Connect retry limit reached giving up');
}
log.debug('Opened publish channel');
} catch (e) {
log.criticalErrorAndExit('Failed to create publish channel', e);
}
}

Expand Down Expand Up @@ -190,6 +131,7 @@ class Amqp {
log.trace('Message received: %j', message);

if (message === null) {
// means RabbitMQ channel is closed, no more messages will come
log.warn('NULL message received');
return;
}
Expand Down
58 changes: 25 additions & 33 deletions lib/ferryman/lib/ferryman.js
Expand Up @@ -93,7 +93,7 @@ class Ferryman {
log.info(`Component REST API listening on port 3001`);
}

async getSnapShot(flowId, stepId, token, flowExecId) {
async getSnapShot(flowId, stepId, token, logger) {
try {
const getOptions = {
uri: `${this.settings.SNAPSHOTS_SERVICE_BASE_URL}/snapshots/flows/${flowId}/steps/${stepId}`,
Expand All @@ -116,13 +116,11 @@ class Ferryman {
return null;
}

log.warn(`Failed to fetch the snapshot ${flowId}:${stepId}`);
logger.error({ statusCode: result.statusCode }, `Failed to fetch the snapshot`);

return null;
} catch (e) {
log.error(e);
log.error(`Error: Failed to fetch the snapshot ${flowId}:${stepId}`);

logger.error(e, `Failed to fetch snapshot`);
return null;
}
}
Expand Down Expand Up @@ -290,7 +288,7 @@ class Ferryman {
return this.amqpConnection.disconnect();
}

async fetchSecret(secretId, token) {
async fetchSecret(secretId, token, logger) {
try {
const options = {
method: 'GET',
Expand All @@ -315,10 +313,10 @@ class Ferryman {
}
return data.value;
}
log.error(`Could not fetch Secret: ${response.statusCode} - ${JSON.stringify(response.body)}`);
logger.error(`Could not fetch Secret: ${response.statusCode} - ${JSON.stringify(response.body)}`);
return {};
} catch (e) {
log.error(e);
logger.error(e);
return {};
}
}
Expand Down Expand Up @@ -649,7 +647,9 @@ class Ferryman {
log.warn('Could not decode jwt!');
}

if (!tokenData) {return false;}
if (!tokenData) {
return false;
}

if (this.nodeSettings && this.nodeSettings.applyTransform
&& (this.nodeSettings.applyTransform === 'before' || this.nodeSettings.applyTransform === 'both')) {
Expand Down Expand Up @@ -734,38 +734,33 @@ class Ferryman {
const { settings } = this;
const incomingMessageHeaders = this.readIncomingMessageHeaders(message);
const origPassthrough = _.cloneDeep(payload.passthrough) || {};
const loggerOptions = {
..._.pick(incomingMessageHeaders, ['threadId', 'messageId', 'parentMessageId']),
..._.pick(tokenData, ['flowExecId', 'flowId', 'function', 'stepId', 'tenant', 'userId'])
};
const logger = log.child(loggerOptions);

const { routingKey } = message.fields;

self.messagesCount += 1;

const timeStart = Date.now();
const { deliveryTag } = message.fields;

const logger = log.child({
threadId: incomingMessageHeaders.threadId || 'unknown',
messageId: incomingMessageHeaders.messageId || 'unknown',
routingKey: routingKey || 'unknown',
parentMessageId: incomingMessageHeaders.parentMessageId || 'unknown',
deliveryTag
});

logger.trace({ messagesCount: this.messagesCount }, 'processMessage received');

// Fetch secret if necessary
let secret = false;

if (tokenData && tokenData.secretId && tokenData.apiKey) {
secret = await this.fetchSecret(
tokenData.secretId,
tokenData.apiKey
tokenData.apiKey,
logger
);
}

const stepData = {}; // holdover necessary for compatibility, @todo: remove entirely

// eslint-disable-next-line max-len
const snapshot = await this.getSnapShot(tokenData.flowId, tokenData.stepId, tokenData.apiKey, tokenData.flowExecId) || {};
const snapshot = await this.getSnapShot(tokenData.flowId, tokenData.stepId, tokenData.apiKey, logger) || {};

if (secret) {
_.assign(cfg, secret);
Expand All @@ -774,7 +769,7 @@ class Ferryman {
// TODO: Determine whether setting value can/should be disregarded entirely
const action = this.function || settings.FUNCTION;

log.info('Trigger or action: %s', action);
logger.trace('Trigger or action: %s', action);

// this.flowId this.stepId

Expand Down Expand Up @@ -804,13 +799,13 @@ class Ferryman {
module = await this.componentReader.loadTriggerOrAction(action);
componentJson = this.componentReader.componentJson;
} catch (e) {
log.error(e);
logger.error(e);
outgoingMessageHeaders.end = new Date().getTime();
if (self.amqpConnection) {
self.amqpConnection.sendError(e, outgoingMessageHeaders, message);
self.amqpConnection.reject(message);
} else {
log.error('No amqpConnection!');
logger.error('No amqpConnection!');
}
return false;
}
Expand All @@ -822,10 +817,7 @@ class Ferryman {


const taskExec = new TaskExec({
loggerOptions: {
..._.pick(incomingMessageHeaders, ['threadId', 'messageId', 'parentMessageId']),
..._.pick(tokenData, ['flowExecId', 'flowId', 'function', 'stepId', 'tenant', 'userId'])
},
loggerOptions: loggerOptions,
variables: stepData.variables,
services: {
// apiClient: self.apiClient,
Expand Down Expand Up @@ -957,7 +949,7 @@ class Ferryman {


} else if (passedCfg.nodeSettings && passedCfg.nodeSettings.idLinking) {
log.warn('No DATAHUB_BASE_URL defined skipping ID-Linking functions in onData');
logger.warn('No DATAHUB_BASE_URL defined skipping ID-Linking functions in onData');
}

try {
Expand Down Expand Up @@ -990,10 +982,10 @@ class Ferryman {

if (response === false) {
// eslint-disable-next-line max-len
log.error(`DataHub update failed. RecordUid: ${savedMeta.recordUid} applicationUid: ${applicationUid} not found in entry: ${oihUid}`);
logger.error(`DataHub update failed. RecordUid: ${savedMeta.recordUid} applicationUid: ${applicationUid} not found in entry: ${oihUid}`);
} else if (response === null) {
// eslint-disable-next-line max-len
log.error(`DataHub update failed for recordUid: ${savedMeta.recordUid} applicationUid: ${applicationUid} oihUid: ${oihUid}`);
logger.error(`DataHub update failed for recordUid: ${savedMeta.recordUid} applicationUid: ${applicationUid} oihUid: ${oihUid}`);
}
}
}
Expand All @@ -1017,7 +1009,7 @@ class Ferryman {

return await self.amqpConnection.sendBackChannel(data, headers, self.throttles.data);
} catch (err) {
console.error(err);
logger.error(err);
return onError(err);
}
};
Expand Down
2 changes: 1 addition & 1 deletion lib/ferryman/package.json
@@ -1,7 +1,7 @@
{
"name": "@openintegrationhub/ferryman",
"description": "Wrapper utility for Open Integration Hub connectors",
"version": "2.2.0",
"version": "2.3.0",
"main": "run.js",
"scripts": {
"lint": "eslint lib mocha_spec lib runGlobal.js runService.js",
Expand Down
2 changes: 1 addition & 1 deletion lib/ferryman/runGlobal.js
Expand Up @@ -80,7 +80,7 @@ async function run(settings) {
try {
await putOutToSea(settings);
} catch (e) {
console.log(e);
console.error(e);
if (ferryman) {
await ferryman.reportError(e);
}
Expand Down

0 comments on commit e5aa3a7

Please sign in to comment.