Skip to content

Commit

Permalink
FIxes for Snapshots and Flow Execution Lifecycle (#1461)
Browse files Browse the repository at this point in the history
* Emit function end messages from components and process them in Orchestrator
  * Use new Function State Queue

* Create snapshots without flowExecId

* allow for flowExecId onUpdateSnapshot()

* Bump Versions for all affected services
  • Loading branch information
weberjm committed Sep 16, 2022
1 parent afaf2b3 commit a3b4e9f
Show file tree
Hide file tree
Showing 16 changed files with 55 additions and 123 deletions.
2 changes: 1 addition & 1 deletion lib/backend-commons-lib/package.json
@@ -1,6 +1,6 @@
{
"name": "backend-commons-lib",
"version": "2.0.0",
"version": "2.1.0",
"description": "Common part for all services",
"main": "index.js",
"scripts": {
Expand Down
9 changes: 6 additions & 3 deletions lib/backend-commons-lib/spec/QueueCreator.spec.js
Expand Up @@ -63,7 +63,8 @@ describe('QueueCreator', () => {
'ERROR_ROUTING_KEY': 'orchestrator_backchannel.error',
'LISTEN_MESSAGES_ON': 'flow-12345:step_1:messages',
'REBOUND_ROUTING_KEY': 'flow-12345.step_1.rebound',
'SNAPSHOT_ROUTING_KEY': 'flow-12345.step_1.snapshot'
'SNAPSHOT_ROUTING_KEY': 'flow-12345.step_1.snapshot',
'STATE_ROUTING_KEY': 'orchestrator_backchannel.step_state'
},
step_2: {
'BACKCHANNEL_EXCHANGE': 'orchestrator_backchannel',
Expand All @@ -72,7 +73,8 @@ describe('QueueCreator', () => {
'ERROR_ROUTING_KEY': 'orchestrator_backchannel.error',
'LISTEN_MESSAGES_ON': 'flow-12345:step_2:messages',
'REBOUND_ROUTING_KEY': 'flow-12345.step_2.rebound',
'SNAPSHOT_ROUTING_KEY': 'flow-12345.step_2.snapshot'
'SNAPSHOT_ROUTING_KEY': 'flow-12345.step_2.snapshot',
'STATE_ROUTING_KEY': 'orchestrator_backchannel.step_state'
},

//
Expand All @@ -90,7 +92,8 @@ describe('QueueCreator', () => {
'ERROR_ROUTING_KEY': 'orchestrator_backchannel.error',
'LISTEN_MESSAGES_ON': 'flow-12345:step_5:messages',
'REBOUND_ROUTING_KEY': 'flow-12345.step_5.rebound',
'SNAPSHOT_ROUTING_KEY': 'flow-12345.step_5.snapshot'
'SNAPSHOT_ROUTING_KEY': 'flow-12345.step_5.snapshot',
'STATE_ROUTING_KEY': 'orchestrator_backchannel.step_state'
}
});
});
Expand Down
2 changes: 2 additions & 0 deletions lib/backend-commons-lib/src/QueueCreator.js
Expand Up @@ -310,6 +310,7 @@ class QueueCreator {
NODE_EXCHANGE: exchangeName,
BACKCHANNEL_EXCHANGE: 'orchestrator_backchannel',
ERROR_ROUTING_KEY: 'orchestrator_backchannel.error',
STATE_ROUTING_KEY: 'orchestrator_backchannel.step_state',
SNAPSHOT_ROUTING_KEY: snapshotRoutingKey,
REBOUND_ROUTING_KEY: reboundRoutingKey,
OUTPUT_ROUTING_KEY: 'orchestrator_backchannel.input'
Expand Down Expand Up @@ -370,6 +371,7 @@ class QueueCreator {
NODE_EXCHANGE: exchangeName,
BACKCHANNEL_EXCHANGE: 'orchestrator_backchannel',
ERROR_ROUTING_KEY: 'orchestrator_backchannel.error',
STATE_ROUTING_KEY: 'orchestrator_backchannel.step_state',
SNAPSHOT_ROUTING_KEY: snapshotRoutingKey,
REBOUND_ROUTING_KEY: reboundRoutingKey,
OUTPUT_ROUTING_KEY: 'orchestrator_backchannel.input'
Expand Down
2 changes: 1 addition & 1 deletion lib/component-orchestrator/package.json
@@ -1,6 +1,6 @@
{
"name": "@openintegrationhub/component-orchestrator",
"version": "1.4.0",
"version": "1.5.0",
"description": "Orchestrates cluster resources",
"main": "src/index.js",
"scripts": {
Expand Down
12 changes: 6 additions & 6 deletions lib/component-orchestrator/src/ComponentOrchestrator.js
Expand Up @@ -355,11 +355,16 @@ class ComponentOrchestrator {
try {

const messageContent = JSON.parse(message.content.toString())
const { routingKey } = message.fields;
const { orchestratorToken } = message.properties.headers;
const { stepId, flowId, flowExecId, apiKey, specialFlags } = jwt.verify(orchestratorToken, this._tokenSecret);

const flow = await this._flowsDao.findById(flowId);

if (routingKey === 'orchestrator_backchannel.step_state') {
this._checkFlowProgress(flowId, stepId, await this._flowStateDao.upsertCount(flowExecId, 0, 1));
return;
}
// check for privileged component (e.g. logic gateway)
if (specialFlags && specialFlags.privilegedComponent) {

Expand Down Expand Up @@ -421,7 +426,7 @@ class ComponentOrchestrator {

if (nextSteps.length) {

this._checkFlowProgress(flowId, stepId, await this._flowStateDao.upsertCount(flowExecId, nextSteps.length, 1))
await this._flowStateDao.upsertCount(flowExecId, nextSteps.length, 0)

const promises = nextSteps.map((stepId) => {
const componentId = flow.getPropertiesByNodeId(stepId).componentId
Expand All @@ -438,13 +443,8 @@ class ComponentOrchestrator {
);
});
Promise.all(promises);
} else {
this._checkFlowProgress(flowId, stepId, await this._flowStateDao.upsertCount(flowExecId, 0, 1))
}




} catch (err) {
const { taskId, stepId } = message.properties.headers;
this._logger.error({ err, taskId, stepId }, 'Failed to process result');
Expand Down
13 changes: 12 additions & 1 deletion lib/ferryman/lib/amqp.js
Expand Up @@ -356,7 +356,7 @@ class Amqp {
const { settings } = this;

// eslint-disable-next-line
data.headers = filterMessageHeaders(data.headers);
data.headers = filterMessageHeaders(data.headers);
const protocolVersion = Number(properties.headers.protocolVersion || 1);

const encryptedData = this.encryptMessageContent(data, protocolVersion);
Expand Down Expand Up @@ -500,6 +500,17 @@ class Amqp {
return result;
}

async sendFunctionComplete(headers, throttle) {
const properties = this._createPropsFromHeaders(headers);
const { settings } = this;
properties.headers.protocolVersion = settings.PROTOCOL_VERSION;

if (settings.STATE_ROUTING_KEY) {
return this.sendToExchange(settings.BACKCHANNEL_EXCHANGE, settings.STATE_ROUTING_KEY,
JSON.stringify('function complete'), properties, throttle);
}
}

async sendRebound(reboundError, originalMessage, headers) {
// TODO: inconsistency
// rebound message should be
Expand Down
4 changes: 3 additions & 1 deletion lib/ferryman/lib/ferryman.js
Expand Up @@ -1083,7 +1083,7 @@ class Ferryman {
delete data.passedCfg;
const headers = _.clone(outgoingMessageHeaders);
headers.snapshotEvent = 'snapshot';
headers.flowExecId = tokenData.flowExecId;
//headers.flowExecId = tokenData.flowExecId;
headers.tenant = tokenData.tenant;
// self.snapshot = data; // replacing `local` snapshot
return self.amqpConnection.sendSnapshot(
Expand Down Expand Up @@ -1164,6 +1164,8 @@ class Ferryman {
messageProcessingTime: Date.now() - timeStart
}, 'processMessage emit end');

const headers = _.clone(outgoingMessageHeaders);
self.amqpConnection.sendFunctionComplete(headers,null);
resolve();
}
});
Expand Down
1 change: 1 addition & 0 deletions lib/ferryman/lib/settings.js
Expand Up @@ -46,6 +46,7 @@ function readFrom(envVars) {

const optional = {
ERROR_ROUTING_KEY: 'flows.error',
STATE_ROUTING_KEY: 'orchestrator_backchannel.step_state',
SECRET_SERVICE_BASE_URL: 'http://secret-service.oih.svc.cluster.local:3000/api/v1',
GOVERNANCE_SERVICE_BASE_URL: 'http://governance-service.oih-dev-ns.svc.cluster.local:3009',
GOVERNANCE_ROUTING_KEY: 'provenance',
Expand Down
1 change: 1 addition & 0 deletions lib/ferryman/mocha_spec/unit/ferryman.spec.js
Expand Up @@ -218,6 +218,7 @@ describe('Ferryman', () => {
sendGovernanceChannel: sandbox.stub(),
sendError: sandbox.stub(),
sendRebound: sandbox.stub(),
sendFunctionComplete: sandbox.stub(),
ack: sandbox.stub(),
reject: sandbox.stub(),
sendSnapshot: sandbox.stub(),
Expand Down
2 changes: 1 addition & 1 deletion lib/ferryman/package.json
@@ -1,7 +1,7 @@
{
"name": "@openintegrationhub/ferryman",
"description": "Wrapper utility for Open Integration Hub connectors",
"version": "2.1.1",
"version": "2.2.0",
"main": "run.js",
"scripts": {
"lint": "eslint lib mocha_spec lib runGlobal.js runService.js",
Expand Down

0 comments on commit a3b4e9f

Please sign in to comment.