Skip to content

Commit

Permalink
Merge pull request #1460 from openintegrationhub/multiOperations
Browse files Browse the repository at this point in the history
Multi operations
  • Loading branch information
SvenHoeffler committed Aug 17, 2022
2 parents a7cca8c + ef77dde commit 06ca325
Show file tree
Hide file tree
Showing 8 changed files with 21,894 additions and 29 deletions.
159 changes: 132 additions & 27 deletions services/flow-repository/app/api/controllers/flow.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/* eslint max-len: "off" */
/* eslint func-names: "off" */
/* eslint consistent-return: "off" */
/* eslint no-continue: "off" */

// const path = require('path');
// const _ = require('lodash');
Expand Down Expand Up @@ -125,43 +126,147 @@ router.get('/', jsonParser, can(config.flowReadPermission), async (req, res) =>

// Adds a new flow to the repository
router.post('/', jsonParser, can(config.flowWritePermission), async (req, res) => {
const newFlow = req.body;
try {
let isBulk = false;
let allFlows = req.body;

if (Array.isArray(allFlows)) {
if (!req.user.permissions.includes(config.flowBulkPermission)
&& !req.user.permissions.includes('tenant.all')
&& !req.user.permissions.includes('all')) {
return res.status(403).send({ errors: [{ code: 403, message: 'User is missing permission for bulk operations' }] });
}
isBulk = true;
} else {
allFlows = [allFlows];
}

// Automatically adds the current user as an owner, if not already included.
if (!newFlow.owners) {
newFlow.owners = [];
}
if (newFlow.owners.findIndex((o) => (o.id === req.user.sub)) === -1) {
newFlow.owners.push({ id: req.user.sub, type: 'user' });
}
const results = [];

for (let i = 0; i < allFlows.length; i += 1) {
const newFlow = allFlows[i];
// Automatically adds the current user as an owner, if not already included.
if (!newFlow.owners) {
newFlow.owners = [];
}
if (newFlow.owners.findIndex((o) => (o.id === req.user.sub)) === -1) {
newFlow.owners.push({ id: req.user.sub, type: 'user' });
}

const storeFlow = new Flow(newFlow);
const errors = validate(storeFlow);

if (errors && errors.length > 0) {
results.push({ errors });
continue;
}

try {
const response = await storage.addFlow(storeFlow);
results.push(response);

const ev = {
headers: {
name: 'flowrepo.flow.created',
},
payload: {
tenant: (req.user.tenant) ? req.user.tenant : '',
user: req.user.sub,
flowId: response.id,
},
};
await publishQueue(ev);
} catch (e) {
log.error(e);
results.push({ error: e });
}
}

const storeFlow = new Flow(newFlow);
const errors = validate(storeFlow);
if (isBulk) {
return res.status(201).send({ data: results, meta: {} });
}

if (errors && errors.length > 0) {
return res.status(400).send({ errors });
if (results[0].errors) {
return res.status(400).send({ errors: results[0].errors });
}

return res.status(201).send({ data: results[0], meta: {} });
} catch (err) {
log.error(err);
return res.status(500).send({ errors: [{ message: err }] });
}
});

// Updates a number of flows with body data
router.patch('/bulk', jsonParser, can(config.flowBulkPermission), async (req, res) => {
try {
const response = await storage.addFlow(storeFlow);
let allFlows = req.body;

const ev = {
headers: {
name: 'flowrepo.flow.created',
},
payload: {
tenant: (req.user.tenant) ? req.user.tenant : '',
user: req.user.sub,
flowId: response.id,
},
};
if (!Array.isArray(allFlows)) {
allFlows = [allFlows];
}

await publishQueue(ev);
const results = [];

for (let i = 0; i < allFlows.length; i += 1) {
const updateData = allFlows[i];

// Get the current flow
const oldFlow = await storage.getFlowById(updateData.id, req.user);

if (!oldFlow) {
results.push({ errors: [{ message: `Flow ${updateData.id} not found`, code: 404 }] });
}

if (oldFlow.status !== 'inactive') {
results.push({ errors: [{ message: `Flow ${oldFlow.id} is not inactive. Current status: ${oldFlow.status}`, code: 409 }] });
}

const updateFlow = Object.assign(oldFlow, updateData);
updateFlow._id = updateFlow.id;
delete updateFlow.id;

// Re-adds the current user to the owners array if they're missing
if (!updateFlow.owners) {
updateFlow.owners = [];
}
if (updateFlow.owners.findIndex((o) => (o.id === req.user.sub)) === -1) {
updateFlow.owners.push({ id: req.user.sub, type: 'user' });
}

const storeFlow = new Flow(updateFlow);

const errors = validate(storeFlow);

if (errors && errors.length > 0) {
results.push({ errors });
continue;
}

try {
const response = await storage.updateFlow(storeFlow, req.user);
results.push(response);
const ev = {
headers: {
name: 'flowrepo.flow.modified',
},
payload: {
tenant: (req.user.tenant) ? req.user.tenant : '',
user: req.user.sub,
flowId: response.id,
},
};

await publishQueue(ev);
} catch (e) {
log.error(e);
results.push(e);
}
}

return res.status(201).send({ data: response, meta: {} });
res.status(200).send({ data: results, meta: {} });
} catch (err) {
log.error(err);
return res.status(500).send({ errors: [{ message: err }] });
res.status(500).send({ errors: [{ message: err }] });
}
});

Expand Down
98 changes: 98 additions & 0 deletions services/flow-repository/app/api/controllers/startstop.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,104 @@ const router = express.Router();

const log = require('../../config/logger'); // eslint-disable-line

// Start multiple flows
router.post('/multiple/start', jsonParser, can(config.flowControlPermission), async (req, res) => {
const errors = false;
const results = [];
const flowIds = req.body;
try {
if (!Array.isArray(flowIds)) {
return res.status(500).send({ errors: [{ message: 'No flow ids given. Expecting an array of flow ids', code: 500 }] });
}

for (let i = 0; i < flowIds.length; i += 1) {
const flowId = flowIds[i];

if (!mongoose.Types.ObjectId.isValid(flowId)) {
results.push({ errors: [{ message: 'Invalid id', flowId, code: 400 }] });
continue; // eslint-disable-line no-continue
}

const currentFlow = await storage.getFlowById(flowId, req.user);
if (!currentFlow) {
results.push({ errors: [{ message: 'No flow with this ID found', flowId, code: 404 }] });
}

const flow = await storage.startingFlow(req.user, flowId);

const ev = {
headers: {
name: 'flow.starting',
},
payload: flow,
};

ev.payload.startedBy = req.user.sub;

await publishQueue(ev);

results.push({ data: { id: flow.id, status: flow.status }, meta: {} });
}

if (errors) {
return res.status(500).send(results);
}
return res.status(200).send(results);
} catch (e) {
log.error(e);
results.push(e);
return res.status(500).send(results);
}
});

// Stop multiple flows
router.post('/multiple/stop', jsonParser, can(config.flowControlPermission), async (req, res) => {
const errors = false;
const results = [];
const flowIds = req.body;
try {
if (!Array.isArray(flowIds)) {
return res.status(500).send({ errors: [{ message: 'No flow ids given. Expecting an array of flow ids', code: 500 }] });
}

for (let i = 0; i < flowIds.length; i += 1) {
const flowId = flowIds[i];

if (!mongoose.Types.ObjectId.isValid(flowId)) {
results.push({ errors: [{ message: 'Invalid id', flowId, code: 400 }] });
continue; // eslint-disable-line no-continue
}

const currentFlow = await storage.getFlowById(flowId, req.user);
if (!currentFlow) {
results.push({ errors: [{ message: 'No flow with this ID found', flowId, code: 404 }] });
}

const flow = await storage.stoppingFlow(req.user, flowId);

const ev = {
headers: {
name: 'flow.stopping',
},
payload: flow,
};

await publishQueue(ev);

results.push({ data: { id: flow.id, status: flow.status }, meta: {} });
}

if (errors) {
return res.status(500).send(results);
}
return res.status(200).send(results);
} catch (e) {
log.error(e);
results.push(e);
return res.status(500).send(results);
}
});

// Start a flow
router.post('/:id/start', jsonParser, can(config.flowControlPermission), async (req, res) => {
const flowId = req.params.id;
Expand Down

0 comments on commit 06ca325

Please sign in to comment.