Skip to content

Commit

Permalink
Merge pull request #33 from openintegrationhub/continueOnError
Browse files Browse the repository at this point in the history
Added error handling that reacts to continueOnError flag
  • Loading branch information
SvenHoeffler committed Mar 25, 2024
2 parents 6e7c746 + bce7653 commit c3ee51c
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 178 deletions.
138 changes: 75 additions & 63 deletions templates/lib/actions/action.js
Expand Up @@ -19,78 +19,90 @@ async function processAction(msg, cfg, snapshot, incomingMessageHeaders, tokenDa
let logger = this.logger;
const { logLevel } = cfg.nodeSettings;

if (["fatal", "error", "warn", "info", "debug", "trace"].includes(logLevel)) {
logger = this.logger.child({});
logger.level && logger.level(logLevel);
}
let continueOnError = false;
if (cfg && cfg.nodeSettings && cfg.nodeSettings.continueOnError) continueOnError = true;

logger.debug("Incoming message: %j", msg);
logger.trace("Incoming configuration: %j", cfg);
logger.debug("Incoming snapshot: %j", snapshot);
logger.debug("Incoming message headers: %j", incomingMessageHeaders);
logger.debug("Incoming token data: %j", tokenData);

const actionFunction = tokenData["function"];
logger.info("Starting to execute action '%s'", actionFunction);

const action = componentJson.actions[actionFunction];
const { operationId, pathName, method, requestContentType } = action.callParams;
logger.info(
"Found spec callParams: 'pathName': %s, 'method': %s, 'requestContentType': %s",
pathName,
method,
requestContentType
);

const specPath = spec.paths[pathName];
const specPathParameters = specPath[method].parameters ? specPath[method].parameters.map(({ name }) => name) : [];

let body = msg.data;
mapFieldNames(body);
if (requestContentType === "multipart/form-data") {
logger.info("requestContentType multipart/form-data is defined");
body = await mapFormDataBody.call(this, action, body);
}
try {

let parameters = {};
for (let param of specPathParameters) {
parameters[param] = body[param];
}
logger.debug("Parameters were populated from configuration: %j", parameters);
if (["fatal", "error", "warn", "info", "debug", "trace"].includes(logLevel)) {
logger = this.logger.child({});
logger.level && logger.level(logLevel);
}

$SECURITIES;
logger.debug("Incoming message: %j", msg);
logger.trace("Incoming configuration: %j", cfg);
logger.debug("Incoming snapshot: %j", snapshot);
logger.debug("Incoming message headers: %j", incomingMessageHeaders);
logger.debug("Incoming token data: %j", tokenData);

const actionFunction = tokenData["function"];
logger.info("Starting to execute action '%s'", actionFunction);

const action = componentJson.actions[actionFunction];
const { operationId, pathName, method, requestContentType } = action.callParams;
logger.info(
"Found spec callParams: 'pathName': %s, 'method': %s, 'requestContentType': %s",
pathName,
method,
requestContentType
);

const specPath = spec.paths[pathName];
const specPathParameters = specPath[method].parameters ? specPath[method].parameters.map(({ name }) => name) : [];

let body = msg.data;
mapFieldNames(body);
if (requestContentType === "multipart/form-data") {
logger.info("requestContentType multipart/form-data is defined");
body = await mapFormDataBody.call(this, action, body);
}

if (cfg.otherServer) {
if (!spec.servers) {
spec.servers = [];
let parameters = {};
for (let param of specPathParameters) {
parameters[param] = body[param];
}
spec.servers.push({ url: cfg.otherServer });
logger.debug("Server: %s was added to spec servers array", cfg.otherServer);
}
logger.debug("Parameters were populated from configuration: %j", parameters);

const callParams = {
spec: spec,
operationId: operationId,
pathName: pathName,
method: method,
parameters: parameters,
requestContentType: requestContentType,
requestBody: body,
securities: { authorized: securities },
server: spec.servers[cfg.server] || cfg.otherServer,
};
if (callParams.method === "get") {
delete callParams.requestBody;
}
$SECURITIES;

if (cfg.otherServer) {
if (!spec.servers) {
spec.servers = [];
}
spec.servers.push({ url: cfg.otherServer });
logger.debug("Server: %s was added to spec servers array", cfg.otherServer);
}

const callParams = {
spec: spec,
operationId: operationId,
pathName: pathName,
method: method,
parameters: parameters,
requestContentType: requestContentType,
requestBody: body,
securities: { authorized: securities },
server: spec.servers[cfg.server] || cfg.otherServer,
};
if (callParams.method === "get") {
delete callParams.requestBody;
}

const resp = await executeCall.call(this, callParams);

const newElement = {};
newElement.metadata = getMetadata(msg.metadata);
newElement.data = resp.body;
this.emit("data", newElement);
this.logger.info("Execution finished");
const resp = await executeCall.call(this, callParams);

const newElement = {};
newElement.metadata = getMetadata(msg.metadata);
newElement.data = resp.body;
this.emit("data", newElement);
this.logger.info("Execution finished");
} catch (e) {
if (continueOnError === true) {
this.emit('data', {});
}
logger.error(e);
this.emit('error', e);
}
}

module.exports = { process: processAction };

0 comments on commit c3ee51c

Please sign in to comment.